From 37b6a3c5e7093ccc33db6ac8e479cd4f16babb31 Mon Sep 17 00:00:00 2001
From: qtnull
Date: Sat, 19 Apr 2025 17:47:09 +0200
Subject: [PATCH] I'm burnt out, I can't get the multithreaded audio feature
 extractor to work :(

---
 FeatureExtraction/dataset_files.py | 400 ++++++++++++++++-------------
 FeatureExtraction/mtafe.py         |   8 +
 FeatureExtraction/test.py          |   5 +-
 FeatureExtraction/test_mtafe.py    |  17 ++
 mtafe_lab/audiopreprocessing.py    |  95 +++++++
 mtafe_lab/dataset.py               | 135 ++++++++++
 mtafe_lab/mtafe.py                 |  32 +++
 mtafe_lab/test_mp.py               |  30 +++
 mtafe_lab/test_mtafe.py            |  21 ++
 9 files changed, 563 insertions(+), 180 deletions(-)
 create mode 100644 FeatureExtraction/mtafe.py
 create mode 100644 FeatureExtraction/test_mtafe.py
 create mode 100644 mtafe_lab/audiopreprocessing.py
 create mode 100644 mtafe_lab/dataset.py
 create mode 100644 mtafe_lab/mtafe.py
 create mode 100644 mtafe_lab/test_mp.py
 create mode 100644 mtafe_lab/test_mtafe.py

diff --git a/FeatureExtraction/dataset_files.py b/FeatureExtraction/dataset_files.py
index 46c840a..42f1896 100644
--- a/FeatureExtraction/dataset_files.py
+++ b/FeatureExtraction/dataset_files.py
@@ -2,6 +2,7 @@ import platform
 import os
 import pickle
 import random
+import multiprocessing
 import threading
 import time
 import concurrent.futures
@@ -133,155 +134,159 @@ def random_audio_chunk(n : int, seed : int = 177013) -> list[Path]:
     #return random.choices(audio_paths, k=n) # Contains repeated elements
     return random.sample(audio_paths, k=n)

-class AudioFeatureExtractor():
-    __audio_queue: list[ # List of ...
-        tuple[ # Pair of chunked audio and its path
-            list[tuple[np.ndarray, float, int]], # Chunked audio
-            Path # Path to original audio
-        ]
-    ] # Listed of Chunked/Resampled audio
-    __feeder_future: concurrent.futures.Future
-    __extractor_future: concurrent.futures.Future
-    __audio_paths_list: list[Path]
-    __max_audio_in_queue: int
-    __queue_lock: threading.Lock
-    __desired_sr: int
-    __mono: bool
-    __chunk_length: float
-    __overlap: float
-    __features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
-    # { audioPath:
-    #       [(embedding, pos, channel)...]
-    # }
+# class AudioFeatureExtractor():
+#     __audio_queue: list[ # List of ...
+#         tuple[ # Pair of chunked audio and its path
+#             list[tuple[np.ndarray, float, int]], # Chunked audio
+#             Path # Path to original audio
+#         ]
+#     ] # List of Chunked/Resampled audio
+#     __feeder_future: concurrent.futures.Future
+#     __extractor_future: concurrent.futures.Future
+#     __audio_paths_list: list[Path]
+#     __max_audio_in_queue: int
+#     __queue_lock: threading.Lock
+#     __desired_sr: int
+#     __mono: bool
+#     __chunk_length: float
+#     __overlap: float
+#     __features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
+#     # { audioPath:
+#     #       [(embedding, pos, channel)...]
+#     # }

-    def __embedding_inference(self, audio_ndarray: np.ndarray) -> np.ndarray:
-        """Uses embedding model to inference an audio. Returns embedding vectors.
-        Function to be overrided. Returns np.zeros(32).
+#     def __embedding_inference(self, audio_ndarray: np.ndarray) -> np.ndarray:
+#         """Uses the embedding model to run inference on an audio clip. Returns embedding vectors.
+#         Function to be overridden. Returns np.zeros(32).
-        Args:
-            audio_ndarray (np.ndarray):
+#         Args:
+#             audio_ndarray (np.ndarray):

-        Returns:
-            np.ndarray: _description_
-        """
-        return np.zeros(32)
+#         Returns:
+#             np.ndarray: _description_
+#         """
+#         return np.zeros(32)

-    def __embedding_extract(self, audio: tuple[np.ndarray, float, int]) -> tuple[np.ndarray, float, int, np.ndarray]:
-        """Receives a tuple of audio, position, and channel ID, then adding the embedding to the tuple
+#     def __embedding_extract(self, audio: tuple[np.ndarray, float, int]) -> tuple[np.ndarray, float, int, np.ndarray]:
+#         """Receives a tuple of audio, position, and channel ID, then adds the embedding to the tuple

-        Args:
-            audio (tuple[np.ndarray, float, int]): tuple of audio, position, channel id
+#         Args:
+#             audio (tuple[np.ndarray, float, int]): tuple of audio, position, channel id

-        Returns:
-            tuple[np.ndarray, float, int, np.ndarray]: audio, position, channel id, embedding vector
-        """
-        audio_chunk, pos, channel_id = audio
-        return (audio_chunk, pos, channel_id, self.__embedding_inference(audio_chunk))
+#         Returns:
+#             tuple[np.ndarray, float, int, np.ndarray]: audio, position, channel id, embedding vector
+#         """
+#         audio_chunk, pos, channel_id = audio
+#         return (audio_chunk, pos, channel_id, self.__embedding_inference(audio_chunk))

-    def __audio_queue_feeder(self): # TODO: Upgrade to multithreaded loader?
-        """Internal thread function. Preprocess and load the audio continuously to
-        audio_queue until the end of the audio_paths_list
-        """
-        while (self.__audio_paths_list): # While there are still Path elements in path list
-            if (not (len(self.__audio_queue) < self.__max_audio_in_queue)):
-                logging.info("[AFE] [Audio Queue Thread]: Queue Full, feeder thread sleeping for 5 seconds")
-                time.sleep(5)
-            while(len(self.__audio_queue) < self.__max_audio_in_queue): # While the audio queue is not full
-                new_audio_path = self.__audio_paths_list[0]
-                new_audio = audiopreprocessing.load_preprocessed_audio(
-                    new_audio_path,
-                    self.__desired_sr,
-                    self.__mono,
-                    self.__chunk_length,
-                    self.__overlap
-                )
-                with self.__queue_lock:
-                    self.__audio_queue.append(
-                        (new_audio, new_audio_path)
-                    )
-                pop_path = self.__audio_paths_list.pop(0)
-                logging.info(f"[AFE] [Audio Queue Thread]: Added new audio to queue {pop_path}")
-        logging.info("[AFE] [Audio Queue Thread]: DONE. All audio files fed")
+#     def __audio_queue_feeder(self): # TODO: Upgrade to multithreaded loader?
+#         """Internal thread function. Preprocesses and loads the audio continuously into
+#         audio_queue until the end of the audio_paths_list
+#         """
+#         while (self.__audio_paths_list): # While there are still Path elements in path list
+#             if (not (len(self.__audio_queue) < self.__max_audio_in_queue)):
+#                 logging.info("[AFE] [Audio Queue Thread]: Queue Full, feeder thread sleeping for 5 seconds")
+#                 time.sleep(5)
+#             while(len(self.__audio_queue) < self.__max_audio_in_queue): # While the audio queue is not full
+#                 new_audio_path = self.__audio_paths_list[0]
+#                 new_audio = audiopreprocessing.load_preprocessed_audio(
+#                     new_audio_path,
+#                     self.__desired_sr,
+#                     self.__mono,
+#                     self.__chunk_length,
+#                     self.__overlap
+#                 )
+#                 with self.__queue_lock:
+#                     self.__audio_queue.append(
+#                         (new_audio, new_audio_path)
+#                     )
+#                 pop_path = self.__audio_paths_list.pop(0)
+#                 logging.info(f"[AFE] [Audio Queue Thread]: Added new audio to queue {pop_path}")
+#         logging.info("[AFE] [Audio Queue Thread]: DONE. All audio files fed")

-    def __audio_queue_feature_extractor(self):
-        """Internal thread function. Get audio from audio queue. And extract embedding vector
-        for all audio chunks. Stores the resulting embedding into self.__features.
-        With Original Audio's Path as key, and list[tuple[np.ndarray, float, int]] (list of tuple of embedding vector, position, channel id)
-        """
-        while (self.__audio_paths_list or self.__audio_queue): # While there are still audio to be processed
-            if (self.__audio_queue): # If audio queue is not empty
-                with self.__queue_lock:
-                    audio_to_process, audio_path = self.__audio_queue.pop(0) # Get audio from queue
-                logging.info(f"[AFE] [Feature Extractor Thread]: Extracting {len(audio_to_process)} features from audio {audio_path}")
-                for audio_chunk in audio_to_process:
-                    same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk)
-                    if (audio_path not in self.__features.keys()):
-                        #if DEBUG: print("Adding new vector to", audio_path.name)
-                        self.__features[audio_path] = [(embedd_vect, timepos, channel_id)]
-                    else:
-                        #if DEBUG: print("Adding vector to", audio_path.name)
-                        self.__features[audio_path].append(
-                            (embedd_vect, timepos, channel_id)
-                        )
-            else:
-                logging.info("[AFE] [Feature Extractor Thread]: Queue Empty, extractor thread sleeping for 5 seconds") # If audio queue is empty, wait
-                time.sleep(5)
-        logging.info("[AFE] [Feature Extractor Thread]: DONE. Extracted all features from all audio files")
+#     def __audio_queue_feature_extractor(self):
+#         """Internal thread function. Gets audio from the audio queue and extracts an embedding vector
+#         for every audio chunk. Stores the resulting embeddings into self.__features,
+#         with the original audio's Path as key and list[tuple[np.ndarray, float, int]] (list of tuples of embedding vector, position, channel id) as value.
+#         """
+#         while (self.__audio_paths_list or self.__audio_queue): # While there are still audio to be processed
+#             if (self.__audio_queue): # If audio queue is not empty
+#                 with self.__queue_lock:
+#                     audio_to_process, audio_path = self.__audio_queue.pop(0) # Get audio from queue
+#                 logging.info(f"[AFE] [Feature Extractor Thread]: Extracting {len(audio_to_process)} features from audio {audio_path}")
+#                 for audio_chunk in audio_to_process:
+#                     same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk)
+#                     if (audio_path not in self.__features.keys()):
+#                         #if DEBUG: print("Adding new vector to", audio_path.name)
+#                         self.__features[audio_path] = [(embedd_vect, timepos, channel_id)]
+#                     else:
+#                         #if DEBUG: print("Adding vector to", audio_path.name)
+#                         self.__features[audio_path].append(
+#                             (embedd_vect, timepos, channel_id)
+#                         )
+#             else:
+#                 logging.info("[AFE] [Feature Extractor Thread]: Queue Empty, extractor thread sleeping for 5 seconds") # If audio queue is empty, wait
+#                 time.sleep(5)
+#         logging.info("[AFE] [Feature Extractor Thread]: DONE. Extracted all features from all audio files")

-    def __init__(
-        self,
-        audio_paths_list: list[Path],
-        max_audio_in_queue: int,
-        desired_sr: int,
-        mono: bool,
-        chunk_length: float = 15.0,
-        overlap: float = 2.0
-    ):
-        self.__audio_queue = []
-        self.__audio_paths_list = audio_paths_list
-        self.__max_audio_in_queue = max_audio_in_queue
-        self.__queue_lock = threading.Lock()
-        self.__desired_sr = desired_sr
-        self.__mono = mono
-        self.__chunk_length = chunk_length
-        self.__overlap = overlap
-        self.__features = {}
+#     def __init__(
+#         self,
+#         audio_paths_list: list[Path],
+#         max_audio_in_queue: int,
+#         desired_sr: int,
+#         mono: bool,
+#         chunk_length: float = 15.0,
+#         overlap: float = 2.0
+#     ):
+#         self.__audio_queue = []
+#         self.__audio_paths_list = audio_paths_list
+#         self.__max_audio_in_queue = max_audio_in_queue
+#         self.__queue_lock = threading.Lock()
+#         self.__desired_sr = desired_sr
+#         self.__mono = mono
+#         self.__chunk_length = chunk_length
+#         self.__overlap = overlap
+#         self.__features = {}

-    @property
-    def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
-        return self.__features
+#     @property
+#     def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
+#         return self.__features

-    def extract(self):
-        print("Starting feature extraction for", len(self.__audio_paths_list), "file(s)")
-        total_amount = len(self.__audio_paths_list)
-        t_start = time.perf_counter()
-        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
-            self.__feeder_future = executor.submit(self.__audio_queue_feeder)
-            self.__extractor_future = executor.submit(self.__audio_queue_feature_extractor)
-            while (self.__feeder_future.running() or self.__extractor_future.running()):
-                print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W{len(self.__audio_paths_list)})", end="\r")
-                time.sleep(1)
+#     def extract(self):
+#         print("Starting feature extraction for", len(self.__audio_paths_list), "file(s)")
+#         total_amount = len(self.__audio_paths_list)
+#         t_start = time.perf_counter()
+#         with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+#             self.__feeder_future = executor.submit(self.__audio_queue_feeder)
+#             self.__extractor_future = executor.submit(self.__audio_queue_feature_extractor)
+#             while (self.__feeder_future.running() or self.__extractor_future.running()):
+#                 print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W{len(self.__audio_paths_list)})", end="\r")
+#                 time.sleep(1)

-        t_stop = time.perf_counter()
-        print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)} COMPLETE)")
-        delta_t = t_stop - t_start
-        total_features = sum( [len(self.__features[path]) for path in self.__features] )
-        print()
-        print("Extraction completed")
-        print(f"Took {delta_t} seconds. Added {total_features} vectors/embeddings")
+#         t_stop = time.perf_counter()
+#         print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)} COMPLETE)")
+#         delta_t = t_stop - t_start
+#         total_features = sum( [len(self.__features[path]) for path in self.__features] )
+#         print()
+#         print("Extraction completed")
+#         print(f"Took {delta_t} seconds. Added {total_features} vectors/embeddings")

 class MultiThreadedAudioFeatureExtractor():
+    # This is the third time I am rewriting this, please send help. Multithreaded apps are pure hell to develop and debug.
+    # After testing: this will hang at the last audio, precisely during audio preprocessing. I suspect the GIL hurts performance
+    # so badly that the preprocessing routine cannot get any share of CPU time (see the note at the end of this patch).
     __audio_queue: queue.Queue[ # List of ...
         tuple[ # Pair of chunked audio and its path
-            list[tuple[np.ndarray, float, int]], # Chunked audio
+            list[tuple[np.ndarray, float, int]], # Chunked audio: list of (ndarray, time position of chunk relative to original audio, channel_id)
             Path # Path to original audio
         ]
     ] # Listed of Chunked/Resampled audio
-    __audio_loader_threads: int # Amount of audio feeder threads
+    __audio_feeder_threads: int # Amount of audio feeder threads
     __feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
     __audio_paths_list: queue.Queue[Path] # Path list to audio
     __max_audio_in_queue: int # Maximum audio in queue
-    # Audio Feeeder parameter
+    __audio_feeder_barrier: threading.Barrier # Synchronization barrier for all audio feeder threads
+    # Audio Feeder parameter
     __desired_sr: int # Desired Sample Rate (Resampling)
     __mono: bool # Force load audio in mono mode
     __chunk_length: float # Audio chunk length
     __overlap: float
@@ -295,9 +300,8 @@ class MultiThreadedAudioFeatureExtractor():
     #   ...
     # }
     # Runtime
-    __audio_loader_threadpool: list[concurrent.futures.Future]
+    __audio_feeder_threadpool: list[concurrent.futures.Future]
     __feature_extractor_threadpool: list[concurrent.futures.Future]
-    __audio_feed_condition: threading.Condition

     def __audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
         """Receives a list of audio chunks, and then extracts embeddings for all audio chunks, returns the resulting embedding as a list of tuples(embedding, time, channel_id)
@@ -313,18 +317,21 @@ class MultiThreadedAudioFeatureExtractor():
             audio, timepos, channel_id = audio_chunk
             zero = np.zeros(32)
             features.append( (zero, timepos, channel_id) )
-            time.sleep(0.01)
+            time.sleep(0.01) # Simulate effort; change this to simulate the seconds spent on each audio file
         return features # To be overridden

-    def __audio_feeder_thread(self, thread_id):
-        # If there is still audio in paths list
-        # Is the audio queue not full?
-        while (not self.__audio_paths_list.empty()):
-            if (not self.__audio_queue.full()):
-                # Feed audio
+    def __audio_feeder_thread(self, thread_id: int, barrier: threading.Barrier):
+        try:
+            while True:
+                # Attempt to get an audio path from the audio path queue
                 new_audio_path = self.__audio_paths_list.get()
-                self.__audio_paths_list.task_done()
+                # Check the thread exit condition (if the queue hands back None, the audio path queue is exhausted and the thread should end itself)
+                if (new_audio_path is None):
+                    self.__audio_paths_list.put(new_audio_path) # Put the None back to notify the other audio feeder threads
+                    # You are already dead
+                    break # If ETSISI sees this, they will surely kick me out of school
+                # Now that we have an audio path, try preprocessing the audio
                 logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
                 new_audio = audiopreprocessing.load_preprocessed_audio(
                     new_audio_path,
@@ -333,17 +340,34 @@ class MultiThreadedAudioFeatureExtractor():
                     self.__chunk_length,
                     self.__overlap
                 )
-                self.__audio_queue.put((new_audio, new_audio_path))
-                #self.__audio_queue.task_done()
-                #with self.__audio_feed_condition: self.__audio_feed_condition.notify_all()
+                self.__audio_queue.put((new_audio, new_audio_path)) # In theory, this should block this audio feeder thread when the audio queue is full
                 logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
-            #else:
-            #    logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Audio queue full ({self.__audio_queue.qsize()} <= {self.__max_audio_in_queue} FALSE): waiting")
-            #    with self.__audio_feed_condition:
-            #        logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Audio queue full: waiting")
-            #        self.__audio_feed_condition.wait_for(lambda: not self.__audio_queue.qsize() <= self.__max_audio_in_queue) # This consumes way too much CPU power
-            #        self.__audio_feed_condition.wait(10)
-        logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
+            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for the other threads to finish")
+            barrier.wait()
+            if (thread_id == 0):
+                self.__audio_queue.put(None) # None to signal that audio_queue has no more elements to process
+            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
+        except Exception as e:
+            logging.error(f"[MTAFE] [Audio Feeder {thread_id}] An exception occurred! Committing seppuku!")
+            logging.exception(e)
+            return
+
+        # while (not self.__audio_paths_list.empty()):
+        #     if (not self.__audio_queue.full()):
+        #         # Feed audio
+        #         new_audio_path = self.__audio_paths_list.get()
+        #         self.__audio_paths_list.task_done()
+        #         logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
+        #         new_audio = audiopreprocessing.load_preprocessed_audio(
+        #             new_audio_path,
+        #             self.__desired_sr,
+        #             self.__mono,
+        #             self.__chunk_length,
+        #             self.__overlap
+        #         )
+        #         self.__audio_queue.put((new_audio, new_audio_path))
+        #         logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
+        #     logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")

     #def testfeedthread(self, nthreads):
     #    t1 = threading.Thread(target=self.__audio_feeder_thread, args=(1,))
     #    t2 = threading.Thread(target=self.__audio_feeder_thread, args=(2,))
     #    t1.start()
     #    t2.start()
     #    t1.join()
     #    t2.join()
     #    for i in range(nthreads):
     #        ft = self.__threadpool_executor.submit(self.__audio_feeder_thread, i)
     #        self.__audio_loader_threadpool.append(ft)

@@ -358,7 +382,7 @@ class MultiThreadedAudioFeatureExtractor():
     def __check_all_audiofeed_thread_finished(self) -> bool:
-        for ft in self.__audio_loader_threadpool:
+        for ft in self.__audio_feeder_threadpool:
             if ft.running(): return False
         return True
@@ -370,17 +394,33 @@ class MultiThreadedAudioFeatureExtractor():
         return True

     def __feature_extractor_thread(self, thread_id):
-        while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
-            if (not self.__audio_queue.empty()):
-                audio_to_process, audio_path = self.__audio_queue.get()
-                self.__audio_queue.task_done()
-                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
-                features_to_add = self.__audio_inference_embedding(audio_to_process)
-                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
-                with self.__features_lock:
-                    self.__features[audio_path] = features_to_add
-                #with self.__audio_feed_condition: self.__audio_feed_condition.notify_all()
-                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {audio_path} w/ {len(features_to_add)} features")
+        while True:
+            # Attempt to get the next audio chunks to process
+            next_audio_tuple = self.__audio_queue.get()
+            # Check the thread exit condition
+            if (next_audio_tuple is None):
+                self.__audio_queue.put(next_audio_tuple) # Put the None back to notify the other threads
+                break # unalive urself
+            else: # Assuming we got more tuples
+                current_audio_to_process, current_audio_path = next_audio_tuple # Deconstruct the tuple
+                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}")
+                features_to_add = self.__audio_inference_embedding(current_audio_to_process)
+                with self.__features_lock:
+                    self.__features[current_audio_path] = features_to_add
+                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {current_audio_path} w/ {len(features_to_add)} features")
+        logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
+
+        # while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
+        #     if (not self.__audio_queue.empty()):
+        #         audio_to_process, audio_path = self.__audio_queue.get()
+        #         self.__audio_queue.task_done()
+        #         logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
+        #         features_to_add = self.__audio_inference_embedding(audio_to_process)
+        #         logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
+        #         with self.__features_lock:
+        #             self.__features[audio_path] = features_to_add
+        #         #with self.__audio_feed_condition: self.__audio_feed_condition.notify_all()
+        #         logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {audio_path} w/ {len(features_to_add)} features")
         #else:
         #    if (not self.__check_all_audiofeed_thread_finished()):
         #        with self.__audio_feed_condition:
         #            logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Audio queue empty: waiting")
         #            self.__audio_feed_condition.wait(10)
         #            self.__audio_feed_condition.wait_for(lambda: not self.__audio_queue.empty())

@@ -388,14 +428,13 @@ class MultiThreadedAudioFeatureExtractor():
-        logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")

     def __count_running_threads(self) -> tuple[int, int]:
         running_extractors = 0
         running_feeders = 0
         for ft in self.__feature_extractor_threadpool:
             if ft.running(): running_extractors += 1
-        for ft in self.__audio_loader_threadpool:
+        for ft in self.__audio_feeder_threadpool:
             if ft.running(): running_feeders += 1
         return (running_feeders, running_extractors)

@@ -404,21 +443,26 @@ class MultiThreadedAudioFeatureExtractor():
         return self.__features

     def extract(self):
-        total_amount = self.__audio_paths_list.qsize()
+        total_amount = self.__audio_paths_list.qsize() - 1 # Account for the None that marks the end of the queue
         logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
-        t_start = time.perf_counter()
-        with concurrent.futures.ThreadPoolExecutor(max_workers=(self.__audio_loader_threads + self.__feature_extractor_threads)) as executor:
-            for i in range(self.__audio_loader_threads):
-                ld_ft = executor.submit(self.__audio_feeder_thread, i)
-                self.__audio_loader_threadpool.append(ld_ft)
+        t_start = time.perf_counter() # Timer
+        with concurrent.futures.ProcessPoolExecutor(max_workers=(self.__audio_feeder_threads + self.__feature_extractor_threads)) as executor:
+            # Audio feeder threads
+            for i in range(self.__audio_feeder_threads):
+                logging.info(f"[MTAFE] Started audio feeder thread {i}")
+                ld_ft = executor.submit(self.__audio_feeder_thread, i, self.__audio_feeder_barrier)
+                self.__audio_feeder_threadpool.append(ld_ft)
+            # Feature extractor threads
             for i in range(self.__feature_extractor_threads):
-                ld_ft = executor.submit(self.__feature_extractor_thread, i)
-                self.__feature_extractor_threadpool.append(ld_ft)
+                logging.info(f"[MTAFE] Started feature extractor thread {i}")
+                ex_ft = executor.submit(self.__feature_extractor_thread, i)
+                self.__feature_extractor_threadpool.append(ex_ft)
+            # Progress checking
             while ( (not self.__check_all_audiofeed_thread_finished()) and (not self.__check_all_featureextractor_thread_finished()) ):
                 nfeeder, nextract = self.__count_running_threads()
                 print(f"[MTAFE Progress] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")

         t_stop = time.perf_counter()
-        logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()} COMPLETE)")
+        logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize() - 1}/W:{self.__audio_paths_list.qsize() - 1} COMPLETE)")
         delta_t = t_stop - t_start
         total_features = sum( [len(self.__features[path]) for path in self.__features] )
         logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")
@@ -435,15 +479,15 @@ class MultiThreadedAudioFeatureExtractor():
         chunk_overlap: float = 2.0,
     ):
         # Check if the paths passed in are all valid and add them to queue
-        self.__audio_paths_list = queue.Queue()
+        self.__audio_paths_list = multiprocessing.Queue()
         for p in audio_paths:
             if not p.is_file():
                 raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
             else:
                 self.__audio_paths_list.put(p)
-                #self.__audio_paths_list.task_done()
+        self.__audio_paths_list.put(None) # To signal to the feeder threads (the consumers of this queue) that the audio path list is exhausted, since Queue.empty() is unreliable

-        logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize()} files")
+        logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize() - 1} files")

         # Set up private attributes
         ## Audio preprocessing parameters
@@ -454,16 +498,16 @@ class MultiThreadedAudioFeatureExtractor():

         ## Extractor/Feeder settings
         self.__max_audio_in_queue = max_audio_in_queue
-        self.__audio_loader_threads = audio_feeder_threads
+        self.__audio_feeder_threads = audio_feeder_threads
         self.__feature_extractor_threads = feature_extractor_threads

         ## Set up runtime conditions
-        self.__audio_queue = queue.Queue()
+        self.__audio_queue = multiprocessing.Queue(maxsize=self.__max_audio_in_queue)
         self.__features = {}
-        self.__features_lock = threading.Lock()
-        self.__audio_loader_threadpool = []
+        self.__features_lock = multiprocessing.Lock()
+        self.__audio_feeder_barrier = multiprocessing.Barrier(self.__audio_feeder_threads)
+        self.__audio_feeder_threadpool = []
         self.__feature_extractor_threadpool = []
-        self.__audio_feed_condition = threading.Condition()

         logging.info(f"[MTAFE] [Constructor] Extraction parameters: {desired_sr}Hz, Mono: {force_mono}, Divide into {chunk_length}s chunks with {chunk_overlap}s of overlap")
         logging.info(f"[MTAFE] [Constructor] Using {audio_feeder_threads} threads for preprocessing audio and {feature_extractor_threads} threads for feature extraction. Max queue size of {max_audio_in_queue} files")
diff --git a/FeatureExtraction/mtafe.py b/FeatureExtraction/mtafe.py
new file mode 100644
index 0000000..f3696c1
--- /dev/null
+++ b/FeatureExtraction/mtafe.py
@@ -0,0 +1,8 @@
+import dataset_files
+import multiprocessing
+import logging
+import numpy as np
+import threading
+import queue
+from pathlib import Path
+
diff --git a/FeatureExtraction/test.py b/FeatureExtraction/test.py
index cd2c877..497f7d5 100644
--- a/FeatureExtraction/test.py
+++ b/FeatureExtraction/test.py
@@ -5,12 +5,13 @@ logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s"
 from dataset_files import MultiThreadedAudioFeatureExtractor, random_audio_chunk

 mtafe = MultiThreadedAudioFeatureExtractor(
-    audio_paths=random_audio_chunk(200),
+    audio_paths=random_audio_chunk(8),
     max_audio_in_queue=8,
     audio_feeder_threads=8,
     feature_extractor_threads=1,
     desired_sr=32000,
     force_mono=False,
     chunk_length=15,
-    chunk_overlap=2)
+    chunk_overlap=2
+)

 mtafe.extract()
diff --git a/FeatureExtraction/test_mtafe.py b/FeatureExtraction/test_mtafe.py
new file mode 100644
index 0000000..daec5fc
--- /dev/null
+++ b/FeatureExtraction/test_mtafe.py
@@ -0,0 +1,17 @@
+#import mtafe
+import logging
+#import dataset_files
+logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.DEBUG)
+
+logging.info("Running tests")
+    # m = mtafe.mtafe(
+    #     audio_paths=dataset_files.random_audio_chunk(2),
+    #     max_audio_in_queue=8,
+    #     audio_feeder_threads=8,
+    #     feature_extractor_threads=1,
+    #     desired_sr=32000,
+    #     force_mono=False,
+    #     chunk_length=15,
+    #     chunk_overlap=2
+    # )
+    # m.run()
\ No newline at end of file
diff --git a/mtafe_lab/audiopreprocessing.py b/mtafe_lab/audiopreprocessing.py
new file mode 100644
index 0000000..a548612
--- /dev/null
+++ b/mtafe_lab/audiopreprocessing.py
@@ -0,0 +1,95 @@
+import librosa
+import pickle
+import os
+import numpy as np
+from pathlib import Path
+import logging
+
+logger = logging.getLogger(__name__)
+
+def triggerlog():
+    logger.critical("Testing: info")
+
+def resample_load(input_path : Path, target_sr : int = 16000, mono_audio : bool = False) -> np.ndarray: # AI
+    """Loads the audio and resamples it to `target_sr`.
+
+    Args:
+        input_path (Path): pathlib.Path object to audio file
+        target_sr (int, optional): Target Sample Rate to resample. Defaults to 16000.
+        mono_audio (bool, optional): Load the audio in mono mode. Defaults to False.
+
+    Returns:
+        np.ndarray: Audio time series, resampled to `target_sr`
+    """
+    # Load audio file with original sample rate
+    logger.info(f"[resample_load] Loading audio {input_path}")
+    audio, orig_sr = librosa.load(input_path, sr=None, mono=mono_audio)
+
+    # Resample if necessary
+    if orig_sr != target_sr:
+        logger.info(f"[resample_load] Resampling to {target_sr}")
+        audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
+
+    return audio
+
+def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap: float = 2.0) -> tuple[list[np.ndarray], list[float], int]: # AI
+    """
+    Chunks an audio file into overlapping segments. Only pass in mono audio here.
+
+    Args:
+        audio: Loaded audio ndarray (one channel only)
+        sr: Sample rate for the given audio file
+        chunk_length: Length of each chunk in seconds
+        overlap: Overlap between chunks in seconds
+
+    Returns:
+        List of audio chunks, list of chunk positions, and the given sample rate
+    """
+    logger.info(f"[chunk_audio] Chunking audio ({len(audio) / sr}s)")
+    # Calculate chunk size and hop length in samples
+    chunk_size = int(chunk_length * sr)
+    hop_length = int((chunk_length - overlap) * sr)
+
+    # Generate chunks
+    chunks = []
+    positions = []
+    k = 0
+    for i in range(0, len(audio) - chunk_size + 1, hop_length):
+        chunk = audio[i:i + chunk_size]
+        chunks.append(chunk)
+        positions.append(i / sr)
+        k += 1
+    if k == 0: # The full audio length is less than chunk_length
+        chunks = [audio]
+        positions = [0.0]
+        logger.info("[chunk_audio] Audio is shorter than chunk_length. Returning the original audio as a single chunk")
+    else:
+        logger.info(f"[chunk_audio] Audio is split into {k} chunks")
+
+    return chunks, positions, sr
+
+def load_preprocessed_audio(
+        path: Path,
+        desired_sr: int,
+        mono: bool = False,
+        chunk_length: float = 15.0,
+        overlap: float = 2.0) -> list[tuple[np.ndarray, float, int]]:
+
+    result = []
+    # Load and resample audio
+    audio = resample_load(path, desired_sr, mono) # Stereo: 2D matrix, Mono: 1D array
+    if mono or (audio.ndim == 1):
+        # Chunk audio: mono (or the audio file loaded is itself mono)
+        chunks, positions, _ = chunk_audio(audio, desired_sr, chunk_length, overlap)
+        assert len(chunks) == len(positions)
+        result.extend(zip(chunks, positions, [-1 for _ in range(len(chunks))]))
+        # (ndarray_chunk1, pos1, -1): first audio chunk, position1, -1 (Mono channel indicator)
+    else:
+        # Chunk audio: stereo/multichannel
+        for channel_id, channel_audio in enumerate(audio):
+            chunks, positions, _ = chunk_audio(channel_audio, desired_sr, chunk_length, overlap)
+            assert len(chunks) == len(positions)
+            result.extend(zip(chunks, positions, [channel_id for _ in range(len(chunks))]))
+            # (ndarray_chunk1, pos1, 0): first audio chunk, position1, 0 (channel 0)
+    logger.info(f"[load_preprocessed_audio] Loaded audio {path} ({desired_sr}Hz, Chunk {chunk_length}s with overlap {overlap}s) MONO:{mono}")
+    return result
\ No newline at end of file
diff --git a/mtafe_lab/dataset.py b/mtafe_lab/dataset.py
new file mode 100644
index 0000000..83e4abd
--- /dev/null
+++ b/mtafe_lab/dataset.py
@@ -0,0 +1,135 @@
+import platform
+import os
+import pickle
+import random
+import multiprocessing
+import threading
+import time
+import concurrent.futures
+import numpy as np
+from pathlib import Path
+import audiopreprocessing
+import logging
+import queue
+
+def serialize_dict_obj(path : Path, object : dict) -> int:
+    """Serializes a Python dictionary to a file via pickle.
+
+    Args:
+        path (Path): Path to store the file
+        object (dict): Dictionary object to serialize
+
+    Returns:
+        int: size in bytes written
+    """
+    # Horrible practice, horrible security, but it will work for now
+    with path.open("wb") as fp:
+        pickle.dump(object, fp)
+        fp.seek(0, os.SEEK_END)
+        size = fp.tell()
+    return size
+
+logging.info("Reading local dataset directory structure...")
+
+ASMRThreePath = Path("C:\\ASMRThree")
+ASMRTwoPath = Path("D:\\ASMRTwo")
+ASMROnePath = Path("E:\\ASMROne")
+
+if (platform.system() == 'Linux'):
+    ASMROnePath = Path('/mnt/Scratchpad/ASMROne')
+    ASMRTwoPath = Path('/mnt/MyStuffz/ASMRTwo')
+    ASMRThreePath = Path('/mnt/Windows11/ASMRThree')
+
+size_one, size_two, size_three = 0, 0, 0
+files_one, files_two, files_three = [], [], []
+folders_one, folders_two, folders_three = [], [], []
+
+# Statistics calculation for ASMROne
+for root, dirs, files in ASMROnePath.walk(): # Root will iterate through all folders
+    if root.absolute() != ASMROnePath.absolute(): # Skip root of ASMROnePath
+        folders_one.append(root) # Add folder to list
+        for fname in files: # Iterate through all files in current root
+            file = root/fname # Get file path
+            assert file.is_file()
+            files_one.append(file)
+            size_one += file.stat().st_size # Get file size
+
+# Statistics calculation for ASMRTwo
+for root, dirs, files in ASMRTwoPath.walk(): # Root will iterate through all folders
+    if root.absolute() != ASMRTwoPath.absolute(): # Skip root of ASMRTwoPath
+        folders_two.append(root) # Add folder to list
+        for fname in files: # Iterate through all files in current root
+            file = root/fname # Get file path
+            assert file.is_file()
+            files_two.append(file)
+            size_two += file.stat().st_size # Get file size
+
+# Statistics calculation for ASMRThree
+for root, dirs, files in ASMRThreePath.walk(): # Root will iterate through all folders
+    if root.absolute() != ASMRThreePath.absolute(): # Skip root of ASMRThreePath
+        folders_three.append(root) # Add folder to list
+        for fname in files: # Iterate through all files in current root
+            file = root/fname # Get file path
+            assert file.is_file()
+            files_three.append(file)
+            size_three += file.stat().st_size # Get file size
+
+DataSubsetPaths = [ASMROnePath, ASMRTwoPath, ASMRThreePath]
+DLSiteWorksPaths = []
+# Collect ASMR Works (RJ ID, Paths)
+for ASMRSubsetPath in DataSubsetPaths:
+    for WorkPaths in ASMRSubsetPath.iterdir():
+        DLSiteWorksPaths.append(WorkPaths)
+
+fileExt2fileType = {
+    ".TXT": "Document",
+    ".WAV": "Audio",
+    ".MP3": "Audio",
+    ".PNG": "Image",
+    ".JPG": "Image",
+    ".VTT": "Subtitle",
+    ".PDF": "Document",
+    ".FLAC": "Audio",
+    ".MP4": "Video",
+    ".LRC": "Subtitle",
+    ".SRT": "Subtitle",
+    ".JPEG": "Image",
+    ".ASS": "Subtitle",
+    "": "NO EXTENSION",
+    ".M4A": "Audio",
+    ".MKV": "Video"
+}
+fileext_stat = {}
+file_list = files_one + files_two + files_three
+file_list_count = len(file_list)
+
+for file in file_list:
+    f_ext = file.suffix.upper()
+    if (f_ext in fileext_stat.keys()):
+        fileext_stat[f_ext]['Count'] += 1
+        fileext_stat[f_ext]['List'].append(file)
+        fileext_stat[f_ext]['ExtensionMass'] += file.stat().st_size
+    else:
+        fileext_stat[f_ext] = {}
+        fileext_stat[f_ext]['Count'] = 1
+        fileext_stat[f_ext]['List'] = [file]
+        fileext_stat[f_ext]['ExtensionMass'] = file.stat().st_size # The total sum of sizes of the same file extension
+        fileext_stat[f_ext]['MediaType'] = fileExt2fileType[f_ext]
+
+audio_paths = []
+for extension in fileext_stat: # I can't be bothered to convert this into a list comprehension
+    if fileext_stat[extension]['MediaType'] == "Audio":
+        audio_paths += fileext_stat[extension]['List']
+
+def random_audio_chunk(n : int, seed : int = 177013) -> list[Path]:
+    """Returns a random selection of audio files
+
+    Args:
+        n (int): Amount of files to return
+        seed (int, optional): Seed for RNG. Defaults to 177013.
+
+    Returns:
+        list[Path]: List of randomly selected audio paths (using Path objects)
+    """
+    random.seed(seed)
+    #return random.choices(audio_paths, k=n) # Contains repeated elements
+    return random.sample(audio_paths, k=n)
\ No newline at end of file
diff --git a/mtafe_lab/mtafe.py b/mtafe_lab/mtafe.py
new file mode 100644
index 0000000..ad4667d
--- /dev/null
+++ b/mtafe_lab/mtafe.py
@@ -0,0 +1,32 @@
+import logging
+logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
+
+import multiprocessing
+import multiprocessing.process
+import dataset
+import audiopreprocessing
+from pathlib import Path
+
+def copy_worker(origin_queue, target_queue):
+    p = origin_queue.get()
+    logging.info(f"Processing: {p}")
+    l = audiopreprocessing.load_preprocessed_audio(p, 32000, True)
+    print("Preprocess complete, putting it into queue")
+    target_queue.put(l) # Even on a small-scale test, the process will always hang here (see the note at the end of this patch)
+
+if __name__ == "__main__":
+    audio_path_queue = multiprocessing.Queue()
+    audio_queue = multiprocessing.Queue()
+
+    rand_paths = dataset.random_audio_chunk(1)
+    for p in rand_paths:
+        audio_path_queue.put(p)
+
+    print("Files queued")
+
+    processes = [multiprocessing.Process(target=copy_worker, args=(audio_path_queue, audio_queue)) for _ in range(1)]
+    for p in processes: p.start()
+    for p in processes: p.join()
+
+    print("Joined")
+    #for _ in range(1): print(audio_queue.get())
\ No newline at end of file
diff --git a/mtafe_lab/test_mp.py b/mtafe_lab/test_mp.py
new file mode 100644
index 0000000..f6c5f20
--- /dev/null
+++ b/mtafe_lab/test_mp.py
@@ -0,0 +1,30 @@
+import logging
+logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
+
+import multiprocessing
+from dataset import random_audio_chunk
+import audiopreprocessing
+from time import sleep
+
+origin_queue = multiprocessing.Queue()
+target_queue = multiprocessing.Queue()
+
+def worker(orig, targ):
+    p = orig.get()
+    #out = "PROCESSED" + str(p.absolute())
+    out = audiopreprocessing.load_preprocessed_audio(p, 16000, True) # This will cause put to hang
+    targ.put(out) # This will hang the process (see the note at the end of this patch)
+
+if __name__ == "__main__":
+    K = 2
+
+    for p in random_audio_chunk(K):
+        origin_queue.put(p)
+
+    processes = [multiprocessing.Process(target=worker, args=(origin_queue, target_queue)) for _ in range(K)]
+    for p in processes: p.start()
+    for p in processes: p.join()
+
+    logging.critical("Successfully terminated all processes")
+
+    for _ in range(K): print(target_queue.get())
\ No newline at end of file
diff --git a/mtafe_lab/test_mtafe.py b/mtafe_lab/test_mtafe.py
new file mode 100644
index 0000000..e77eb52
--- /dev/null
+++ b/mtafe_lab/test_mtafe.py
@@ -0,0 +1,21 @@
+import logging
+logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
+
+import mtafe
+from dataset import random_audio_chunk
+
+logging.info("Generating random audio path list")
+rdpl = random_audio_chunk(2)
+
+logging.info("Initializing MTAFE")
+mtafe.initialize_parameters(
+    paudio_paths=rdpl,
+    pmax_audio_in_queue=4,
+    paudio_feeder_threads=2,
+    pfeature_extractor_threads=1,
+    pdesired_sr=32000,
+    pforce_mono=False,
+    pchunk_length=15,
+    pchunk_overlap=2
+)
+mtafe.test_feeder()
\ No newline at end of file
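
---
Note on the hang (a hypothesis based on documented CPython multiprocessing behavior, not yet verified against this repo):

1. multiprocessing.Queue hands items to a hidden feeder thread that pickles
   them and writes them to an OS pipe. The Python docs ("Pipes and Queues",
   "Joining processes that use queues") warn that a child process which has
   put items on a queue will not terminate until all buffered items have been
   flushed to the pipe and consumed. mtafe_lab/mtafe.py and mtafe_lab/test_mp.py
   call join() before draining target_queue, which deadlocks as soon as the
   pickled chunk list exceeds the pipe buffer. put() itself returns quickly;
   the block happens at process exit, which is why it looks like the worker
   is stuck "at" put().

2. ProcessPoolExecutor pickles every submitted callable. extract() submits
   bound methods of MultiThreadedAudioFeatureExtractor, which drags self along,
   and self now holds multiprocessing Queue/Lock/Barrier objects; those refuse
   to be pickled ("... should only be shared between processes through
   inheritance"). Since extract() never calls .result() on the futures, the
   error is likely swallowed and the pool just looks idle.

A minimal sketch of a fix for the first problem (drain before join). The
worker below uses a throwaway NumPy payload in place of
load_preprocessed_audio(), so the names and sizes here are illustrative only:

import multiprocessing
import numpy as np

def worker(orig_q, targ_q):
    # Stand-in for the real preprocessing; the payload is deliberately
    # bigger than a pipe buffer, like a real chunked-audio list would be.
    path = orig_q.get()
    targ_q.put((path, np.zeros((8, 32000 * 15), dtype=np.float32)))

if __name__ == "__main__":
    orig_q = multiprocessing.Queue()
    targ_q = multiprocessing.Queue()
    for name in ["a.wav", "b.wav"]:
        orig_q.put(name)
    procs = [multiprocessing.Process(target=worker, args=(orig_q, targ_q))
             for _ in range(2)]
    for p in procs:
        p.start()
    # Drain BEFORE join: a child cannot exit while its queue feeder thread
    # still holds undelivered items, so join()-before-get() deadlocks.
    results = [targ_q.get() for _ in range(2)]
    for p in procs:
        p.join()
    print([(name, arr.shape) for name, arr in results])

With the same reordering in test_mp.py (get() the K results first, then
join()), the workers should exit cleanly instead of hanging.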