Compare commits


3 Commits

SHA1 Message Date
b14a0a2a17 I am giving up on MTAFE 2025-04-19 20:05:35 +02:00
37b6a3c5e7 I'm burnt out, I can't get multithreaded audio feature extractor to work :( 2025-04-19 17:47:09 +02:00
b855b7e255 processing framework 2025-04-18 21:07:16 +02:00
15 changed files with 9292 additions and 10789 deletions

View File

@@ -0,0 +1,15 @@
My implementation attempt at a Multi-Threaded Audio Feature Extractor (MTAFE)... an attempt that ended in misery.
My vision is a multi-threaded program that does audio pre-processing and feature extraction in different threads. There should be `i` threads doing pre-processing on all the given audio file paths, and `j` threads doing feature extraction. If the audio pre-processing pipeline is single-threaded, it poses a bottleneck for the entire program. But the feature extractor itself is also a bottleneck: since all the audio embedding extractors rely on GPU inference, the feature extraction process must stay single-threaded on my computer.
I was trying to adapt the program to use multiple threads for audio pre-processing AND multiple threads for feature extraction (for a beefier GPU that can handle more inference threads).
Unfortunately... all my attempts have ended in misery; my multi-threaded code is littered with performance issues and deadlocks. Python isn't exactly the best language for multi-threaded code, due to the existence of the GIL. I am trying to implement a multi-producer, multi-consumer model here. The best attempt I managed will hang for a long time waiting for the producers (audio feeders) to pre-process the audio and put it on the shared queue. It locks up for a really long time, but after that it processes everything at light speed. Near the end, though, there is a great chance that the program deadlocks itself. I wasn't able to debug it, and the profiler didn't yield any results that were useful to me.
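For the record, the shape I keep trying to build is the classic bounded-queue-plus-sentinel pattern. A minimal sketch of it (with stand-in `preprocess`/`extract` functions and made-up file names, not my actual code):

    import queue
    import threading

    def preprocess(path):  # stand-in for the real resample/chunk step
        return f"chunks of {path}"

    def extract(chunks):  # stand-in for GPU embedding inference
        print("extracted", chunks)

    path_q = queue.Queue()
    audio_q = queue.Queue(maxsize=8)  # bounded, so feeders block instead of eating RAM

    def feeder():
        while True:
            p = path_q.get()
            if p is None:          # sentinel: put it back so sibling feeders stop too
                path_q.put(None)
                break
            audio_q.put(preprocess(p))

    def extractor():
        while True:
            a = audio_q.get()
            if a is None:
                audio_q.put(None)
                break
            extract(a)

    for p in ["a.wav", "b.wav"]:
        path_q.put(p)
    path_q.put(None)
    feeders = [threading.Thread(target=feeder) for _ in range(2)]
    extractors = [threading.Thread(target=extractor) for _ in range(1)]
    for t in feeders + extractors:
        t.start()
    for t in feeders:
        t.join()
    audio_q.put(None)  # only after ALL feeders are done, otherwise extractors quit early
    for t in extractors:
        t.join()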
At one point I even relied on AI, and I still wasn't getting consistent results. The AI generated code that was significantly faster and deadlocked less, but it had the issue of skipping audio files when they were not pre-processed in time. I could implement additional logic to catch processing errors and retry where possible, but I am really burnt out, and I would rather look for better alternatives.
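If I ever come back to this, the catch-and-retry part would just be a small wrapper along these lines (a sketch; `load_one` stands in for whatever preprocessing call keeps failing):

    import logging

    def with_retry(load_one, path, attempts=3):
        for k in range(attempts):
            try:
                return load_one(path)
            except Exception:
                logging.exception(f"Attempt {k + 1}/{attempts} failed for {path}")
        return None  # caller decides whether to skip the file or re-queue it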
The next thing I am going to try is to separate this program in two, since the current program attempts to do pre-processing AND feature extraction at the same time. One program (preferably multi-threaded) would do all the audio pre-processing (resampling, chunking, etc.) and write the pre-processed audio out to a serialized pickle file, or some other serialization format.
I can see various issues with this approach, the most important of which is space: I am basically taking all of those audio files (which is NOT a small amount) and re-encoding them without any compression. Even though I have decided to lower the audio's sample rate (from the typical 48000 Hz or 192000 Hz down to just 32000 Hz, or for specific embedding extraction models 8000 Hz or 16000 Hz), this will still take up a lot of space: raw float32 mono at 32000 Hz is 32000 × 4 bytes ≈ 128 kB per second, roughly 7.7 MB per minute per channel.
Pickle also isn't the best format for storing all of that audio; safety is one issue. The alternative, encoding each chunk into a FLAC/MP3-compressed file, would be very heavy on the file system. Even though I do have an SSD, I am uncertain whether a filesystem handling hundreds of thousands of audio chunk files will take a performance hit, or what it does to the life of the SSD.
But at least this will be a lot easier to implement.
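Stage one would look roughly like this (a sketch; the output layout and names are not final, `load_preprocessed_audio` is my existing helper from audiopreprocessing.py):

    import pickle
    from pathlib import Path
    from audiopreprocessing import load_preprocessed_audio

    def preprocess_to_file(audio_path: Path, out_dir: Path) -> Path:
        chunks = load_preprocessed_audio(audio_path, desired_sr=32000, mono=False,
                                         chunk_length=15.0, overlap=2.0)
        out_file = out_dir / (audio_path.stem + ".pkl")
        with out_file.open("wb") as fp:
            pickle.dump(chunks, fp)  # same pickle caveats as above
        return out_file

Stage two (the extractor) would then just iterate over the pickle files with a single GPU inference thread.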

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -3,18 +3,31 @@ import pickle
import os
import numpy as np
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def triggerlog():
logger.critical("Testing: info")
def resample_load(input_path : Path, target_sr : int = 16000, mono_audio : bool = False) -> np.ndarray: # AI
"""Loads and resamples the audio to `target_sr`.
Args:
input_path (Path): pathlib.Path object to audio file
target_sr (int, optional): Target Sample Rate to resample. Defaults to 16000.
mono_audio (bool, optional): Load the audio in mono mode. Defaults to False.
Returns:
np.ndarray: The loaded audio, resampled to `target_sr`.
"""
# Load audio file with original sample rate
logger.info(f"[resample_load] Loading audio {input_path}")
audio, orig_sr = librosa.load(input_path, sr=None, mono=mono_audio)
# Resample if necessary
if orig_sr != target_sr:
logger.info(f"[resample_load] Resampling to {target_sr}")
audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
return audio
@@ -24,7 +37,7 @@ def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap
Chunks audio file into overlapping segments. Only pass in mono audio here.
Args:
audio_file: Loaded audio ndarray (one channel only)
sr: Sample rate for the given audio file
chunk_length: Length of each chunk in seconds
overlap: Overlap between chunks in seconds
@@ -32,7 +45,7 @@ def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap
Returns:
List of audio chunks, list of chunk positions, and given sample rate
"""
logger.info(f"[chunk_audio] Chunking audio ({len(audio) / sr}s)")
# Calculate chunk size and hop length in samples
chunk_size = int(chunk_length * sr)
hop_length = int((chunk_length - overlap) * sr)
@@ -46,10 +59,12 @@ def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap
chunks.append(chunk)
positions.append(i / sr)
k += 1
if k == 0: # The full audio length is less than chunk_length
chunks = [audio]
positions = [0.0]
logger.info("[chunk_audio] Audio less than chunk_length. Returning original audio as chunk")
else:
logger.info(f"[chunk_audio] Audio is split into {k} chunks")
return chunks, positions, sr

View File

@@ -2,14 +2,15 @@ import platform
import os
import pickle
import random
import multiprocessing
import threading
import time
import concurrent.futures
import numpy as np
from pathlib import Path
import audiopreprocessing
import logging
import queue
def serialize_dict_obj(path : Path, object : dict) -> int:
"""Serializes Python Dictionary object to a file via Pickle.
@@ -27,7 +28,7 @@ def serialize_dict_obj(path : Path, object : dict) -> int:
size = fp.tell()
return size
logging.info("Reading local dataset directory structure...")
ASMRThreePath = Path("C:\\ASMRThree")
ASMRTwoPath = Path("D:\\ASMRTwo")
@@ -133,61 +134,205 @@ def random_audio_chunk(n : int, seed : int = 177013) -> list[Path]:
#return random.choices(audio_paths, k=n) # Contains repeated elements
return random.sample(audio_paths, k=n)
# class AudioFeatureExtractor():
# __audio_queue: list[ # List of ...
# tuple[ # Pair of chunked audio and its path
# list[tuple[np.ndarray, float, int]], # Chunked audio
# Path # Path to original audio
# ]
# ] # Listed of Chunked/Resampled audio
# __feeder_future: concurrent.futures.Future
# __extractor_future: concurrent.futures.Future
# __audio_paths_list: list[Path]
# __max_audio_in_queue: int
# __queue_lock: threading.Lock
# __desired_sr: int
# __mono: bool
# __chunk_length: float
# __overlap: float
# __features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
# # { audioPath:
# # [(embedding, pos, channel)...]
# # }
# def __embedding_inference(self, audio_ndarray: np.ndarray) -> np.ndarray:
# """Uses embedding model to inference an audio. Returns embedding vectors.
# Function to be overrided. Returns np.zeros(32).
# Args:
# audio_ndarray (np.ndarray):
# Returns:
# np.ndarray: _description_
# """
# return np.zeros(32)
# def __embedding_extract(self, audio: tuple[np.ndarray, float, int]) -> tuple[np.ndarray, float, int, np.ndarray]:
# """Receives a tuple of audio, position, and channel ID, then adding the embedding to the tuple
# Args:
# audio (tuple[np.ndarray, float, int]): tuple of audio, position, channel id
# Returns:
# tuple[np.ndarray, float, int, np.ndarray]: audio, position, channel id, embedding vector
# """
# audio_chunk, pos, channel_id = audio
# return (audio_chunk, pos, channel_id, self.__embedding_inference(audio_chunk))
# def __audio_queue_feeder(self): # TODO: Upgrade to multithreaded loader?
# """Internal thread function. Preprocess and load the audio continuously to
# audio_queue until the end of the audio_paths_list
# """
# while (self.__audio_paths_list): # While there are still Path elements in path list
# if (not (len(self.__audio_queue) < self.__max_audio_in_queue)):
# logging.info("[AFE] [Audio Queue Thread]: Queue Full, feeder thread sleeping for 5 seconds")
# time.sleep(5)
# while(len(self.__audio_queue) < self.__max_audio_in_queue): # While the audio queue is not full
# new_audio_path = self.__audio_paths_list[0]
# new_audio = audiopreprocessing.load_preprocessed_audio(
# new_audio_path,
# self.__desired_sr,
# self.__mono,
# self.__chunk_length,
# self.__overlap
# )
# with self.__queue_lock:
# self.__audio_queue.append(
# (new_audio, new_audio_path)
# )
# pop_path = self.__audio_paths_list.pop(0)
# logging.info(f"[AFE] [Audio Queue Thread]: Added new audio to queue {pop_path}")
# logging.info("[AFE] [Audio Queue Thread]: DONE. All audio files fed")
# def __audio_queue_feature_extractor(self):
# """Internal thread function. Get audio from audio queue. And extract embedding vector
# for all audio chunks. Stores the resulting embedding into self.__features.
# With Original Audio's Path as key, and list[tuple[np.ndarray, float, int]] (list of tuple of embedding vector, position, channel id)
# """
# while (self.__audio_paths_list or self.__audio_queue): # While there are still audio to be processed
# if (self.__audio_queue): # If audio queue is not empty
# with self.__queue_lock:
# audio_to_process, audio_path = self.__audio_queue.pop(0) # Get audio from queue
# logging.info(f"[AFE] [Feature Extractor Thread]: Extracting {len(audio_to_process)} features from audio {audio_path}")
# for audio_chunk in audio_to_process:
# same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk)
# if (audio_path not in self.__features.keys()):
# #if DEBUG: print("Adding new vector to", audio_path.name)
# self.__features[audio_path] = [(embedd_vect, timepos, channel_id)]
# else:
# #if DEBUG: print("Adding vector to", audio_path.name)
# self.__features[audio_path].append(
# (embedd_vect, timepos, channel_id)
# )
# else:
# logging.info("[AFE] [Feature Extractor Thread]: Queue Empty, extractor thread sleeping for 5 seconds") # If audio queue is empty, wait
# time.sleep(5)
# logging.info("[AFE] [Feature Extractor Thread]: DONE. Extracted all features from all audio files")
# def __init__(
# self,
# audio_paths_list: list[Path],
# max_audio_in_queue: int,
# desired_sr: int,
# mono: bool,
# chunk_length: float = 15.0,
# overlap: float = 2.0
# ):
# self.__audio_queue = []
# self.__audio_paths_list = audio_paths_list
# self.__max_audio_in_queue = max_audio_in_queue
# self.__queue_lock = threading.Lock()
# self.__desired_sr = desired_sr
# self.__mono = mono
# self.__chunk_length = chunk_length
# self.__overlap = overlap
# self.__features = {}
# @property
# def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
# return self.__features
# def extract(self):
# print("Starting feature extraction for", len(self.__audio_paths_list), "file(s)")
# total_amount = len(self.__audio_paths_list)
# t_start = time.perf_counter()
# with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# self.__feeder_future = executor.submit(self.__audio_queue_feeder)
# self.__extractor_future = executor.submit(self.__audio_queue_feature_extractor)
# while (self.__feeder_future.running() or self.__extractor_future.running()):
# print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W{len(self.__audio_paths_list)})", end="\r")
# time.sleep(1)
# t_stop = time.perf_counter()
# print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)} COMPLETE)")
# delta_t = t_stop - t_start
# total_features = sum( [len(self.__features[path]) for path in self.__features] )
# print()
# print("Extraction completed")
# print(f"Took {delta_t} seconds. Added {total_features} vectors/embeddings")
class MultiThreadedAudioFeatureExtractor():
# This is the third time I am rewriting this, please send help. Multithreaded apps are pure hell to develop and debug
# After testing: this will hang at the last audio, precisely during audio preprocessing. I suspect the GIL hurts performance
# so much that the preprocessing routine cannot get any share of CPU execution time
__audio_queue: queue.Queue[ # List of ...
tuple[ # Pair of chunked audio and its path
list[tuple[np.ndarray, float, int]], # Chunked audio list of (ndarray, time position of chunk relative to original audio, channel_id)
Path # Path to original audio
]
] # Listed of Chunked/Resampled audio
__audio_feeder_threads: int # Amount of audio feeder threads
__feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
__audio_paths_list: queue.Queue[Path] # Path list to audio
__max_audio_in_queue: int # Maximum audio in queue
__audio_feeder_barrier: threading.Barrier # Synchronization barrier for all audio feeder threads
# Audio Feeder parameter
__desired_sr: int # Desired Sample Rate (Resampling)
__mono: bool # Force load audio in mono mode
__chunk_length: float # Audio chunk length
__overlap: float
# Result
__features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
__features_lock: threading.Lock
# __features: { audioPath:
# [(embedding1, pos1, channel1),
# (embedding2, pos2, channel1)]
# ...
# }
# Runtime
__audio_feeder_threadpool: list[concurrent.futures.Future]
__feature_extractor_threadpool: list[concurrent.futures.Future]
def __audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
"""Receives a list of audio chunks, and then extracts embeddings for all audio chunks, returns the resulting embedding as a list of tuples (embedding, time, channel_id)
Args:
audio (list[tuple[np.ndarray, float, int]]): list of audio chunks
Returns:
list[tuple[np.ndarray, float, int]]: List of (embedding vector, timepos, channel id)
"""
features = []
for audio_chunk in audio:
audio, timepos, channel_id = audio_chunk
zero = np.zeros(32)
features.append( (zero, timepos, channel_id) )
time.sleep(0.01) # Simulate effort, change to simulate spent seconds in each audio file
return features
# To be overridden
def __audio_feeder_thread(self, thread_id: int, barrier: threading.Barrier):
try:
while True:
# Attempt to get audio path from audio path queue
new_audio_path = self.__audio_paths_list.get()
# Check thread exit condition (If the queue returns None, that means the audio path queue is now empty and the thread should end itself)
if (new_audio_path is None):
self.__audio_paths_list.put(new_audio_path) # Put None back to notify other audio feeder threads
# You are already dead
break # If ETSISI ever sees this, they will surely kick me out of the school
# Now that the audio path queue is not empty, try preprocessing an audio
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
new_audio = audiopreprocessing.load_preprocessed_audio(
new_audio_path,
self.__desired_sr,
@@ -195,78 +340,176 @@ class AudioFeatureExtractor():
self.__mono,
self.__chunk_length,
self.__overlap
)
self.__audio_queue.put((new_audio, new_audio_path)) # In theory, this should block this audio feeder thread when the audio queue is full
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads to finish")
barrier.wait()
if (thread_id == 0):
self.__audio_queue.put(None) # None to signal audio_queue has no more elements to process
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
except Exception as e:
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] An exception occurred! Committing seppuku!")
logging.exception(e)
return
# while (not self.__audio_paths_list.empty()):
# if (not self.__audio_queue.full()):
# # Feed audio
# new_audio_path = self.__audio_paths_list.get()
# self.__audio_paths_list.task_done()
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
# new_audio = audiopreprocessing.load_preprocessed_audio(
# new_audio_path,
# self.__desired_sr,
# self.__mono,
# self.__chunk_length,
# self.__overlap
# )
# self.__audio_queue.put((new_audio, new_audio_path))
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
#def testfeedthread(self, nthreads):
# t1 = threading.Thread(target=self.__audio_feeder_thread, args=(1,))
# t2 = threading.Thread(target=self.__audio_feeder_thread, args=(2,))
# t1.start(); t2.start()
# #with self.__audio_feed_condition:
# # self.__audio_feed_condition.notify_all()
# t1.join(); t2.join()
# with concurrent.futures.ThreadPoolExecutor(max_workers=nthreads) as executor:
# for i in range(nthreads):
# ft = executor.submit(self.__audio_feeder_thread, i)
# self.__audio_loader_threadpool.append(ft)
def __check_all_audiofeed_thread_finished(self) -> bool:
for ft in self.__audio_feeder_threadpool:
if ft.running():
return False
return True
def __check_all_featureextractor_thread_finished(self) -> bool:
for ft in self.__feature_extractor_threadpool:
if ft.running():
return False
return True
def __feature_extractor_thread(self, thread_id):
while True:
# Attempt to get next audio chunks to process
next_audio_tuple = self.__audio_queue.get()
# Check thread exit condition
if (next_audio_tuple is None):
self.__audio_queue.put(next_audio_tuple) # Put the None back to notify other threads
break # unalive urself
else: # Assuming we got more tuples
current_audio_to_process, current_audio_path = next_audio_tuple # Deconstruct tuple
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}")
features_to_add = self.__audio_inference_embedding(current_audio_to_process)
with self.__features_lock:
self.__features[current_audio_path] = features_to_add
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {current_audio_path} w/ {len(features_to_add)} features")
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
# while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
# if (not self.__audio_queue.empty()):
# audio_to_process, audio_path = self.__audio_queue.get()
# self.__audio_queue.task_done()
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
# features_to_add = self.__audio_inference_embedding(audio_to_process)
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
# with self.__features_lock:
# self.__features[audio_path] = features_to_add
# #with self.__audio_feed_condition: self.__audio_feed_condition.notify_all()
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {audio_path} w/ {len(features_to_add)} features")
#else:
# if (not self.__check_all_audiofeed_thread_finished()):
# with self.__audio_feed_condition:
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Audio queue empty: waiting")
# self.__audio_feed_condition.wait(10)
# self.__audio_feed_condition.wait_for(lambda: not self.__audio_queue.empty())
def __count_running_threads(self) -> tuple[int, int]:
running_extractors = 0
running_feeders = 0
for ft in self.__feature_extractor_threadpool:
if ft.running(): running_extractors += 1
for ft in self.__audio_feeder_threadpool:
if ft.running(): running_feeders += 1
return (running_feeders, running_extractors)
@property
def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
return self.__features
def extract(self):
total_amount = self.__audio_paths_list.qsize() - 1 # Account for None to indicate queue end
logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
t_start = time.perf_counter() # Timer
with concurrent.futures.ProcessPoolExecutor(max_workers=(self.__audio_feeder_threads + self.__feature_extractor_threads)) as executor:
# Audio feeder threads
for i in range(self.__audio_feeder_threads):
logging.info(f"[MTAFE] Started audio feeder thread {i}")
ld_ft = executor.submit(self.__audio_feeder_thread, i, self.__audio_feeder_barrier)
self.__audio_feeder_threadpool.append(ld_ft)
# Feature extractor threads
for i in range(self.__feature_extractor_threads):
logging.info(f"[MTAFE] Started feature extractor thread {i}")
ex_ft = executor.submit(self.__feature_extractor_thread, i)
self.__feature_extractor_threadpool.append(ex_ft)
# Progress checking
while ( (not self.__check_all_audiofeed_thread_finished()) and (not self.__check_all_featureextractor_thread_finished()) ):
nfeeder, nextract = self.__count_running_threads()
print(f"[MTAFE Progress] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
t_stop = time.perf_counter()
logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize() - 1}/W:{self.__audio_paths_list.qsize() - 1} COMPLETE)")
delta_t = t_stop - t_start
total_features = sum( [len(self.__features[path]) for path in self.__features] )
logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")
def __init__(
self,
audio_paths: list[Path],
max_audio_in_queue: int = 16,
audio_feeder_threads: int = 8,
feature_extractor_threads: int = 8,
desired_sr: int = 32000,
force_mono: bool = False,
chunk_length: float = 15.0,
chunk_overlap: float = 2.0,
):
# Check if the paths passed in are all valid and add them to queue
self.__audio_paths_list = multiprocessing.Queue()
for p in audio_paths:
if not p.is_file():
raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
else:
self.__audio_paths_list.put(p)
self.__audio_paths_list.put(None) # To signal to the producer that the audio path list is empty, since Queue.empty() is unreliable
logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize() - 1} files")
# Set up private attributes
## Audio preprocessing parameters
self.__desired_sr = desired_sr
self.__mono = force_mono
self.__chunk_length = chunk_length
self.__overlap = chunk_overlap
## Extractor/Feeder settings
self.__max_audio_in_queue = max_audio_in_queue
self.__audio_feeder_threads = audio_feeder_threads
self.__feature_extractor_threads = feature_extractor_threads
## Set up runtime conditions
self.__audio_queue = multiprocessing.Queue(maxsize=self.__max_audio_in_queue)
self.__features = {}
self.__features_lock = multiprocessing.Lock()
self.__audio_feeder_barrier = multiprocessing.Barrier(self.__audio_feeder_threads)
self.__audio_feeder_threadpool = []
self.__feature_extractor_threadpool = []
logging.info(f"[MTAFE] [Constructor] Extraction parameters: {desired_sr}Hz, Mono: {force_mono}, Divide into {chunk_length}s chunks with {chunk_overlap}s of overlap")
logging.info(f"[MTAFE] [Constructor] Using {audio_feeder_threads} threads for preprocessing audio and {feature_extractor_threads} threads for feature extraction. Max queue size of {max_audio_in_queue} files")
# More audio embeddings specific code below (To be overridden)

View File

@@ -0,0 +1,8 @@
import dataset_files
import multiprocessing
import logging
import numpy as np
import threading
import queue
from pathlib import Path

View File

@@ -0,0 +1,193 @@
from dataset_files import MultiThreadedAudioFeatureExtractor
from pathlib import Path
from panns_inference import AudioTagging
import logging
import numpy as np
import queue
import concurrent.futures
import threading
import time
import audiopreprocessing
#import torch
#import gc
class mtafe_panns():
__audio_queue: queue.Queue[ # List of ...
tuple[ # Pair of chunked audio and its path
list[tuple[np.ndarray, float, int]], # Chunked audio
Path # Path to original audio
]
] # Listed of Chunked/Resampled audio
__audio_loader_threads: int # Amount of audio feeder threads
__feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
__audio_paths_list: queue.Queue[Path] # Path list to audio
__max_audio_in_queue: int # Maximum audio in queue
__desired_sr: int
__mono: bool
__chunk_length: float
__overlap: float
__features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
__features_lock: threading.Lock
__audio_loader_threadpool: list[concurrent.futures.Future]
__feature_extractor_threadpool: list[concurrent.futures.Future]
__at: AudioTagging
__batch_size: int
def __init__(self,
audio_paths: list[Path],
max_audio_in_queue: int = 16,
audio_feeder_threads: int = 8,
feature_extractor_threads: int = 8,
desired_sr: int = 32000,
force_mono: bool = False,
chunk_length: float = 15.0,
chunk_overlap: float = 2.0,
batch_size: int = 20
):
# Check if the paths passed in are all valid and add them to queue
self.__audio_paths_list = queue.Queue()
for p in audio_paths:
if not p.is_file():
raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
else:
self.__audio_paths_list.put(p)
#self.__audio_paths_list.task_done()
logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize()} files")
# Set up private attributes
## Audio preprocessing parameters
self.__desired_sr = desired_sr
self.__mono = force_mono
self.__chunk_length = chunk_length
self.__overlap = chunk_overlap
## Extractor/Feeder settings
self.__max_audio_in_queue = max_audio_in_queue
self.__audio_loader_threads = audio_feeder_threads
self.__feature_extractor_threads = feature_extractor_threads
## Set up runtime conditions
self.__audio_queue = queue.Queue(maxsize=max_audio_in_queue)
self.__features = {}
self.__features_lock = threading.Lock()
self.__audio_loader_threadpool = []
self.__feature_extractor_threadpool = []
logging.info(f"[MTAFE] [Constructor] Extraction parameters: {desired_sr}Hz, Mono: {force_mono}, Divide into {chunk_length}s chunks with {chunk_overlap}s of overlap")
logging.info(f"[MTAFE] [Constructor] Using {audio_feeder_threads} threads for preprocessing audio and {feature_extractor_threads} threads for feature extraction. Max queue size of {max_audio_in_queue} files")
logging.info(f"[MTAFE] [Constructor] Initializing PANNs")
logging.info(f"[MTAFE] [Constructor] Inferencing with batch size {batch_size}")
self.__at = AudioTagging(checkpoint_path=None, device='cuda')
self.__batch_size = batch_size
def __chunks(self, lst, n):
# Stolen straight from Stackoverflow
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
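# e.g. list(self.__chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]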
def __audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
audio_chunk_list = []
timepos_list = []
channel_id_list = []
embedding_list = []
# Split into equal sized list
for audio_chunk, timepos, channel in audio:
audio_chunk_list.append(audio_chunk)
timepos_list.append(timepos)
channel_id_list.append(channel)
# Convert audio_chunk_list into numpy array
audio_chunk_list = np.array(audio_chunk_list)
#logging.info("[MTAFE] [PANNs] Inferencing...")
try:
for i, batch in enumerate(self.__chunks(audio_chunk_list, self.__batch_size)):
(clipwise_output, embedding) = self.__at.inference(batch)
for vect in embedding: # vect: np.ndarray
embedding_list.append(vect)
logging.info(f"[MTAFE] [PANNs] Inferenced batch {i}")
assert len(audio_chunk_list) == len(timepos_list) == len(channel_id_list) == len(embedding_list)
except Exception as e:
logging.critical("[MTAFE] [PANNs] ERROR! INFERENCE FAILED!!! OR LIST SIZE MISMATCH")
logging.critical(e)
embedding_list = [None for _ in audio_chunk_list] # Clearing embedding_list and filling it with None
return list(zip(embedding_list, timepos_list, channel_id_list)) # (embedding vector, timepos, channel id)
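# Shape note (assuming the default CNN14 checkpoint of panns_inference): with batch_size=20
# and 15 s chunks at 32000 Hz, each batch passed to AudioTagging.inference() is a
# (20, 480000) float32 array, and the returned embedding has shape (20, 2048).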
def __audio_feeder_thread(self, thread_id):
while (not self.__audio_paths_list.empty()):
new_audio_path = self.__audio_paths_list.get()
self.__audio_paths_list.task_done()
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
new_audio = audiopreprocessing.load_preprocessed_audio(
new_audio_path,
self.__desired_sr,
self.__mono,
self.__chunk_length,
self.__overlap
)
self.__audio_queue.put((new_audio, new_audio_path))
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
def __check_all_audiofeed_thread_finished(self) -> bool:
for ft in self.__audio_loader_threadpool:
if ft.running():
return False
return True
def __check_all_featureextractor_thread_finished(self) -> bool:
for ft in self.__feature_extractor_threadpool:
if ft.running():
return False
return True
def __feature_extractor_thread(self, thread_id):
while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
if (not self.__audio_queue.empty()):
audio_to_process, audio_path = self.__audio_queue.get()
self.__audio_queue.task_done()
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
features_to_add = self.__audio_inference_embedding(audio_to_process)
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
with self.__features_lock:
self.__features[audio_path] = features_to_add
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {audio_path} w/ {len(features_to_add)} features")
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
def __count_running_threads(self) -> tuple[int, int]:
running_extractors = 0
running_feeders = 0
for ft in self.__feature_extractor_threadpool:
if ft.running(): running_extractors += 1
for ft in self.__audio_loader_threadpool:
if ft.running(): running_feeders += 1
return (running_feeders, running_extractors)
@property
def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
return self.__features
def extract(self):
total_amount = self.__audio_paths_list.qsize()
logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
t_start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=(self.__audio_loader_threads + self.__feature_extractor_threads)) as executor:
for i in range(self.__audio_loader_threads):
ld_ft = executor.submit(self.__audio_feeder_thread, i)
self.__audio_loader_threadpool.append(ld_ft)
for i in range(self.__feature_extractor_threads):
ld_ft = executor.submit(self.__feature_extractor_thread, i)
self.__feature_extractor_threadpool.append(ld_ft)
while ( (not self.__check_all_audiofeed_thread_finished()) and (not self.__check_all_featureextractor_thread_finished()) ):
nfeeder, nextract = self.__count_running_threads()
print(f"[MTAFE Progress] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
t_stop = time.perf_counter()
logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()} COMPLETE)")
delta_t = t_stop - t_start
total_features = sum( [len(self.__features[path]) for path in self.__features] )
logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")

View File

@@ -1,3 +1,17 @@
import logging
from audiopreprocessing import triggerlog
#logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
from dataset_files import MultiThreadedAudioFeatureExtractor, random_audio_chunk
mtafe = MultiThreadedAudioFeatureExtractor(
audio_paths=random_audio_chunk(8),
max_audio_in_queue=8,
audio_feeder_threads=8,
feature_extractor_threads=1,
desired_sr=32000,
force_mono=False,
chunk_length=15,
chunk_overlap=2
)
mtafe.extract()

View File

@@ -0,0 +1,17 @@
#import mtafe
import logging
#import dataset_files
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.DEBUG)
logging.info("Running tests")
# m = mtafe.mtafe(
# audio_paths=dataset_files.random_audio_chunk(2),
# max_audio_in_queue=8,
# audio_feeder_threads=8,
# feature_extractor_threads=1,
# desired_sr=32000,
# force_mono=False,
# chunk_length=15,
# chunk_overlap=2
# )
# m.run()

View File

@@ -0,0 +1,24 @@
import logging
from audiopreprocessing import triggerlog
#logger = logging.getLogger(__name__)
import sys
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO, handlers=[logging.FileHandler('test_panns.log'), logging.StreamHandler(sys.stdout)])
from pathlib import Path
from mtafe_panns import mtafe_panns
from dataset_files import random_audio_chunk, serialize_dict_obj
mtafe = mtafe_panns(
audio_paths=random_audio_chunk(4),
max_audio_in_queue=4,
audio_feeder_threads=4,
feature_extractor_threads=1,
desired_sr=32000,
force_mono=False,
chunk_length=15,
chunk_overlap=2,
batch_size=32
)
mtafe.extract()
print("Saving inferenced results to file...")
p = Path('./test_panns.pkl')
serialize_dict_obj(p, mtafe.features)

View File

@@ -0,0 +1,95 @@
import librosa
import pickle
import os
import numpy as np
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def triggerlog():
logger.critical("Testing: info")
def resample_load(input_path : Path, target_sr : int = 16000, mono_audio : bool = False) -> np.ndarray: # AI
"""Load and resamples the audio into `target_sr`.
Args:
input_path (Path): pathlib.Path object to audio file
target_sr (int, optional): Target Sample Rate to resample. Defaults to 16000.
mono_audio (bool, optional): Load the audio in mono mode. Defaults to False.
Returns:
np.ndarray: _description_
"""
# Load audio file with original sample rate
logger.info(f"[resample_load] Loading audio {input_path}")
audio, orig_sr = librosa.load(input_path, sr=None, mono=mono_audio)
# Resample if necessary
if orig_sr != target_sr:
logger.info(f"[resample_load] Resampling to {target_sr}")
audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
return audio
def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap: float = 2.0) -> tuple[list[np.ndarray], list[float], int]: # AI
"""
Chunks audio file into overlapping segments. Only pass in mono audio here.
Args:
audio_file: Loaded audio ndarray (one channel only)
sr: Sample rate for the given audio file
chunk_length: Length of each chunk in seconds
overlap: Overlap between chunks in seconds
Returns:
List of audio chunks, list of chunk positions, and given sample rate
"""
logger.info(f"[chunk_audio] Chunking audio ({len(audio) / sr}s)")
# Calculate chunk size and hop length in samples
chunk_size = int(chunk_length * sr)
hop_length = int((chunk_length - overlap) * sr)
# Generate chunks
chunks = []
positions = []
k = 0
for i in range(0, len(audio) - chunk_size + 1, hop_length):
chunk = audio[i:i + chunk_size]
chunks.append(chunk)
positions.append(i / sr)
k += 1
if k == 0: # The full audio length is less than chunk_length
chunks = [audio]
positions = [0.0]
logger.info(f"[chunk_audio] Audio less than chunk_length. Returning original audio as chunk\r")
else:
logger.info(f"[chunk_audio] Audio is split into {k} chunks")
return chunks, positions, sr
def load_preprocessed_audio(
path: Path,
desired_sr: int,
mono: bool = False,
chunk_length: float = 15.0,
overlap: float = 2.0) -> list[tuple[np.ndarray, float, int]]:
result = []
# Load and resample audio
audio = resample_load(path, desired_sr, mono) # Stereo 2D matrix, Mono 1D array
if mono or (audio.ndim == 1):
# Chunk audio: mono (or the audio file loaded in itself is mono)
chunks, positions, _ = chunk_audio(audio, desired_sr, chunk_length, overlap)
assert len(chunks) == len(positions)
result.extend(zip(chunks, positions, [-1 for _ in range(len(chunks))]))
# (ndarray_chunk1, pos1, -1): first audio chunk, position1, -1 (Mono channel indicator)
else:
# Chunk audio: stereo/multichannel
for channel_id, channel_audio in enumerate(audio):
chunks, positions, _ = chunk_audio(channel_audio, desired_sr, chunk_length, overlap)
assert len(chunks) == len(positions)
result.extend(zip(chunks, positions, [channel_id for _ in range(len(chunks))]))
# (ndarray_chunk1, pos1, 0): first audio chunk, position1, 0 (channel 0)
logging.info(f"[load_preprocessed_audio] Loaded audio {path} ({desired_sr}Hz, Chunk {chunk_length}s with overlap {overlap}s) MONO:{mono}")
return result
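# A quick usage sketch of the helper above (the path is hypothetical). With desired_sr=32000,
# chunk_length=15.0 and overlap=2.0: chunk_size = 15 * 32000 = 480000 samples and
# hop_length = (15 - 2) * 32000 = 416000 samples, i.e. a new chunk starts every 13 seconds.
#
#     from pathlib import Path
#     chunks = load_preprocessed_audio(Path("some_work/track01.wav"), desired_sr=32000)
#     for chunk, pos, channel_id in chunks[:3]:
#         print(chunk.shape, pos, channel_id)  # e.g. (480000,) 0.0 -1 for a mono file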

mtafe_lab/dataset.py Normal file
View File

@@ -0,0 +1,135 @@
import platform
import os
import pickle
import random
import multiprocessing
import threading
import time
import concurrent.futures
import numpy as np
from pathlib import Path
import audiopreprocessing
import logging
import queue
def serialize_dict_obj(path : Path, object : dict) -> int:
"""Serializes Python Dictionary object to a file via Pickle.
Args:
path (Path): Path to store the file
object (dict): Dictionary object to serialize
Returns:
int: size in bytes written
"""
# Horrible practice, horrible security, but it will work for now
with path.open("wb") as fp:
pickle.dump(object, fp)
fp.seek(0, os.SEEK_END)
size = fp.tell()
return size
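# Illustrative counterpart for reading the dictionary back (a sketch, not in the original file;
# same caveat as above: only unpickle files you created yourself):
def deserialize_dict_obj(path : Path) -> dict:
    with path.open("rb") as fp:
        return pickle.load(fp)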
logging.info("Reading local dataset directory structure...")
ASMRThreePath = Path("C:\\ASMRThree")
ASMRTwoPath = Path("D:\\ASMRTwo")
ASMROnePath = Path("E:\\ASMROne")
if (platform.system() == 'Linux'):
ASMROnePath = Path('/mnt/Scratchpad/ASMROne')
ASMRTwoPath = Path('/mnt/MyStuffz/ASMRTwo')
ASMRThreePath = Path('/mnt/Windows11/ASMRThree')
size_one, size_two, size_three = 0, 0, 0
files_one, files_two, files_three = [], [], []
folders_one, folders_two, folders_three = [], [], []
# Statistic calculation for ASMROne
for root, dirs, files in ASMROnePath.walk(): # Root will iterate through all folders
if root.absolute() != ASMROnePath.absolute(): # Skip root of ASMROnePath
folders_one.append(root) # Add folder to list
for fname in files: # Iterate through all files in current root
file = root/fname # Get file path
assert file.is_file()
files_one.append(file)
size_one += file.stat().st_size # Get file size
# Statistic calculation for ASMRTwo
for root, dirs, files in ASMRTwoPath.walk(): # Root will iterate through all folders
if root.absolute() != ASMRTwoPath.absolute(): # Skip root of ASMRTwoPath
folders_two.append(root) # Add folder to list
for fname in files: # Iterate through all files in current root
file = root/fname # Get file path
assert file.is_file()
files_two.append(file)
size_two += file.stat().st_size # Get file size
# Statistic calculation for ASMRThree
for root, dirs, files in ASMRThreePath.walk(): # Root will iterate through all folders
if root.absolute() != ASMRThreePath.absolute(): # Skip root of ASMRThreePath
folders_three.append(root) # Add folder to list
for fname in files: # Iterate through all files in current root
file = root/fname # Get file path
assert file.is_file()
files_three.append(file)
size_three += file.stat().st_size # Get file size
DataSubsetPaths = [ASMROnePath, ASMRTwoPath, ASMRThreePath]
DLSiteWorksPaths = []
# Collect ASMR Works (RJ ID, Paths)
for ASMRSubsetPath in DataSubsetPaths:
for WorkPaths in ASMRSubsetPath.iterdir():
DLSiteWorksPaths.append(WorkPaths)
fileExt2fileType = {
".TXT": "Document",
".WAV": "Audio",
".MP3": "Audio",
".PNG": "Image",
".JPG": "Image",
".VTT": "Subtitle",
".PDF": "Document",
".FLAC": "Audio",
".MP4": "Video",
".LRC": "Subtitle",
".SRT": "Subtitle",
".JPEG": "Image",
".ASS": "Subtitle",
"": "NO EXTENSION",
".M4A": "Audio",
".MKV": "Video"
}
fileext_stat = {}
file_list = files_one + files_two + files_three
file_list_count = len(file_list)
for file in file_list:
f_ext = file.suffix.upper()
if (f_ext in fileext_stat.keys()):
fileext_stat[f_ext]['Count'] += 1
fileext_stat[f_ext]['List'].append(file)
fileext_stat[f_ext]['ExtensionMass'] += file.stat().st_size
else:
fileext_stat[f_ext] = {}
fileext_stat[f_ext]['Count'] = 1
fileext_stat[f_ext]['List'] = [file]
fileext_stat[f_ext]['ExtensionMass'] = file.stat().st_size # The total sum of sizes of the same file extension
fileext_stat[f_ext]['MediaType'] = fileExt2fileType[f_ext]
audio_paths = []
for extension in fileext_stat: # I can't be bothered to convert this into a list comprehension
if fileext_stat[extension]['MediaType'] == "Audio":
audio_paths += fileext_stat[extension]['List']
def random_audio_chunk(n : int, seed : int = 177013) -> list[Path]:
"""Returns a random selection of audio files
Args:
n (int): Amount of files to return
seed (int, optional): Seed for RNG. Defaults to 177013.
Returns:
list[Path]: List of randomly selected audio paths (using Path object)
"""
random.seed(seed)
#return random.choices(audio_paths, k=n) # Contains repeated elements
return random.sample(audio_paths, k=n)

mtafe_lab/mtafe.py Normal file
View File

@@ -0,0 +1,269 @@
import logging
#logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
import dataset
import numpy as np
import audiopreprocessing
import threading
import queue
import time
from concurrent.futures import ThreadPoolExecutor, Future
from pathlib import Path
class mtafe:
# Input
audio_path_queue: queue.Queue[Path] # List of audio paths to preprocess
# Feeder/Extractor/Queue threading options
audio_feeder_threads: int # Amount of audio feeder threads
feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
max_audio_in_queue: int # Maximum audio in queue
# Audio preprocessing parameters
desired_sr: int # Desired Sample Rate (Resampling)
mono: bool # Force load audio in mono mode
chunk_length: float # Audio chunk length
overlap: float # Audio chunk overlap
# Runtime
audio_queue: queue.Queue[ # List of ...
tuple[ # Pair of chunked audio and its path
list[tuple[np.ndarray, float, int]], # Chunked audio list of (ndarray, time position of chunk relative to original audio, channel_id)
Path # Path to original audio
]
] # Listed of Chunked/Resampled audio
audio_feeder_threadpool: list[Future]
feature_extractor_threadpool: list[Future]
features_lock: threading.Lock
audio_feeder_barrier: threading.Barrier # Synchronization barrier for all audio feeder threads
# Output
features: dict[Path, list[tuple[np.ndarray, float, int]]]
def __init__(
self,
paudio_paths: list[Path],
pmax_audio_in_queue: int = 16,
paudio_feeder_threads: int = 8,
pfeature_extractor_threads: int = 8,
pdesired_sr: int = 32000,
pforce_mono: bool = False,
pchunk_length: float = 15.0,
pchunk_overlap: float = 2.0
):
# Check if the paths passed in are all valid and add them to queue
self.audio_path_queue = queue.Queue()
for p in paudio_paths:
if not p.is_file():
raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
else:
self.audio_path_queue.put(p)
self.audio_path_queue.put(None) # To signal to the producer that the audio path list is empty, since Queue.empty() is unreliable
logging.info(f"[MTAFE] [Constructor] Queued {self.audio_path_queue.qsize() - 1} files")
# Set up private attributes
## Audio preprocessing parameters
self.desired_sr = pdesired_sr
self.mono = pforce_mono
self.chunk_length = pchunk_length
self.overlap = pchunk_overlap
## Extractor/Feeder settings
self.max_audio_in_queue = pmax_audio_in_queue
self.audio_feeder_threads = paudio_feeder_threads
self.feature_extractor_threads = pfeature_extractor_threads
## Set up runtime conditions
self.audio_queue = queue.Queue(maxsize=self.max_audio_in_queue)
self.features = {}
self.features_lock = threading.Lock()
self.audio_feeder_barrier = threading.Barrier(self.audio_feeder_threads)
self.audio_feeder_threadpool = []
self.feature_extractor_threadpool = []
logging.info(f"[MTAFE] [Constructor] Extraction parameters: {pdesired_sr}Hz, Mono: {pforce_mono}, Divide into {pchunk_length}s chunks with {pchunk_overlap}s of overlap")
logging.info(f"[MTAFE] [Constructor] Using {paudio_feeder_threads} threads for preprocessing audio and {pfeature_extractor_threads} threads for feature extraction. Max queue size of {pmax_audio_in_queue} files")
def audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
"""Receives a list of audio chunks, and then extracts embeddings for all audio chunks, returns the resulting embedding as a list of tuples(embedding, time, channel_id)
Args:
audio (list[tuple[np.ndarray, float, int]]): list of audio chunks
Returns:
list[tuple[np.ndarray, float, int]]: List of (embedding vector, timepos, channel id)
"""
features = []
for audio_chunk in audio:
audio, timepos, channel_id = audio_chunk
zero = np.zeros(32)
features.append( (zero, timepos, channel_id) )
time.sleep(1.5) # Simulate effort, change to simulate spent seconds in each audio file
return features
# To be overridden
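# An illustrative override (hypothetical subclass; my_embedding_model is a stand-in for a
# real GPU model, not part of this repo):
#
#     class mtafe_with_model(mtafe):
#         def audio_inference_embedding(self, audio):
#             return [(my_embedding_model.embed(chunk), timepos, channel_id)
#                     for chunk, timepos, channel_id in audio]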
def audio_feeder_worker(self, thread_id: int, barrier: threading.Barrier): # AI
try:
while True:
# Add timeout to prevent blocking indefinitely
try:
new_audio_path = self.audio_path_queue.get(timeout=10)
except queue.Empty:
logging.warning(f"[MTAFE] [Audio Feeder {thread_id}] Queue get timeout")
continue
if new_audio_path is None:
self.audio_path_queue.put(new_audio_path) # Put None back
break
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
try:
new_audio = audiopreprocessing.load_preprocessed_audio(
new_audio_path,
self.desired_sr,
self.mono,
self.chunk_length,
self.overlap
)
# Add timeout to prevent deadlock on full queue
try:
self.audio_queue.put((new_audio, new_audio_path), timeout=30)
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
except queue.Full:
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Queue full, skipping {new_audio_path}")
continue
except Exception as e:
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Error processing {new_audio_path}: {str(e)}")
continue
# Add barrier timeout to prevent indefinite wait
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads")
try:
barrier.wait(timeout=60)
except threading.BrokenBarrierError:
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Barrier broken")
if thread_id == 0:
self.audio_queue.put(None) # Signal end
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
except Exception as e:
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Fatal exception: {str(e)}")
logging.exception(e)
# Ensure barrier can progress even if a thread fails
try:
barrier.abort()
except:
pass
# Ensure sentinel is added even if threads fail
if thread_id == 0:
try:
self.audio_queue.put(None, timeout=5)
except:
pass
# def audio_feeder_worker(self, thread_id: int, barrier: threading.Barrier):
# try:
# while True:
# # Attempt to get audio path from audio path queue
# new_audio_path = self.audio_path_queue.get()
# # Check thread exit condition (If the queue returns None, that means the audio path queue is now empty and the thread should end itself)
# if (new_audio_path is None):
# self.audio_path_queue.put(new_audio_path) # Put None back to notify other audio feeder threads
# break # Break out of the infinite loop
# # Audio path queue is not empty:
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
# new_audio = audiopreprocessing.load_preprocessed_audio(
# new_audio_path,
# self.desired_sr,
# self.mono,
# self.chunk_length,
# self.overlap
# )
# self.audio_queue.put((new_audio, new_audio_path))
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads to finish")
# barrier.wait()
# if (thread_id == 0):
# self.audio_queue.put(None) # None to signal audio_queue has no more elements to process
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
# except Exception as e:
# logging.error(f"[MTAFE] [Audio Feeder {thread_id}] An exception occurred! Committing seppuku!")
# logging.exception(e)
# return
def feature_extractor_worker(self, thread_id: int):
while True:
# Attempt to get next audio chunks to process
next_audio_tuple = self.audio_queue.get()
# Check thread exit condition
if (next_audio_tuple is None):
self.audio_queue.put(next_audio_tuple) # Put the None back to notify other threads
break # unalive urself
else: # Assuming we got more tuples
current_audio_to_process, current_audio_path = next_audio_tuple # Deconstruct tuple
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}")
features_to_add = self.audio_inference_embedding(current_audio_to_process)
with self.features_lock:
self.features[current_audio_path] = features_to_add
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {current_audio_path} w/ {len(features_to_add)} features")
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
def test_audio_feeder_worker(self):
total_file_amount = self.audio_path_queue.qsize() - 1
logging.info("[MTAFE] [test_audio_feeder_worker] Spinning up new threads...")
with ThreadPoolExecutor(max_workers=self.audio_feeder_threads) as executor:
for i in range(self.audio_feeder_threads):
ld_ft = executor.submit(self.audio_feeder_worker, i, self.audio_feeder_barrier)
self.audio_feeder_threadpool.append(ld_ft)
logging.info(f"[MTAFE] [test_audio_feeder_worker] Launched audio feeder {i}")
for i in range(total_file_amount):
_, p = self.audio_queue.get()
time.sleep(0.25)
logging.info(f"[MTAFE] [test_audio_feeder_worker] Popped: {p}")
logging.info("[MTAFE] [test_audio_feeder_worker] All audio feeder worker joined!")
#logging.info(f"[MTAFE] [test_audio_feeder_worker] Current audio queue size: {self.audio_queue.qsize()}")
def count_running_threads(self) -> tuple[int, int]:
running_extractors = 0
running_feeders = 0
for ft in self.feature_extractor_threadpool:
if ft.running(): running_extractors += 1
for ft in self.audio_feeder_threadpool:
if ft.running(): running_feeders += 1
return (running_feeders, running_extractors)
def check_all_audiofeed_thread_finished(self) -> bool:
for ft in self.audio_feeder_threadpool:
if ft.running():
return False
return True
def check_all_featureextractor_thread_finished(self) -> bool:
for ft in self.feature_extractor_threadpool:
if ft.running():
return False
return True
def extract(self):
total_amount = self.audio_path_queue.qsize() - 1 # Account for None to indicate queue end
logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
t_start = time.perf_counter() # Timer
with ThreadPoolExecutor(max_workers=(self.audio_feeder_threads + self.feature_extractor_threads)) as executor:
# Audio feeder threads
for i in range(self.audio_feeder_threads):
logging.info(f"[MTAFE] Started audio feeder thread {i}")
ld_ft = executor.submit(self.audio_feeder_worker, i, self.audio_feeder_barrier)
self.audio_feeder_threadpool.append(ld_ft)
# Feature extractor threads
for i in range(self.feature_extractor_threads):
logging.info(f"[MTAFE] Started feature extractor thread {i}")
ex_ft = executor.submit(self.feature_extractor_worker, i)
self.feature_extractor_threadpool.append(ex_ft)
# Progress checking
while ( (not self.check_all_audiofeed_thread_finished()) and (not self.check_all_featureextractor_thread_finished()) ):
nfeeder, nextract = self.count_running_threads()
print(f"[MTAFE Progress] Processed {len(self.features)}/{total_amount} (L:{self.audio_queue.qsize()}/W:{self.audio_path_queue.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
t_stop = time.perf_counter()
logging.info(f"[MTAFE] Processed {len(self.features)}/{total_amount} (L:{self.audio_queue.qsize() - 1}/W:{self.audio_path_queue.qsize() - 1} COMPLETE)")
delta_t = t_stop - t_start
total_features = sum( [len(self.features[path]) for path in self.features] )
logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")

mtafe_lab/test_mtafe.py Normal file
View File

@@ -0,0 +1,22 @@
import logging
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
import mtafe
from dataset import random_audio_chunk
logging.info("Generating random audio path list")
rdpl = random_audio_chunk(256)
logging.info("Initializing MTAFE")
m = mtafe.mtafe(
paudio_paths=rdpl,
pmax_audio_in_queue=8,
paudio_feeder_threads=8,
pfeature_extractor_threads=2,
pdesired_sr=32000,
pforce_mono=False,
pchunk_length=15,
pchunk_overlap=2
)
#m.test_audio_feeder_worker()
m.extract()

Binary file not shown.