import logging
#logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
import dataset
import numpy as np
import audiopreprocessing
import threading
import queue
import time
from concurrent.futures import ThreadPoolExecutor, Future
from pathlib import Path


class mtafe:
    """Multi-threaded audio feature extractor (MTAFE).

    Audio feeder threads take paths from ``audio_path_queue``, load and chunk them with
    ``audiopreprocessing.load_preprocessed_audio`` and push ``(chunks, path)`` pairs onto the
    bounded ``audio_queue``. Feature extractor threads pop those pairs, run
    ``audio_inference_embedding`` on the chunks and store the result in ``features`` keyed
    by the original path.

    ``None`` is the end-of-stream sentinel on both queues: a consumer that receives it puts
    it back so every other consumer sees it too. ``audio_inference_embedding`` is a
    placeholder meant to be overridden.
    """

    # Input
    audio_path_queue: queue.Queue[Path]  # Audio paths to preprocess, terminated by a None sentinel

    # Feeder/Extractor/Queue threading options
    audio_feeder_threads: int  # Amount of audio feeder threads
    feature_extractor_threads: int  # Amount of feature extractor threads (if the method allows)
    max_audio_in_queue: int  # Maximum audio in queue

    # Audio preprocessing parameters
    desired_sr: int  # Desired Sample Rate (Resampling)
    mono: bool  # Force load audio in mono mode
    chunk_length: float  # Audio chunk length
    overlap: float  # Audio chunk overlap

    # Runtime
    audio_queue: queue.Queue[  # Chunked/resampled audio awaiting extraction, terminated by None
        tuple[  # Pair of chunked audio and its path
            list[tuple[np.ndarray, float, int]],  # Chunks as (ndarray, time position of chunk relative to original audio, channel_id)
            Path,  # Path to original audio
        ]
    ]
    audio_feeder_threadpool: list[Future]
    feature_extractor_threadpool: list[Future]
    features_lock: threading.Lock
    audio_feeder_barrier: threading.Barrier  # Synchronization barrier for all audio feeder threads

    # Output
    features: dict[Path, list[tuple[np.ndarray, float, int]]]

    def __init__(
        self,
        paudio_paths: list[Path],
        pmax_audio_in_queue: int = 16,
        paudio_feeder_threads: int = 8,
        pfeature_extractor_threads: int = 8,
        pdesired_sr: int = 32000,
        pforce_mono: bool = False,
        pchunk_length: float = 15.0,
        pchunk_overlap: float = 2.0,
    ):
        """Validate and queue the input paths, then store the extraction settings.

        Args:
            paudio_paths: Audio files to process; every entry must be an existing file.
            pmax_audio_in_queue: Capacity of the queue between feeders and extractors.
            paudio_feeder_threads: Number of audio feeder (loading/preprocessing) threads.
            pfeature_extractor_threads: Number of feature extractor threads.
            pdesired_sr: Sample rate passed to the audio preprocessing step.
            pforce_mono: Mono flag passed to the audio preprocessing step.
            pchunk_length: Chunk length passed to the audio preprocessing step.
            pchunk_overlap: Chunk overlap passed to the audio preprocessing step.

        Raises:
            FileNotFoundError: If any entry of ``paudio_paths`` is not a file.
        """
        # Check if the paths passed in are all valid and add them to queue
        self.audio_path_queue = queue.Queue()
        for p in paudio_paths:
            if not p.is_file():
                raise FileNotFoundError(f"Path '{p.absolute()}' is NOT a valid file!")
            self.audio_path_queue.put(p)
        # Sentinel telling the feeders that the path list is exhausted, since Queue.empty() is unreliable
        self.audio_path_queue.put(None)
        logging.info(f"[MTAFE] [Constructor] Queued {self.audio_path_queue.qsize() - 1} files")

        # Set up private attributes
        ## Audio preprocessing parameters
        self.desired_sr = pdesired_sr
        self.mono = pforce_mono
        self.chunk_length = pchunk_length
        self.overlap = pchunk_overlap

        ## Extractor/Feeder settings
        self.max_audio_in_queue = pmax_audio_in_queue
        self.audio_feeder_threads = paudio_feeder_threads
        self.feature_extractor_threads = pfeature_extractor_threads

        ## Set up runtime conditions
        self.audio_queue = queue.Queue(maxsize=self.max_audio_in_queue)
        self.features = {}
        self.features_lock = threading.Lock()
        self.audio_feeder_barrier = threading.Barrier(self.audio_feeder_threads)
        self.audio_feeder_threadpool = []
        self.feature_extractor_threadpool = []

        logging.info(f"[MTAFE] [Constructor] Extraction parameters: {pdesired_sr}Hz, Mono: {pforce_mono}, Divide into {pchunk_length}s chunks with {pchunk_overlap}s of overlap")
        logging.info(f"[MTAFE] [Constructor] Using {paudio_feeder_threads} threads for preprocessing audio and {pfeature_extractor_threads} threads for feature extraction. Max queue size of {pmax_audio_in_queue} files")

    def audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
        """Receives a list of audio chunks, and then extracts embeddings for all audio chunks,
        returns the resulting embedding as a list of tuples(embedding, time, channel_id)

        This default implementation is a placeholder: it emits a 32-element zero vector per
        chunk and sleeps to simulate inference cost.

        Args:
            audio (list[tuple[np.ndarray, float, int]]): list of audio chunks

        Returns:
            list[tuple[np.ndarray, float, int]]: List of (embedding vector, timepos, channel id)
        """
        features = []
        # Separate loop name so the `audio` argument is not shadowed by a chunk
        for _chunk, timepos, channel_id in audio:
            features.append((np.zeros(32), timepos, channel_id))
        time.sleep(1.5)  # Simulate effort, change to simulate spent seconds in each audio file
        return features  # To be overridden

    def _feed_until_exhausted(self, thread_id: int):
        """Preprocess paths from ``audio_path_queue`` and feed them until its ``None`` sentinel."""
        while True:
            # Blocking get is safe: the None sentinel is always in the queue and is put back below
            new_audio_path = self.audio_path_queue.get()
            if new_audio_path is None:
                self.audio_path_queue.put(new_audio_path)  # Put None back for the other feeders
                return

            try:
                logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
                new_audio = audiopreprocessing.load_preprocessed_audio(
                    new_audio_path,
                    self.desired_sr,
                    self.mono,
                    self.chunk_length,
                    self.overlap,
                )
            except Exception as e:
                logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Error processing {new_audio_path}: {str(e)}")
                continue

            # Blocking put: back-pressure from slow extractors must delay feeding, not drop files.
            self.audio_queue.put((new_audio, new_audio_path))
            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")

    def audio_feeder_worker(self, thread_id: int, barrier: threading.Barrier):
        """Producer thread: preprocess queued audio files and feed them to the extractors.

        Runs until the path sentinel is reached, then waits for all other feeders on
        ``barrier``. After that, feeder 0 queues the ``None`` end-of-stream sentinel on
        ``audio_queue``. Files that fail to load are logged and skipped.

        Args:
            thread_id: Index of this feeder; feeder 0 is responsible for the sentinel.
            barrier: Barrier shared by all feeders; its party count must equal the number
                of feeders started.
        """
        try:
            self._feed_until_exhausted(thread_id)
        except Exception as e:
            # Still go to the barrier: aborting or skipping it would let feeder 0 signal
            # end-of-stream while other feeders are still producing.
            logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Fatal exception: {str(e)}")
            logging.exception(e)

        logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads")
        try:
            # No timeout: a slow file must not break the barrier and end the stream early.
            barrier.wait()
        except threading.BrokenBarrierError:
            logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Barrier broken")

        if thread_id == 0:
            self.audio_queue.put(None)  # Signal end
        logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")

    def feature_extractor_worker(self, thread_id: int):
        """Consumer thread: extract features for queued audio until the ``None`` sentinel.

        Results are stored in ``self.features`` under ``features_lock``. A failure on one file
        is logged and skipped so the thread keeps draining ``audio_queue``; feeders block on
        a full queue, so losing the extractors would stall them.
        """
        while True:
            # Attempt to get next audio chunks to process
            next_audio_tuple = self.audio_queue.get()

            # Check thread exit condition
            if next_audio_tuple is None:
                self.audio_queue.put(next_audio_tuple)  # Put the None back to notify other threads
                break

            current_audio_to_process, current_audio_path = next_audio_tuple  # Deconstruct tuple
            logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}")
            try:
                features_to_add = self.audio_inference_embedding(current_audio_to_process)
            except Exception as e:
                logging.error(f"[MTAFE] [Feature Extractor {thread_id}] Error extracting {current_audio_path}: {str(e)}")
                logging.exception(e)
                continue

            with self.features_lock:
                self.features[current_audio_path] = features_to_add
            logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {current_audio_path} w/ {len(features_to_add)} features")
        logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")

    def test_audio_feeder_worker(self):
        """Run only the feeder threads and pop their output on the calling thread (debug helper)."""
        logging.info("[MTAFE] [test_audio_feeder_worker] Spinning up new threads...")
        with ThreadPoolExecutor(max_workers=self.audio_feeder_threads) as executor:
            for i in range(self.audio_feeder_threads):
                ld_ft = executor.submit(self.audio_feeder_worker, i, self.audio_feeder_barrier)
                self.audio_feeder_threadpool.append(ld_ft)
                logging.info(f"[MTAFE] [test_audio_feeder_worker] Launched audio feeder {i}")

            # Drain until the end-of-stream sentinel instead of a fixed count: feeders skip
            # files that fail to load, so a fixed count could block forever.
            while (item := self.audio_queue.get()) is not None:
                _, p = item
                time.sleep(0.25)
                logging.info(f"[MTAFE] [test_audio_feeder_worker] Popped: {p}")
            self.audio_queue.put(None)  # Leave the sentinel for any later consumer
        logging.info("[MTAFE] [test_audio_feeder_worker] All audio feeder worker joined!")

    def count_running_threads(self) -> tuple[int, int]:
        """Return ``(running feeders, running extractors)`` based on ``Future.running()``."""
        running_feeders = sum(1 for ft in self.audio_feeder_threadpool if ft.running())
        running_extractors = sum(1 for ft in self.feature_extractor_threadpool if ft.running())
        return (running_feeders, running_extractors)

    def check_all_audiofeed_thread_finished(self) -> bool:
        """True when every submitted feeder future is done (pending futures count as unfinished)."""
        return all(ft.done() for ft in self.audio_feeder_threadpool)

    def check_all_featureextractor_thread_finished(self) -> bool:
        """True when every submitted extractor future is done (pending futures count as unfinished)."""
        return all(ft.done() for ft in self.feature_extractor_threadpool)

    def extract(self, pprogress_interval: float = 0.5):
        """Run feeders and extractors concurrently and fill ``self.features``.

        Args:
            pprogress_interval: Seconds between progress-line refreshes while waiting.
        """
        total_amount = self.audio_path_queue.qsize() - 1  # Account for None to indicate queue end
        logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
        t_start = time.perf_counter()  # Timer
        with ThreadPoolExecutor(max_workers=(self.audio_feeder_threads + self.feature_extractor_threads)) as executor:
            # Audio feeder threads
            for i in range(self.audio_feeder_threads):
                logging.info(f"[MTAFE] Started audio feeder thread {i}")
                ld_ft = executor.submit(self.audio_feeder_worker, i, self.audio_feeder_barrier)
                self.audio_feeder_threadpool.append(ld_ft)

            # Feature extractor threads
            for i in range(self.feature_extractor_threads):
                logging.info(f"[MTAFE] Started feature extractor thread {i}")
                ex_ft = executor.submit(self.feature_extractor_worker, i)
                self.feature_extractor_threadpool.append(ex_ft)

            # Progress checking: keep reporting until BOTH pools are done; sleep between
            # refreshes so the main thread does not spin and compete for the GIL.
            while not (
                self.check_all_audiofeed_thread_finished()
                and self.check_all_featureextractor_thread_finished()
            ):
                nfeeder, nextract = self.count_running_threads()
                print(f"[MTAFE Progress] Processed {len(self.features)}/{total_amount} (L:{self.audio_queue.qsize()}/W:{self.audio_path_queue.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
                time.sleep(pprogress_interval)
            print()  # Terminate the carriage-return progress line

        t_stop = time.perf_counter()
        logging.info(f"[MTAFE] Processed {len(self.features)}/{total_amount} (L:{self.audio_queue.qsize() - 1}/W:{self.audio_path_queue.qsize() - 1} COMPLETE)")
        delta_t = t_stop - t_start
        total_features = sum(len(chunk_features) for chunk_features in self.features.values())
        logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")