diff --git a/DLSiteFSearchObsidian/Implementation attempt for MTAFE.md b/DLSiteFSearchObsidian/Implementation attempt for MTAFE.md new file mode 100644 index 0000000..f462192 --- /dev/null +++ b/DLSiteFSearchObsidian/Implementation attempt for MTAFE.md @@ -0,0 +1,15 @@ +My implementation attempt for a Multi-Threaded Audio Feature Extractor... my attempt ended in misery. + +My vision is a program that is multi-threaded, that will do audio pre-processing and feature extraction in different threads. There should be `i` threads that will do pre-processing on all given audio file paths, and there should be `j` threads that will do feature extraction. If the audio pre-processing pipeline is single-threaded, it will pose a bottleneck to the entire program. But the feature extractor itself is also a bottleneck, since all audio embedding extractor rely on GPU inference, the feature extraction process must be single-threaded on my computer. + +I was trying to adapt the program for multiple threads for audio pre-processing AND multi-threaded for feature extraction (for beefier GPU that can handle more inference threads) + +Unfortunately... All my attempts has ended in misery, my multi-threaded code is littered with performance issues and deadlocks. Python isn't exactly the best language for multi-threaded code due to the existence of GIL. I am trying to implement a multi-producer, multi-consumer model here. The best attempt I was able to do will hang for a long time waiting for the producer (audio feeder) to pre-process the audio, and put it on the shared queue. It will lock up for a really long time, but after that, it will process everything in light speed. But when it's nearing the end, there is a great chance that the program will deadlock itself. I wasn't able to debug, and the profile didn't really yield any result that are useful to me. + +At one point I even relied on AI, and I still wasn't getting a consistent result, the AI generated a code that was significantly faster, with less deadlock, but has the issue of skipping audio files due to them not being pre-processed in time. I could implement additional logic to catch processing errors, and retry if possible. But I am really burnt out, and I would look for better alternatives. + +The next thing I am going to try is to separate this program into two, this program attempts to do pre-processing AND feature extraction in the same time. I would split the process into two. One program (preferably multi-threaded) that will do all the audio pre-processing (resampling, chunking, etc.), and it will output the pre-processed audio into a serialized pickle file, or any other serialization formats. +I can see various issues with this approach, the most important of which is space, I am basically taking all of those audio files (which is NOT a small amount), and I am re-encoding it, without any compression. Even though I have decided to lower the audio's bit-rate (fro, the typical 48000 Hz or 192000 Hz to just 32000 Hz, or in specific embedding extraction models: 8000 Hz or 16000 Hz), this will still take up a lot of space. +Also the pickle won't be the best format for storing all of those audio, safety issue is one of them, but the alternative of encoding each chunk into FLAC/MP3 compressed format, will be very heavy on the file system. Even though I do have a SSD. I am uncertain if the filesystem, handling hundred of thousands of audio chunk files will have a hit on the performance and the life of the SSD. + +But at least this will be a lot easier to implement. \ No newline at end of file diff --git a/mtafe_lab/mtafe.py b/mtafe_lab/mtafe.py index ad4667d..4ebe43f 100644 --- a/mtafe_lab/mtafe.py +++ b/mtafe_lab/mtafe.py @@ -1,32 +1,269 @@ import logging -logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO) +#logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO) -import multiprocessing -import multiprocessing.process import dataset +import numpy as np import audiopreprocessing +import threading +import queue +import time +from concurrent.futures import ThreadPoolExecutor, Future from pathlib import Path -def copy_worker(origin_queue, target_queue): - p = origin_queue.get() - logging.info(f"Processing: {p}") - l = audiopreprocessing.load_preprocessed_audio(p, 32000, True) - print("Preprocess complete, putting it into queue") - target_queue.put(l) # Even on a small scale test, the process will always hang here - -if __name__ == "__main__": - audio_path_queue = multiprocessing.Queue() - audio_queue = multiprocessing.Queue() +class mtafe: + # Input + audio_path_queue: queue.Queue[Path] # List of audio paths to preprocess + # Feeder/Extractor/Queue threading options + audio_feeder_threads: int # Amount of audio feeder threads + feature_extractor_threads: int # Amount of feature extractor threads (if the method allows) + max_audio_in_queue: int # Maximum audio in queue + # Audio preprocessing parameters + desired_sr: int # Desired Sample Rate (Resampling) + mono: bool # Force load audio in mono mode + chunk_length: float # Audio chunk length + overlap: float # Audio chunk overlap + # Runtime + audio_queue: queue.Queue[ # List of ... + tuple[ # Pair of chunked audio and its path + list[tuple[np.ndarray, float, int]], # Chunked audio list of (ndarray, time position of chunk relative to original audio, channel_id) + Path # Path to original audio + ] + ] # Listed of Chunked/Resampled audio + audio_feeder_threadpool: list[Future] + feature_extractor_threadpool: list[Future] + features_lock: threading.Lock + audio_feeder_barrier: threading.Barrier # Synchronization barrier for all audio feeder threads + # Output + features: dict[Path, list[tuple[np.ndarray, float, int]]] - rand_paths = dataset.random_audio_chunk(1) - for p in rand_paths: - audio_path_queue.put(p) + def __init__( + self, + paudio_paths: list[Path], + pmax_audio_in_queue: int = 16, + paudio_feeder_threads: int = 8, + pfeature_extractor_threads: int = 8, + pdesired_sr: int = 32000, + pforce_mono: bool = False, + pchunk_length: float = 15.0, + pchunk_overlap: float = 2.0 + ): + # Check if the paths passed in are all valid and add them to queue + self.audio_path_queue = queue.Queue() + for p in paudio_paths: + if not p.is_file(): + raise Exception(f"Path '{p.absolute()}' is NOT a valid file!") + else: + self.audio_path_queue.put(p) + self.audio_path_queue.put(None) # To signal to the producer that the audio path list is empty, since Queue.empty() is unreliable - print("Files queued") + logging.info(f"[MTAFE] [Constructor] Queued {self.audio_path_queue.qsize() - 1} files") + + # Set up private attributes + ## Audio preprocessing parameters + self.desired_sr = pdesired_sr + self.mono = pforce_mono + self.chunk_length = pchunk_length + self.overlap = pchunk_overlap + + ## Extractor/Feeder settings + self.max_audio_in_queue = pmax_audio_in_queue + self.audio_feeder_threads = paudio_feeder_threads + self.feature_extractor_threads = pfeature_extractor_threads + + ## Set up runtime conditions + self.audio_queue = queue.Queue(maxsize=self.max_audio_in_queue) + self.features = {} + self.features_lock = threading.Lock() + self.audio_feeder_barrier = threading.Barrier(self.audio_feeder_threads) + self.audio_feeder_threadpool = [] + self.feature_extractor_threadpool = [] + + logging.info(f"[MTAFE] [Constructor] Extraction parameters: {pdesired_sr}Hz, Mono: {pforce_mono}, Divide into {pchunk_length}s chunks with {pchunk_overlap}s of overlap") + logging.info(f"[MTAFE] [Constructor] Using {paudio_feeder_threads} threads for preprocessing audio and {pfeature_extractor_threads} threads for feature extraction. Max queue size of {pmax_audio_in_queue} files") + + def audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]: + """Receives a list of audio chunks, and then extracts embeddings for all audio chunks, returns the resulting embedding as a list of tuples(embedding, time, channel_id) + + Args: + audio (list[tuple[np.ndarray, float, int]]): list of audio chunks + + Returns: + list[tuple[np.ndarray, float, int]]: List of (embedding vector, timepos, channel id) + """ + features = [] + for audio_chunk in audio: + audio, timepos, channel_id = audio_chunk + zero = np.zeros(32) + features.append( (zero, timepos, channel_id) ) + time.sleep(1.5) # Simulate effort, change to simulate spent seconds in each audio file + return features + # To be overridden + + def audio_feeder_worker(self, thread_id: int, barrier: threading.Barrier): # AI + try: + while True: + # Add timeout to prevent blocking indefinitely + try: + new_audio_path = self.audio_path_queue.get(timeout=10) + except queue.Empty: + logging.warning(f"[MTAFE] [Audio Feeder {thread_id}] Queue get timeout") + continue + + if new_audio_path is None: + self.audio_path_queue.put(new_audio_path) # Put None back + break + + logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}") + + try: + new_audio = audiopreprocessing.load_preprocessed_audio( + new_audio_path, + self.desired_sr, + self.mono, + self.chunk_length, + self.overlap + ) + + # Add timeout to prevent deadlock on full queue + try: + self.audio_queue.put((new_audio, new_audio_path), timeout=30) + logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}") + except queue.Full: + logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Queue full, skipping {new_audio_path}") + continue + except Exception as e: + logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Error processing {new_audio_path}: {str(e)}") + continue + + # Add barrier timeout to prevent indefinite wait + logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads") + try: + barrier.wait(timeout=60) + except threading.BrokenBarrierError: + logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Barrier broken") + + if thread_id == 0: + self.audio_queue.put(None) # Signal end + logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!") + except Exception as e: + logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Fatal exception: {str(e)}") + logging.exception(e) + # Ensure barrier can progress even if a thread fails + try: + barrier.abort() + except: + pass + # Ensure sentinel is added even if threads fail + if thread_id == 0: + try: + self.audio_queue.put(None, timeout=5) + except: + pass - processes = [multiprocessing.Process(target=copy_worker, args=(audio_path_queue, audio_queue)) for _ in range(1)] - for p in processes: p.start() - for p in processes: p.join() + # def audio_feeder_worker(self, thread_id: int, barrier: threading.Barrier): + # try: + # while True: + # # Attempt to get audio path from audio path queue + # new_audio_path = self.audio_path_queue.get() + # # Check thread exit condition (If the queue returns None, that means the audio path queue is now empty and the thread should end itself) + # if (new_audio_path is None): + # self.audio_path_queue.put(new_audio_path) # Put None back to notify other audio feeder threads + # break # Break out of the infinite loop + # # Audio path queue is not empty: + # logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}") + # new_audio = audiopreprocessing.load_preprocessed_audio( + # new_audio_path, + # self.desired_sr, + # self.mono, + # self.chunk_length, + # self.overlap + # ) + # self.audio_queue.put((new_audio, new_audio_path)) + # logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}") + # logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads to finish") + # barrier.wait() + # if (thread_id == 0): + # self.audio_queue.put(None) # None to signal audio_queue has no more elements to process + # logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!") + # except Exception as e: + # logging.error(f"[MTAFE] [Audio Feeder {thread_id}] An exception occurred! Committing seppuku!") + # logging.exception(e) + # return - print("Joined") - #for _ in range(1): print(audio_queue.get()) \ No newline at end of file + def feature_extractor_worker(self, thread_id: int): + while True: + # Attempt to get next audio chunks to process + next_audio_tuple = self.audio_queue.get() + # Check thread exit condition + if (next_audio_tuple is None): + self.audio_queue.put(next_audio_tuple) # Put the None back to notify other threads + break # unalive urself + else: # Assuming we got more tuples + current_audio_to_process, current_audio_path = next_audio_tuple # Deconstruct tuple + logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}") + features_to_add = self.audio_inference_embedding(current_audio_to_process) + with self.features_lock: + self.features[current_audio_path] = features_to_add + logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {current_audio_path} w/ {len(features_to_add)} features") + logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!") + + def test_audio_feeder_worker(self): + total_file_amount = self.audio_path_queue.qsize() - 1 + logging.info("[MTAFE] [test_audio_feeder_worker] Spinning up new threads...") + with ThreadPoolExecutor(max_workers=self.audio_feeder_threads) as executor: + for i in range(self.audio_feeder_threads): + ld_ft = executor.submit(self.audio_feeder_worker, i, self.audio_feeder_barrier) + self.audio_feeder_threadpool.append(ld_ft) + logging.info(f"[MTAFE] [test_audio_feeder_worker] Launched audio feeder {i}") + for i in range(total_file_amount): + _, p = self.audio_queue.get() + time.sleep(0.25) + logging.info(f"[MTAFE] [test_audio_feeder_worker] Popped: {p}") + logging.info("[MTAFE] [test_audio_feeder_worker] All audio feeder worker joined!") + #logging.info(f"[MTAFE] [test_audio_feeder_worker] Current audio queue size: {self.audio_queue.qsize()}") + + def count_running_threads(self) -> tuple[int, int]: + running_extractors = 0 + running_feeders = 0 + for ft in self.feature_extractor_threadpool: + if ft.running(): running_extractors += 1 + for ft in self.audio_feeder_threadpool: + if ft.running(): running_feeders += 1 + return (running_feeders, running_extractors) + + def check_all_audiofeed_thread_finished(self) -> bool: + for ft in self.audio_feeder_threadpool: + if ft.running(): + return False + return True + + def check_all_featureextractor_thread_finished(self) -> bool: + for ft in self.feature_extractor_threadpool: + if ft.running(): + return False + return True + + def extract(self): + total_amount = self.audio_path_queue.qsize() - 1 # Account for None to indicate queue end + logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)") + t_start = time.perf_counter() # Timer + with ThreadPoolExecutor(max_workers=(self.audio_feeder_threads + self.feature_extractor_threads)) as executor: + # Audio feeder threads + for i in range(self.audio_feeder_threads): + logging.info(f"[MTAFE] Started audio feeder thread {i}") + ld_ft = executor.submit(self.audio_feeder_worker, i, self.audio_feeder_barrier) + self.audio_feeder_threadpool.append(ld_ft) + # Feature extractor threads + for i in range(self.feature_extractor_threads): + logging.info(f"[MTAFE] Started feature extractor thread {i}") + ex_ft = executor.submit(self.feature_extractor_worker, i) + self.feature_extractor_threadpool.append(ex_ft) + # Progress checking + while ( (not self.check_all_audiofeed_thread_finished()) and (not self.check_all_featureextractor_thread_finished()) ): + nfeeder, nextract = self.count_running_threads() + print(f"[MTAFE Progress] Processed {len(self.features)}/{total_amount} (L:{self.audio_queue.qsize()}/W:{self.audio_path_queue.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r") + t_stop = time.perf_counter() + logging.info(f"[MTAFE] Processed {len(self.features)}/{total_amount} (L:{self.audio_queue.qsize() - 1}/W:{self.audio_path_queue.qsize() - 1} COMPLETE)") + delta_t = t_stop - t_start + total_features = sum( [len(self.features[path]) for path in self.features] ) + logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings") \ No newline at end of file diff --git a/mtafe_lab/test_mp.py b/mtafe_lab/test_mp.py deleted file mode 100644 index f6c5f20..0000000 --- a/mtafe_lab/test_mp.py +++ /dev/null @@ -1,30 +0,0 @@ -import logging -logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO) - -import multiprocessing -from dataset import random_audio_chunk -import audiopreprocessing -from time import sleep - -origin_queue = multiprocessing.Queue() -target_queue = multiprocessing.Queue() - -def worker(orig, targ): - p = orig.get() - #out = "PROCESSED" + str(p.absolute()) - out = audiopreprocessing.load_preprocessed_audio(p, 16000, True) # This will cause put to hang - targ.put(out) # This will hang the process - -if __name__ == "__main__": - K = 2 - - for p in random_audio_chunk(K): - origin_queue.put(p) - - processes = [multiprocessing.Process(target=worker, args=(origin_queue, target_queue)) for _ in range(K)] - for p in processes: p.start() - for p in processes: p.join() - - logging.critical("Successfully terminated all threads") - - for _ in range(K): print(target_queue.get()) \ No newline at end of file diff --git a/mtafe_lab/test_mtafe.py b/mtafe_lab/test_mtafe.py index e77eb52..9c8d55a 100644 --- a/mtafe_lab/test_mtafe.py +++ b/mtafe_lab/test_mtafe.py @@ -5,17 +5,18 @@ import mtafe from dataset import random_audio_chunk logging.info("Generating random audio path list") -rdpl = random_audio_chunk(2) +rdpl = random_audio_chunk(256) logging.info("Initializing MTAFE") -mtafe.initialize_parameters( - paudio_paths=rdpl, - pmax_audio_in_queue=4, - paudio_feeder_threads=2, - pfeature_extractor_threads=1, - pdesired_sr=32000, - pforce_mono=False, - pchunk_length=15, - pchunk_overlap=2 +m = mtafe.mtafe( + paudio_paths=rdpl, + pmax_audio_in_queue=8, + paudio_feeder_threads=8, + pfeature_extractor_threads=2, + pdesired_sr=32000, + pforce_mono=False, + pchunk_length=15, + pchunk_overlap=2 ) -mtafe.test_feeder() \ No newline at end of file +#m.test_audio_feeder_worker() +m.extract() \ No newline at end of file diff --git a/mtafe_lab/testmtafeprofile.txt b/mtafe_lab/testmtafeprofile.txt new file mode 100644 index 0000000..b3e97ae Binary files /dev/null and b/mtafe_lab/testmtafeprofile.txt differ