I'm burnt out; I can't get the multithreaded audio feature extractor to work :(

2025-04-19 17:47:09 +02:00
parent b855b7e255
commit 37b6a3c5e7
9 changed files with 563 additions and 180 deletions

dataset_files.py

@@ -2,6 +2,7 @@ import platform
import os
import pickle
import random
import multiprocessing
import threading
import time
import concurrent.futures
@@ -133,155 +134,159 @@ def random_audio_chunk(n : int, seed : int = 177013) -> list[Path]:
    #return random.choices(audio_paths, k=n) # Contains repeated elements
    return random.sample(audio_paths, k=n)
# class AudioFeatureExtractor():
#     __audio_queue: list[                          # List of ...
#         tuple[                                    # Pair of chunked audio and its path
#             list[tuple[np.ndarray, float, int]],  # Chunked audio
#             Path                                  # Path to original audio
#         ]
#     ]                                             # List of chunked/resampled audio
#     __feeder_future: concurrent.futures.Future
#     __extractor_future: concurrent.futures.Future
#     __audio_paths_list: list[Path]
#     __max_audio_in_queue: int
#     __queue_lock: threading.Lock
#     __desired_sr: int
#     __mono: bool
#     __chunk_length: float
#     __overlap: float
#     __features: dict[Path, list[tuple[np.ndarray, float, int]]]  # This is a crime, I know
#     # { audioPath:
#     #     [(embedding, pos, channel)...]
#     # }
#     def __embedding_inference(self, audio_ndarray: np.ndarray) -> np.ndarray:
#         """Uses the embedding model to run inference on audio. Returns embedding vectors.
#         Function to be overridden. Returns np.zeros(32).
#         Args:
#             audio_ndarray (np.ndarray): audio chunk to embed
#         Returns:
#             np.ndarray: embedding vector
#         """
#         return np.zeros(32)
#     def __embedding_extract(self, audio: tuple[np.ndarray, float, int]) -> tuple[np.ndarray, float, int, np.ndarray]:
#         """Receives a tuple of audio, position, and channel ID, then adds the embedding to the tuple.
#         Args:
#             audio (tuple[np.ndarray, float, int]): tuple of audio, position, channel id
#         Returns:
#             tuple[np.ndarray, float, int, np.ndarray]: audio, position, channel id, embedding vector
#         """
#         audio_chunk, pos, channel_id = audio
#         return (audio_chunk, pos, channel_id, self.__embedding_inference(audio_chunk))
#     def __audio_queue_feeder(self):  # TODO: Upgrade to multithreaded loader?
#         """Internal thread function. Preprocesses and loads audio continuously into
#         audio_queue until the end of audio_paths_list.
#         """
#         while (self.__audio_paths_list):  # While there are still Path elements in the path list
#             if (not (len(self.__audio_queue) < self.__max_audio_in_queue)):
#                 logging.info("[AFE] [Audio Queue Thread]: Queue full, feeder thread sleeping for 5 seconds")
#                 time.sleep(5)
#             while (len(self.__audio_queue) < self.__max_audio_in_queue):  # While the audio queue is not full
#                 new_audio_path = self.__audio_paths_list[0]
#                 new_audio = audiopreprocessing.load_preprocessed_audio(
#                     new_audio_path,
#                     self.__desired_sr,
#                     self.__mono,
#                     self.__chunk_length,
#                     self.__overlap
#                 )
#                 with self.__queue_lock:
#                     self.__audio_queue.append(
#                         (new_audio, new_audio_path)
#                     )
#                     pop_path = self.__audio_paths_list.pop(0)
#                 logging.info(f"[AFE] [Audio Queue Thread]: Added new audio to queue {pop_path}")
#         logging.info("[AFE] [Audio Queue Thread]: DONE. All audio files fed")
#     def __audio_queue_feature_extractor(self):
#         """Internal thread function. Gets audio from the audio queue and extracts an embedding
#         vector for every audio chunk. Stores the resulting embeddings in self.__features,
#         with the original audio's Path as key and list[tuple[np.ndarray, float, int]]
#         (list of tuples of embedding vector, position, channel id) as value.
#         """
#         while (self.__audio_paths_list or self.__audio_queue):  # While there is still audio to be processed
#             if (self.__audio_queue):  # If the audio queue is not empty
#                 with self.__queue_lock:
#                     audio_to_process, audio_path = self.__audio_queue.pop(0)  # Get audio from queue
#                 logging.info(f"[AFE] [Feature Extractor Thread]: Extracting {len(audio_to_process)} features from audio {audio_path}")
#                 for audio_chunk in audio_to_process:
#                     same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk)
#                     if (audio_path not in self.__features.keys()):
#                         #if DEBUG: print("Adding new vector to", audio_path.name)
#                         self.__features[audio_path] = [(embedd_vect, timepos, channel_id)]
#                     else:
#                         #if DEBUG: print("Adding vector to", audio_path.name)
#                         self.__features[audio_path].append(
#                             (embedd_vect, timepos, channel_id)
#                         )
#             else:
#                 logging.info("[AFE] [Feature Extractor Thread]: Queue empty, extractor thread sleeping for 5 seconds")  # If the audio queue is empty, wait
#                 time.sleep(5)
#         logging.info("[AFE] [Feature Extractor Thread]: DONE. Extracted all features from all audio files")
#     def __init__(
#         self,
#         audio_paths_list: list[Path],
#         max_audio_in_queue: int,
#         desired_sr: int,
#         mono: bool,
#         chunk_length: float = 15.0,
#         overlap: float = 2.0
#     ):
#         self.__audio_queue = []
#         self.__audio_paths_list = audio_paths_list
#         self.__max_audio_in_queue = max_audio_in_queue
#         self.__queue_lock = threading.Lock()
#         self.__desired_sr = desired_sr
#         self.__mono = mono
#         self.__chunk_length = chunk_length
#         self.__overlap = overlap
#         self.__features = {}
#     @property
#     def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
#         return self.__features
#     def extract(self):
#         print("Starting feature extraction for", len(self.__audio_paths_list), "file(s)")
#         total_amount = len(self.__audio_paths_list)
#         t_start = time.perf_counter()
#         with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
#             self.__feeder_future = executor.submit(self.__audio_queue_feeder)
#             self.__extractor_future = executor.submit(self.__audio_queue_feature_extractor)
#             while (self.__feeder_future.running() or self.__extractor_future.running()):
#                 print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)})", end="\r")
#                 time.sleep(1)
#         t_stop = time.perf_counter()
#         print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)} COMPLETE)")
#         delta_t = t_stop - t_start
#         total_features = sum([len(self.__features[path]) for path in self.__features])
#         print()
#         print("Extraction completed")
#         print(f"Took {delta_t} seconds. Added {total_features} vectors/embeddings")
class MultiThreadedAudioFeatureExtractor():
    # This is the third time I am rewriting this, please send help. Multithreaded apps are pure hell to develop and debug.
    # After testing: this will hang on the last audio file, precisely while preprocessing the audio. I suspect the GIL hurts
    # performance so badly that the preprocessing routine cannot get any share of CPU execution time.
    __audio_queue: queue.Queue[                   # Queue of ...
        tuple[                                    # Pair of chunked audio and its path
            list[tuple[np.ndarray, float, int]],  # Chunked audio: list of (ndarray, time position of the chunk relative to the original audio, channel_id)
            Path                                  # Path to original audio
        ]
    ]                                             # Queue of chunked/resampled audio
    __audio_feeder_threads: int                   # Number of audio feeder threads
    __feature_extractor_threads: int              # Number of feature extractor threads (if the method allows)
    __audio_paths_list: queue.Queue[Path]         # Queue of paths to the audio files
    __max_audio_in_queue: int                     # Maximum number of audio files in the queue
    __audio_feeder_barrier: threading.Barrier     # Synchronization barrier for all audio feeder threads
    # Audio feeder parameters
    __desired_sr: int                             # Desired sample rate (for resampling)
    __mono: bool                                  # Force loading audio in mono mode
    __chunk_length: float                         # Audio chunk length
@@ -295,9 +300,8 @@ class MultiThreadedAudioFeatureExtractor():
    # ...
    # }
    # Runtime
    __audio_feeder_threadpool: list[concurrent.futures.Future]
    __feature_extractor_threadpool: list[concurrent.futures.Future]
    def __audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
        """Receives a list of audio chunks, then extracts embeddings for all of them and returns the result as a list of tuples (embedding, time, channel_id).
@@ -313,18 +317,21 @@ class MultiThreadedAudioFeatureExtractor():
        audio, timepos, channel_id = audio_chunk
        zero = np.zeros(32)
        features.append((zero, timepos, channel_id))
        time.sleep(0.01)  # Simulate effort; change to simulate seconds spent on each audio file
        return features
    # To be overridden
    def __audio_feeder_thread(self, thread_id: int, barrier: threading.Barrier):
        try:
            while True:
                # Attempt to get an audio path from the audio path queue
                new_audio_path = self.__audio_paths_list.get()
                # Check the thread exit condition (if the queue returns None, the audio path queue is now empty and the thread should end itself)
                if (new_audio_path is None):
                    self.__audio_paths_list.put(new_audio_path)  # Put the None back to notify the other audio feeder threads
                    # You are already dead
                    break  # If ETSISI sees this, they will surely kick me out of school
                # Now that the audio path queue is not empty, try preprocessing the audio
                logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
                new_audio = audiopreprocessing.load_preprocessed_audio(
                    new_audio_path,
@@ -333,17 +340,34 @@ class MultiThreadedAudioFeatureExtractor():
                    self.__chunk_length,
                    self.__overlap
                )
                self.__audio_queue.put((new_audio, new_audio_path))  # In theory, this should block this audio feeder thread while the audio queue is full
                logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads to finish")
            barrier.wait()
            if (thread_id == 0):
                self.__audio_queue.put(None)  # None signals that audio_queue has no more elements to process
            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
        except Exception as e:
            logging.error(f"[MTAFE] [Audio Feeder {thread_id}] An exception occurred! Committing seppuku!")
            logging.exception(e)
            return
        # while (not self.__audio_paths_list.empty()):
        #     if (not self.__audio_queue.full()):
        #         # Feed audio
        #         new_audio_path = self.__audio_paths_list.get()
        #         self.__audio_paths_list.task_done()
        #         logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
        #         new_audio = audiopreprocessing.load_preprocessed_audio(
        #             new_audio_path,
        #             self.__desired_sr,
        #             self.__mono,
        #             self.__chunk_length,
        #             self.__overlap
        #         )
        #         self.__audio_queue.put((new_audio, new_audio_path))
        #         logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
        # logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
    #def testfeedthread(self, nthreads):
    #    t1 = threading.Thread(target=self.__audio_feeder_thread, args=(1,))
@@ -358,7 +382,7 @@ class MultiThreadedAudioFeatureExtractor():
    #        self.__audio_loader_threadpool.append(ft)
    def __check_all_audiofeed_thread_finished(self) -> bool:
        for ft in self.__audio_feeder_threadpool:
            if ft.running():
                return False
        return True
@@ -370,17 +394,33 @@ class MultiThreadedAudioFeatureExtractor():
        return True
    def __feature_extractor_thread(self, thread_id):
        while True:
            # Attempt to get the next audio chunks to process
            next_audio_tuple = self.__audio_queue.get()
            # Check the thread exit condition
            if (next_audio_tuple is None):
                self.__audio_queue.put(next_audio_tuple)  # Put the None back to notify the other threads
                break  # Unalive yourself
            # Assuming we got more tuples
            current_audio_to_process, current_audio_path = next_audio_tuple  # Deconstruct the tuple
            logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}")
            features_to_add = self.__audio_inference_embedding(current_audio_to_process)
            with self.__features_lock:
                self.__features[current_audio_path] = features_to_add
            logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature extraction complete for {current_audio_path} w/ {len(features_to_add)} features")
        logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
        # while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
        #     if (not self.__audio_queue.empty()):
        #         audio_to_process, audio_path = self.__audio_queue.get()
        #         self.__audio_queue.task_done()
        #         logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
        #         features_to_add = self.__audio_inference_embedding(audio_to_process)
        #         logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
        #         with self.__features_lock:
        #             self.__features[audio_path] = features_to_add
        #         #with self.__audio_feed_condition: self.__audio_feed_condition.notify_all()
        #         logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature extraction complete for {audio_path} w/ {len(features_to_add)} features")
        #else:
        #    if (not self.__check_all_audiofeed_thread_finished()):
        #        with self.__audio_feed_condition:
@@ -388,14 +428,13 @@ class MultiThreadedAudioFeatureExtractor():
        #            self.__audio_feed_condition.wait(10)
        #            self.__audio_feed_condition.wait_for(lambda: not self.__audio_queue.empty())
    def __count_running_threads(self) -> tuple[int, int]:
        running_extractors = 0
        running_feeders = 0
        for ft in self.__feature_extractor_threadpool:
            if ft.running(): running_extractors += 1
        for ft in self.__audio_feeder_threadpool:
            if ft.running(): running_feeders += 1
        return (running_feeders, running_extractors)
@@ -404,21 +443,26 @@ class MultiThreadedAudioFeatureExtractor():
        return self.__features
    def extract(self):
        total_amount = self.__audio_paths_list.qsize() - 1  # Account for the None that marks the end of the queue
        logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
        t_start = time.perf_counter()  # Timer
        with concurrent.futures.ProcessPoolExecutor(max_workers=(self.__audio_feeder_threads + self.__feature_extractor_threads)) as executor:
            # Audio feeder threads
            for i in range(self.__audio_feeder_threads):
                logging.info(f"[MTAFE] Started audio feeder thread {i}")
                ld_ft = executor.submit(self.__audio_feeder_thread, i, self.__audio_feeder_barrier)
                self.__audio_feeder_threadpool.append(ld_ft)
            # Feature extractor threads
            for i in range(self.__feature_extractor_threads):
                logging.info(f"[MTAFE] Started feature extractor thread {i}")
                ex_ft = executor.submit(self.__feature_extractor_thread, i)
                self.__feature_extractor_threadpool.append(ex_ft)
            # Progress checking
            while ((not self.__check_all_audiofeed_thread_finished()) and (not self.__check_all_featureextractor_thread_finished())):
                nfeeder, nextract = self.__count_running_threads()
                print(f"[MTAFE Progress] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
                time.sleep(1)  # Avoid busy-waiting, as in the old single-threaded version
        t_stop = time.perf_counter()
        logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize() - 1}/W:{self.__audio_paths_list.qsize() - 1} COMPLETE)")
        delta_t = t_stop - t_start
        total_features = sum([len(self.__features[path]) for path in self.__features])
        logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")
@@ -435,15 +479,15 @@ class MultiThreadedAudioFeatureExtractor():
        chunk_overlap: float = 2.0,
    ):
        # Check that the paths passed in are all valid and add them to the queue
        self.__audio_paths_list = multiprocessing.Queue()
        for p in audio_paths:
            if not p.is_file():
                raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
            else:
                self.__audio_paths_list.put(p)
        self.__audio_paths_list.put(None)  # Signal the producers (feeder threads) that the audio path list is exhausted, since Queue.empty() is unreliable
        logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize() - 1} files")
        # Set up private attributes
        ## Audio preprocessing parameters
@@ -454,16 +498,16 @@ class MultiThreadedAudioFeatureExtractor():
        ## Extractor/Feeder settings
        self.__max_audio_in_queue = max_audio_in_queue
        self.__audio_feeder_threads = audio_feeder_threads
        self.__feature_extractor_threads = feature_extractor_threads
        ## Set up runtime conditions
        self.__audio_queue = multiprocessing.Queue(maxsize=self.__max_audio_in_queue)
        self.__features = {}
        self.__features_lock = multiprocessing.Lock()
        self.__audio_feeder_barrier = multiprocessing.Barrier(self.__audio_feeder_threads)
        self.__audio_feeder_threadpool = []
        self.__feature_extractor_threadpool = []
        logging.info(f"[MTAFE] [Constructor] Extraction parameters: {desired_sr}Hz, Mono: {force_mono}, Divide into {chunk_length}s chunks with {chunk_overlap}s of overlap")
        logging.info(f"[MTAFE] [Constructor] Using {audio_feeder_threads} threads for preprocessing audio and {feature_extractor_threads} threads for feature extraction. Max queue size of {max_audio_in_queue} files")


@@ -0,0 +1,8 @@
import dataset_files
import multiprocessing
import logging
import numpy as np
import threading
import queue
from pathlib import Path


@@ -5,12 +5,13 @@ logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s"
from dataset_files import MultiThreadedAudioFeatureExtractor, random_audio_chunk
mtafe = MultiThreadedAudioFeatureExtractor(
    audio_paths=random_audio_chunk(8),
    max_audio_in_queue=8,
    audio_feeder_threads=8,
    feature_extractor_threads=1,
    desired_sr=32000,
    force_mono=False,
    chunk_length=15,
    chunk_overlap=2
)
mtafe.extract()


@@ -0,0 +1,17 @@
#import mtafe
import logging
#import dataset_files
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.DEBUG)
logging.info("Running tests")
# m = mtafe.mtafe(
#     audio_paths=dataset_files.random_audio_chunk(2),
#     max_audio_in_queue=8,
#     audio_feeder_threads=8,
#     feature_extractor_threads=1,
#     desired_sr=32000,
#     force_mono=False,
#     chunk_length=15,
#     chunk_overlap=2
# )
# m.run()

audiopreprocessing.py

@@ -0,0 +1,95 @@
import librosa
import pickle
import os
import numpy as np
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def triggerlog():
    logger.critical("Testing: info")
def resample_load(input_path: Path, target_sr: int = 16000, mono_audio: bool = False) -> np.ndarray:  # AI
    """Loads the audio and resamples it to `target_sr`.
    Args:
        input_path (Path): pathlib.Path object to the audio file
        target_sr (int, optional): Target sample rate to resample to. Defaults to 16000.
        mono_audio (bool, optional): Load the audio in mono mode. Defaults to False.
    Returns:
        np.ndarray: the loaded (and possibly resampled) audio
    """
    # Load audio file with original sample rate
    logger.info(f"[resample_load] Loading audio {input_path}")
    audio, orig_sr = librosa.load(input_path, sr=None, mono=mono_audio)
    # Resample if necessary
    if orig_sr != target_sr:
        logger.info(f"[resample_load] Resampling to {target_sr}")
        audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
    return audio
def chunk_audio(audio: np.ndarray, sr: int, chunk_length: float = 10.0, overlap: float = 2.0) -> tuple[list[np.ndarray], list[float], int]:  # AI
    """
    Chunks an audio file into overlapping segments. Only pass in mono audio here.
    Args:
        audio: Loaded audio ndarray (one channel only)
        sr: Sample rate of the given audio
        chunk_length: Length of each chunk in seconds
        overlap: Overlap between chunks in seconds
    Returns:
        List of audio chunks, list of chunk positions, and the given sample rate
    """
    logger.info(f"[chunk_audio] Chunking audio ({len(audio) / sr}s)")
    # Calculate chunk size and hop length in samples
    chunk_size = int(chunk_length * sr)
    hop_length = int((chunk_length - overlap) * sr)
    # Generate chunks
    chunks = []
    positions = []
    k = 0
    for i in range(0, len(audio) - chunk_size + 1, hop_length):
        chunk = audio[i:i + chunk_size]
        chunks.append(chunk)
        positions.append(i / sr)
        k += 1
    if k == 0:  # The full audio length is less than chunk_length
        chunks = [audio]
        positions = [0.0]
        logger.info("[chunk_audio] Audio shorter than chunk_length. Returning the original audio as a single chunk")
    else:
        logger.info(f"[chunk_audio] Audio is split into {k} chunks")
    return chunks, positions, sr
def load_preprocessed_audio(
        path: Path,
        desired_sr: int,
        mono: bool = False,
        chunk_length: float = 15.0,
        overlap: float = 2.0) -> list[tuple[np.ndarray, float, int]]:
    result = []
    # Load and resample audio
    audio = resample_load(path, desired_sr, mono)  # Stereo: 2D matrix, Mono: 1D array
    if mono or (audio.ndim == 1):
        # Chunk audio: mono (or the audio file itself is mono)
        chunks, positions, _ = chunk_audio(audio, desired_sr, chunk_length, overlap)
        assert len(chunks) == len(positions)
        result.extend(zip(chunks, positions, [-1 for _ in range(len(chunks))]))
        # (ndarray_chunk1, pos1, -1): first audio chunk, position 1, -1 (mono channel indicator)
    else:
        # Chunk audio: stereo/multichannel
        for channel_id, channel_audio in enumerate(audio):
            chunks, positions, _ = chunk_audio(channel_audio, desired_sr, chunk_length, overlap)
            assert len(chunks) == len(positions)
            result.extend(zip(chunks, positions, [channel_id for _ in range(len(chunks))]))
            # (ndarray_chunk1, pos1, 0): first audio chunk, position 1, 0 (channel 0)
    logger.info(f"[load_preprocessed_audio] Loaded audio {path} ({desired_sr}Hz, Chunk {chunk_length}s with overlap {overlap}s) MONO:{mono}")
    return result
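For reference, a minimal usage sketch of this module; example.wav is a hypothetical input file, and any format librosa can read should behave the same way:

from pathlib import Path
import audiopreprocessing

# Returns a flat list of (chunk_ndarray, start_position_seconds, channel_id) tuples
chunks = audiopreprocessing.load_preprocessed_audio(
    Path("example.wav"),  # Hypothetical file
    desired_sr=32000,
    mono=True,
    chunk_length=15.0,
    overlap=2.0
)
for chunk, pos, channel_id in chunks[:3]:
    print(chunk.shape, pos, channel_id)  # channel_id is -1 for mono audio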

mtafe_lab/dataset.py (new file, 135 lines)

@@ -0,0 +1,135 @@
import platform
import os
import pickle
import random
import multiprocessing
import threading
import time
import concurrent.futures
import numpy as np
from pathlib import Path
import audiopreprocessing
import logging
import queue
def serialize_dict_obj(path: Path, object: dict) -> int:
    """Serializes a Python dictionary object to a file via pickle.
    Args:
        path (Path): Path to store the file
        object (dict): Dictionary object to serialize
    Returns:
        int: size in bytes written
    """
    # Horrible practice, horrible security, but it will work for now
    with path.open("wb") as fp:
        pickle.dump(object, fp)
        fp.seek(0, os.SEEK_END)
        size = fp.tell()
    return size
logging.info("Reading local dataset directory structure...")
ASMRThreePath = Path("C:\\ASMRThree")
ASMRTwoPath = Path("D:\\ASMRTwo")
ASMROnePath = Path("E:\\ASMROne")
if (platform.system() == 'Linux'):
    ASMROnePath = Path('/mnt/Scratchpad/ASMROne')
    ASMRTwoPath = Path('/mnt/MyStuffz/ASMRTwo')
    ASMRThreePath = Path('/mnt/Windows11/ASMRThree')
size_one, size_two, size_three = 0, 0, 0
files_one, files_two, files_three = [], [], []
folders_one, folders_two, folders_three = [], [], []
# Statistics calculation for ASMROne
for root, dirs, files in ASMROnePath.walk():  # root iterates through all folders
    if root.absolute() != ASMROnePath.absolute():  # Skip the root of ASMROnePath
        folders_one.append(root)  # Add folder to list
    for fname in files:  # Iterate through all files in the current root
        file = root / fname  # Get file path
        assert file.is_file()
        files_one.append(file)
        size_one += file.stat().st_size  # Get file size
# Statistics calculation for ASMRTwo
for root, dirs, files in ASMRTwoPath.walk():  # root iterates through all folders
    if root.absolute() != ASMRTwoPath.absolute():  # Skip the root of ASMRTwoPath
        folders_two.append(root)  # Add folder to list
    for fname in files:  # Iterate through all files in the current root
        file = root / fname  # Get file path
        assert file.is_file()
        files_two.append(file)
        size_two += file.stat().st_size  # Get file size
# Statistics calculation for ASMRThree
for root, dirs, files in ASMRThreePath.walk():  # root iterates through all folders
    if root.absolute() != ASMRThreePath.absolute():  # Skip the root of ASMRThreePath
        folders_three.append(root)  # Add folder to list
    for fname in files:  # Iterate through all files in the current root
        file = root / fname  # Get file path
        assert file.is_file()
        files_three.append(file)
        size_three += file.stat().st_size  # Get file size
DataSubsetPaths = [ASMROnePath, ASMRTwoPath, ASMRThreePath]
DLSiteWorksPaths = []
# Collect ASMR works (RJ ID, paths)
for ASMRSubsetPath in DataSubsetPaths:
    for WorkPaths in ASMRSubsetPath.iterdir():
        DLSiteWorksPaths.append(WorkPaths)
fileExt2fileType = {
    ".TXT": "Document",
    ".WAV": "Audio",
    ".MP3": "Audio",
    ".PNG": "Image",
    ".JPG": "Image",
    ".VTT": "Subtitle",
    ".PDF": "Document",
    ".FLAC": "Audio",
    ".MP4": "Video",
    ".LRC": "Subtitle",
    ".SRT": "Subtitle",
    ".JPEG": "Image",
    ".ASS": "Subtitle",
    "": "NO EXTENSION",
    ".M4A": "Audio",
    ".MKV": "Video"
}
fileext_stat = {}
file_list = files_one + files_two + files_three
file_list_count = len(file_list)
for file in file_list:
    f_ext = file.suffix.upper()
    if (f_ext in fileext_stat.keys()):
        fileext_stat[f_ext]['Count'] += 1
        fileext_stat[f_ext]['List'].append(file)
        fileext_stat[f_ext]['ExtensionMass'] += file.stat().st_size
    else:
        fileext_stat[f_ext] = {}
        fileext_stat[f_ext]['Count'] = 1
        fileext_stat[f_ext]['List'] = [file]
        fileext_stat[f_ext]['ExtensionMass'] = file.stat().st_size  # The total sum of sizes of files with the same extension
        fileext_stat[f_ext]['MediaType'] = fileExt2fileType[f_ext]
audio_paths = []
for extension in fileext_stat:  # I can't be bothered to convert this into a list comprehension
    if fileext_stat[extension]['MediaType'] == "Audio":
        audio_paths += fileext_stat[extension]['List']
def random_audio_chunk(n: int, seed: int = 177013) -> list[Path]:
    """Returns a random selection of audio files.
    Args:
        n (int): Number of files to return
        seed (int, optional): Seed for the RNG. Defaults to 177013.
    Returns:
        list[Path]: List of randomly selected audio paths (as Path objects)
    """
    random.seed(seed)
    #return random.choices(audio_paths, k=n) # Contains repeated elements
    return random.sample(audio_paths, k=n)
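A quick sketch of how serialize_dict_obj pairs with the extractor output, assuming a features dict in the MTAFE shape; the features.pkl path is made up for illustration. Note that importing this module also runs the whole directory scan above, so it only works on a machine with those dataset drives mounted:

from pathlib import Path
import numpy as np
from dataset import serialize_dict_obj

# Hypothetical features dict: { audio_path: [(embedding, pos_seconds, channel_id), ...] }
features = {Path("a.wav"): [(np.zeros(32), 0.0, -1), (np.zeros(32), 13.0, -1)]}
size = serialize_dict_obj(Path("features.pkl"), features)
print(f"Wrote {size} bytes")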

mtafe_lab/mtafe.py (new file, 32 lines)

@@ -0,0 +1,32 @@
import logging
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
import multiprocessing
import multiprocessing.process
import dataset
import audiopreprocessing
from pathlib import Path
def copy_worker(origin_queue, target_queue):
    p = origin_queue.get()
    logging.info(f"Processing: {p}")
    l = audiopreprocessing.load_preprocessed_audio(p, 32000, True)
    print("Preprocess complete, putting it into queue")
    target_queue.put(l)  # Even on a small-scale test, the process will always hang here
if __name__ == "__main__":
    audio_path_queue = multiprocessing.Queue()
    audio_queue = multiprocessing.Queue()
    rand_paths = dataset.random_audio_chunk(1)
    for p in rand_paths:
        audio_path_queue.put(p)
    print("Files queued")
    processes = [multiprocessing.Process(target=copy_worker, args=(audio_path_queue, audio_queue)) for _ in range(1)]
    for p in processes: p.start()
    for p in processes: p.join()
    print("Joined")
    #for _ in range(1): print(audio_queue.get())
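The hang at target_queue.put(l) above is most likely not the GIL but the documented multiprocessing.Queue flushing behavior: put() hands the (large) list of preprocessed ndarrays to a background feeder thread that writes it into a pipe, and a child process will not exit until that buffer is flushed. Since the parent calls join() before ever calling get(), any payload larger than the pipe buffer deadlocks both sides. The Python docs ("Joining processes that use queues") recommend draining the queue before joining; a sketch of the reordered main block, under that assumption:

if __name__ == "__main__":
    audio_path_queue = multiprocessing.Queue()
    audio_queue = multiprocessing.Queue()
    for p in dataset.random_audio_chunk(1):
        audio_path_queue.put(p)
    print("Files queued")
    processes = [multiprocessing.Process(target=copy_worker, args=(audio_path_queue, audio_queue)) for _ in range(1)]
    for p in processes: p.start()
    results = [audio_queue.get() for _ in range(len(processes))]  # Drain BEFORE joining, or the children never exit
    for p in processes: p.join()
    print("Joined with", len(results), "result(s)")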

mtafe_lab/test_mp.py (new file, 30 lines)

@@ -0,0 +1,30 @@
import logging
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
import multiprocessing
from dataset import random_audio_chunk
import audiopreprocessing
from time import sleep
origin_queue = multiprocessing.Queue()
target_queue = multiprocessing.Queue()
def worker(orig, targ):
    p = orig.get()
    #out = "PROCESSED" + str(p.absolute())
    out = audiopreprocessing.load_preprocessed_audio(p, 16000, True)  # This will cause put to hang
    targ.put(out)  # This will hang the process
if __name__ == "__main__":
    K = 2
    for p in random_audio_chunk(K):
        origin_queue.put(p)
    processes = [multiprocessing.Process(target=worker, args=(origin_queue, target_queue)) for _ in range(K)]
    for p in processes: p.start()
    for p in processes: p.join()
    logging.critical("Successfully terminated all threads")
    for _ in range(K): print(target_queue.get())
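Same pattern here: target_queue.get() only runs after join(), so for payloads bigger than the OS pipe buffer the join deadlocks, which matches the comments above. Moving the get() loop ahead of the join (as in the sketch under mtafe.py) should unblock it; the commented-out small-string version fits in the pipe buffer, which would explain why only the real audio payload hangs.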

mtafe_lab/test_mtafe.py (new file, 21 lines)

@@ -0,0 +1,21 @@
import logging
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
import mtafe
from dataset import random_audio_chunk
logging.info("Generating random audio path list")
rdpl = random_audio_chunk(2)
logging.info("Initializing MTAFE")
mtafe.initialize_parameters(
    paudio_paths=rdpl,
    pmax_audio_in_queue=4,
    paudio_feeder_threads=2,
    pfeature_extractor_threads=1,
    pdesired_sr=32000,
    pforce_mono=False,
    pchunk_length=15,
    pchunk_overlap=2
)
mtafe.test_feeder()