Compare commits

...

1 Commit

Author SHA1 Message Date
b14a0a2a17 I am giving up on MTAFE 2025-04-19 20:05:35 +02:00
5 changed files with 286 additions and 63 deletions

View File

@@ -0,0 +1,15 @@
My implementation attempt for a Multi-Threaded Audio Feature Extractor (MTAFE)... the attempt ended in misery.
My vision is a multi-threaded program that does audio pre-processing and feature extraction in separate threads: `i` threads pre-process the given audio file paths, and `j` threads do feature extraction. If the audio pre-processing pipeline is single-threaded, it bottlenecks the entire program. But the feature extractor itself is also a bottleneck: since all of the audio embedding extractors rely on GPU inference, feature extraction effectively has to be single-threaded on my computer.
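Roughly the shape I'm aiming for, as a toy sketch (the work functions are placeholders, not the real pre-processing or inference code):

```python
# Toy multi-producer/multi-consumer pipeline: i feeder threads fill a bounded
# queue, j extractor threads drain it. The actual work is stubbed out.
import queue
import threading

I_FEEDERS, J_EXTRACTORS = 4, 2
paths_q = queue.Queue()             # audio paths waiting for pre-processing
audio_q = queue.Queue(maxsize=16)   # pre-processed audio waiting for extraction

def feeder():
    while True:
        path = paths_q.get()
        if path is None:
            paths_q.put(None)       # pass the stop signal on to the other feeders
            return
        audio_q.put(("chunks of", path))  # placeholder for resample/chunk work

def extractor():
    while True:
        item = audio_q.get()
        if item is None:
            audio_q.put(None)       # pass the stop signal on to the other extractors
            return
        _ = item                    # placeholder for GPU embedding inference

for p in ["a.flac", "b.flac", "c.flac"]:
    paths_q.put(p)
paths_q.put(None)

feeders = [threading.Thread(target=feeder) for _ in range(I_FEEDERS)]
extractors = [threading.Thread(target=extractor) for _ in range(J_EXTRACTORS)]
for t in feeders + extractors:
    t.start()
for t in feeders:
    t.join()          # once all feeders are done, nothing new can arrive...
audio_q.put(None)     # ...so signal the extractors to stop
for t in extractors:
    t.join()
```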
I was trying to make the program multi-threaded for audio pre-processing AND for feature extraction (for beefier GPUs that can handle more concurrent inference threads).
Unfortunately... all my attempts have ended in misery; my multi-threaded code is littered with performance issues and deadlocks. Python isn't exactly the best language for multi-threaded code, thanks to the GIL. I am trying to implement a multi-producer, multi-consumer model here. My best attempt hangs for a long time waiting for the producers (audio feeders) to pre-process the audio and put it on the shared queue. It locks up for a really long time, then processes everything at light speed, but near the end there is a good chance the program deadlocks itself. I wasn't able to debug it, and the profiler didn't yield any results that were useful to me.
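For my own notes, the shutdown scheme I think I actually needed: with multiple producers, a single `None` sentinel is not enough, because no single producer knows whether the others are done. A sketch of a sentinel-counting variant (hypothetical, not what the code in this commit does):

```python
# Hedged sketch: each of the N producers pushes exactly one SENTINEL when it
# runs out of work; a consumer only stops after it has seen all N of them.
import queue
import threading

N_PRODUCERS = 4
SENTINEL = object()
audio_q: queue.Queue = queue.Queue(maxsize=16)
seen = 0
seen_lock = threading.Lock()

def consumer_loop(process):
    global seen
    while True:
        item = audio_q.get()
        if item is SENTINEL:
            with seen_lock:
                seen += 1
                all_done = seen >= N_PRODUCERS
            if all_done:
                audio_q.put(SENTINEL)  # re-broadcast so sibling consumers exit too
                return
            continue  # one producer finished, but others may still feed us
        process(item)
```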
At one point I even relied on AI, and I still wasn't getting consistent results: the AI generated code that was significantly faster and deadlocked less, but it skips audio files when they aren't pre-processed in time. I could implement additional logic to catch processing errors and retry where possible, but I am really burnt out, and I would rather look for better alternatives.
The next thing I am going to try is to split this program in two; right now it attempts to do pre-processing AND feature extraction at the same time. One program (preferably multi-threaded) would do all the audio pre-processing (resampling, chunking, etc.) and write the pre-processed audio out to a serialized pickle file, or some other serialization format.
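Something like this for the first program, reusing `audiopreprocessing.load_preprocessed_audio` the way mtafe.py calls it (the output layout here is just a first guess):

```python
# Hedged sketch of program 1: pre-process every file and pickle the chunk lists.
import pickle
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import audiopreprocessing

OUT_DIR = Path("preprocessed")
OUT_DIR.mkdir(exist_ok=True)

def preprocess_to_pickle(audio_path: Path) -> Path:
    # Same signature as in mtafe.py: (path, sample rate, mono, chunk length, overlap)
    chunks = audiopreprocessing.load_preprocessed_audio(audio_path, 32000, True, 15.0, 2.0)
    out_path = OUT_DIR / (audio_path.stem + ".pkl")
    with open(out_path, "wb") as f:
        pickle.dump({"source": str(audio_path), "sr": 32000, "chunks": chunks}, f)
    return out_path

if __name__ == "__main__":
    paths = list(Path("audio").glob("**/*.flac"))  # placeholder input set
    with ThreadPoolExecutor(max_workers=8) as ex:
        for out in ex.map(preprocess_to_pickle, paths):
            print("Wrote", out)
```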
I can see various issues with this approach, the most important of which is space: I am basically taking all of those audio files (which is NOT a small amount) and re-encoding them without any compression. Even though I have decided to lower the audio's sample rate (from the typical 48000 Hz or 192000 Hz down to just 32000 Hz, or, for specific embedding extraction models, 8000 Hz or 16000 Hz), this will still take up a lot of space.
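Back-of-the-envelope, assuming the loader hands back float32 samples (which is a guess on my part):

```python
# Rough storage estimate for raw, uncompressed float32 audio.
sr = 32000                # samples per second after resampling
bytes_per_sample = 4      # float32
per_channel_hour = sr * bytes_per_sample * 3600
print(per_channel_hour / 1e6, "MB per channel-hour")  # ~460.8 MB

# Chunking with overlap stores some samples twice:
chunk_length, overlap = 15.0, 2.0
inflation = chunk_length / (chunk_length - overlap)   # ~1.15x
print(per_channel_hour * inflation / 1e6, "MB with 15 s chunks / 2 s overlap")  # ~531.7 MB
```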
Pickle also isn't the best format for storing all of that audio; safety is one issue. The alternative, encoding each chunk as a FLAC/MP3-compressed file, would be very heavy on the file system. Even though I have an SSD, I am uncertain whether a filesystem holding hundreds of thousands of audio chunk files would hurt performance and the SSD's lifespan.
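One middle ground I might try instead: pack all chunks of one source file into a single compressed `.npz`, which avoids pickle's arbitrary-code-execution problem and keeps the file count at one container per source. I have no idea yet how well float32 audio actually deflates:

```python
# Hedged sketch: one .npz container per source file instead of pickle or
# one FLAC/MP3 file per chunk.
import numpy as np
from pathlib import Path

def save_chunks_npz(out_path: Path, chunks):
    # chunks: list[tuple[np.ndarray, float, int]], as produced by audiopreprocessing
    arrays = {f"chunk_{i}": c for i, (c, _, _) in enumerate(chunks)}
    meta = np.array([(t, ch) for (_, t, ch) in chunks])  # rows of (timepos, channel_id)
    np.savez_compressed(out_path, meta=meta, **arrays)

def load_chunks_npz(path: Path):
    data = np.load(path)
    return [(data[f"chunk_{i}"], float(t), int(ch)) for i, (t, ch) in enumerate(data["meta"])]
```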
But at least this two-program approach will be a lot easier to implement.
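The second program would then just walk the serialized chunks with however much parallelism the GPU allows; single-threaded it reduces to a plain loop (`extract_embedding` is a placeholder for the real model call):

```python
# Hedged sketch of program 2: read the pre-processed chunks and extract features.
import pickle
from pathlib import Path

def extract_embedding(chunk):
    ...  # placeholder for the real GPU embedding model

features = {}
for pkl in sorted(Path("preprocessed").glob("*.pkl")):
    with open(pkl, "rb") as f:
        data = pickle.load(f)
    features[data["source"]] = [
        (extract_embedding(chunk), timepos, channel_id)
        for (chunk, timepos, channel_id) in data["chunks"]
    ]
```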

View File

@@ -1,32 +1,269 @@
 import logging
-logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
-import multiprocessing
-import multiprocessing.process
+#logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
 import dataset
+import numpy as np
 import audiopreprocessing
+import threading
+import queue
+import time
+from concurrent.futures import ThreadPoolExecutor, Future
 from pathlib import Path
-def copy_worker(origin_queue, target_queue):
-    p = origin_queue.get()
-    logging.info(f"Processing: {p}")
-    l = audiopreprocessing.load_preprocessed_audio(p, 32000, True)
-    print("Preprocess complete, putting it into queue")
-    target_queue.put(l) # Even on a small scale test, the process will always hang here
-if __name__ == "__main__":
-    audio_path_queue = multiprocessing.Queue()
-    audio_queue = multiprocessing.Queue()
-    rand_paths = dataset.random_audio_chunk(1)
-    for p in rand_paths:
-        audio_path_queue.put(p)
-    print("Files queued")
-    processes = [multiprocessing.Process(target=copy_worker, args=(audio_path_queue, audio_queue)) for _ in range(1)]
-    for p in processes: p.start()
-    for p in processes: p.join()
-    print("Joined")
-    #for _ in range(1): print(audio_queue.get())
+
+class mtafe:
+    # Input
+    audio_path_queue: queue.Queue[Path] # Queue of audio paths to preprocess
+    # Feeder/Extractor/Queue threading options
+    audio_feeder_threads: int # Number of audio feeder threads
+    feature_extractor_threads: int # Number of feature extractor threads (if the method allows)
+    max_audio_in_queue: int # Maximum number of audio files in the queue
+    # Audio preprocessing parameters
+    desired_sr: int # Desired sample rate (resampling)
+    mono: bool # Force-load audio in mono mode
+    chunk_length: float # Audio chunk length (seconds)
+    overlap: float # Audio chunk overlap (seconds)
+    # Runtime
+    audio_queue: queue.Queue[ # Queue of ...
+        tuple[ # Pair of chunked audio and its path
+            list[tuple[np.ndarray, float, int]], # Chunk list of (ndarray, time position of chunk relative to original audio, channel_id)
+            Path # Path to original audio
+        ]
+    ] # Queue of chunked/resampled audio
+    audio_feeder_threadpool: list[Future]
+    feature_extractor_threadpool: list[Future]
+    features_lock: threading.Lock
+    audio_feeder_barrier: threading.Barrier # Synchronization barrier for all audio feeder threads
+    # Output
+    features: dict[Path, list[tuple[np.ndarray, float, int]]]
+
+    def __init__(
+        self,
+        paudio_paths: list[Path],
+        pmax_audio_in_queue: int = 16,
+        paudio_feeder_threads: int = 8,
+        pfeature_extractor_threads: int = 8,
+        pdesired_sr: int = 32000,
+        pforce_mono: bool = False,
+        pchunk_length: float = 15.0,
+        pchunk_overlap: float = 2.0
+    ):
+        # Check that the paths passed in are all valid files and add them to the queue
+        self.audio_path_queue = queue.Queue()
+        for p in paudio_paths:
+            if not p.is_file():
+                raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
+            else:
+                self.audio_path_queue.put(p)
+        self.audio_path_queue.put(None) # Sentinel to tell the producers the path list is exhausted, since Queue.empty() is unreliable
+        logging.info(f"[MTAFE] [Constructor] Queued {self.audio_path_queue.qsize() - 1} files")
+        # Set up private attributes
+        ## Audio preprocessing parameters
+        self.desired_sr = pdesired_sr
+        self.mono = pforce_mono
+        self.chunk_length = pchunk_length
+        self.overlap = pchunk_overlap
+        ## Extractor/Feeder settings
+        self.max_audio_in_queue = pmax_audio_in_queue
+        self.audio_feeder_threads = paudio_feeder_threads
+        self.feature_extractor_threads = pfeature_extractor_threads
+        ## Set up runtime conditions
+        self.audio_queue = queue.Queue(maxsize=self.max_audio_in_queue)
+        self.features = {}
+        self.features_lock = threading.Lock()
+        self.audio_feeder_barrier = threading.Barrier(self.audio_feeder_threads)
+        self.audio_feeder_threadpool = []
+        self.feature_extractor_threadpool = []
+        logging.info(f"[MTAFE] [Constructor] Extraction parameters: {pdesired_sr}Hz, Mono: {pforce_mono}, Divide into {pchunk_length}s chunks with {pchunk_overlap}s of overlap")
+        logging.info(f"[MTAFE] [Constructor] Using {paudio_feeder_threads} threads for preprocessing audio and {pfeature_extractor_threads} threads for feature extraction. Max queue size of {pmax_audio_in_queue} files")
+
+    # To be overridden by a subclass with a real (GPU) embedding extractor;
+    # this stub returns zero vectors and sleeps to simulate inference time.
+    def audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
+        """Receives a list of audio chunks, extracts an embedding for every chunk, and returns the result as a list of tuples (embedding, time, channel_id).
+
+        Args:
+            audio (list[tuple[np.ndarray, float, int]]): list of audio chunks
+
+        Returns:
+            list[tuple[np.ndarray, float, int]]: list of (embedding vector, timepos, channel id)
+        """
+        features = []
+        for chunk, timepos, channel_id in audio: # Unpack directly instead of rebinding the name `audio`
+            zero = np.zeros(32)
+            features.append( (zero, timepos, channel_id) )
+            time.sleep(1.5) # Simulate effort; change to simulate seconds spent on each audio file
+        return features
+
+    def audio_feeder_worker(self, thread_id: int, barrier: threading.Barrier): # AI-generated attempt
+        try:
+            while True:
+                # Timeout to prevent blocking indefinitely
+                try:
+                    new_audio_path = self.audio_path_queue.get(timeout=10)
+                except queue.Empty:
+                    logging.warning(f"[MTAFE] [Audio Feeder {thread_id}] Queue get timeout")
+                    continue
+                if new_audio_path is None:
+                    self.audio_path_queue.put(new_audio_path) # Put None back
+                    break
+                logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
+                try:
+                    new_audio = audiopreprocessing.load_preprocessed_audio(
+                        new_audio_path,
+                        self.desired_sr,
+                        self.mono,
+                        self.chunk_length,
+                        self.overlap
+                    )
+                    # Timeout to prevent deadlock on a full queue
+                    try:
+                        self.audio_queue.put((new_audio, new_audio_path), timeout=30)
+                        logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
+                    except queue.Full:
+                        logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Queue full, skipping {new_audio_path}")
+                        continue
+                except Exception as e:
+                    logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Error processing {new_audio_path}: {str(e)}")
+                    continue
+            # Barrier timeout to prevent an indefinite wait
+            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads")
+            try:
+                barrier.wait(timeout=60)
+            except threading.BrokenBarrierError:
+                logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Barrier broken")
+            if thread_id == 0:
+                self.audio_queue.put(None) # Signal end
+            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
+        except Exception as e:
+            logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Fatal exception: {str(e)}")
+            logging.exception(e)
+            # Ensure the barrier can progress even if a thread fails
+            try:
+                barrier.abort()
+            except Exception:
+                pass
+            # Ensure the sentinel is added even if threads fail
+            if thread_id == 0:
+                try:
+                    self.audio_queue.put(None, timeout=5)
+                except queue.Full:
+                    pass
+
+    # My original hand-written feeder, kept for reference:
+    # def audio_feeder_worker(self, thread_id: int, barrier: threading.Barrier):
+    #     try:
+    #         while True:
+    #             # Attempt to get an audio path from the audio path queue
+    #             new_audio_path = self.audio_path_queue.get()
+    #             # Check the thread exit condition (None means the audio path queue is empty and the thread should end itself)
+    #             if (new_audio_path is None):
+    #                 self.audio_path_queue.put(new_audio_path) # Put None back to notify the other audio feeder threads
+    #                 break # Break out of the infinite loop
+    #             # Audio path queue is not empty:
+    #             logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
+    #             new_audio = audiopreprocessing.load_preprocessed_audio(
+    #                 new_audio_path,
+    #                 self.desired_sr,
+    #                 self.mono,
+    #                 self.chunk_length,
+    #                 self.overlap
+    #             )
+    #             self.audio_queue.put((new_audio, new_audio_path))
+    #             logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
+    #         logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads to finish")
+    #         barrier.wait()
+    #         if (thread_id == 0):
+    #             self.audio_queue.put(None) # None signals that audio_queue has no more elements to process
+    #         logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
+    #     except Exception as e:
+    #         logging.error(f"[MTAFE] [Audio Feeder {thread_id}] An exception occurred! Committing seppuku!")
+    #         logging.exception(e)
+    #         return
+
+    def feature_extractor_worker(self, thread_id: int):
+        while True:
+            # Attempt to get the next audio chunks to process
+            next_audio_tuple = self.audio_queue.get()
+            # Check the thread exit condition
+            if (next_audio_tuple is None):
+                self.audio_queue.put(next_audio_tuple) # Put the None back to notify the other threads
+                break # unalive urself
+            else: # Assuming we got more tuples
+                current_audio_to_process, current_audio_path = next_audio_tuple # Deconstruct tuple
+                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}")
+                features_to_add = self.audio_inference_embedding(current_audio_to_process)
+                with self.features_lock:
+                    self.features[current_audio_path] = features_to_add
+                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature extraction complete for {current_audio_path} w/ {len(features_to_add)} features")
+        logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
+
+    def test_audio_feeder_worker(self):
+        total_file_amount = self.audio_path_queue.qsize() - 1
+        logging.info("[MTAFE] [test_audio_feeder_worker] Spinning up new threads...")
+        with ThreadPoolExecutor(max_workers=self.audio_feeder_threads) as executor:
+            for i in range(self.audio_feeder_threads):
+                ld_ft = executor.submit(self.audio_feeder_worker, i, self.audio_feeder_barrier)
+                self.audio_feeder_threadpool.append(ld_ft)
+                logging.info(f"[MTAFE] [test_audio_feeder_worker] Launched audio feeder {i}")
+            for i in range(total_file_amount):
+                _, p = self.audio_queue.get()
+                time.sleep(0.25)
+                logging.info(f"[MTAFE] [test_audio_feeder_worker] Popped: {p}")
+        logging.info("[MTAFE] [test_audio_feeder_worker] All audio feeder workers joined!")
+        #logging.info(f"[MTAFE] [test_audio_feeder_worker] Current audio queue size: {self.audio_queue.qsize()}")
+
+    def count_running_threads(self) -> tuple[int, int]:
+        running_extractors = 0
+        running_feeders = 0
+        for ft in self.feature_extractor_threadpool:
+            if ft.running(): running_extractors += 1
+        for ft in self.audio_feeder_threadpool:
+            if ft.running(): running_feeders += 1
+        return (running_feeders, running_extractors)
+
+    def check_all_audiofeed_thread_finished(self) -> bool:
+        for ft in self.audio_feeder_threadpool:
+            if ft.running():
+                return False
+        return True
+
+    def check_all_featureextractor_thread_finished(self) -> bool:
+        for ft in self.feature_extractor_threadpool:
+            if ft.running():
+                return False
+        return True
+
+    def extract(self):
+        total_amount = self.audio_path_queue.qsize() - 1 # Account for the None that marks the end of the queue
+        logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
+        t_start = time.perf_counter() # Timer
+        with ThreadPoolExecutor(max_workers=(self.audio_feeder_threads + self.feature_extractor_threads)) as executor:
+            # Audio feeder threads
+            for i in range(self.audio_feeder_threads):
+                logging.info(f"[MTAFE] Started audio feeder thread {i}")
+                ld_ft = executor.submit(self.audio_feeder_worker, i, self.audio_feeder_barrier)
+                self.audio_feeder_threadpool.append(ld_ft)
+            # Feature extractor threads
+            for i in range(self.feature_extractor_threads):
+                logging.info(f"[MTAFE] Started feature extractor thread {i}")
+                ex_ft = executor.submit(self.feature_extractor_worker, i)
+                self.feature_extractor_threadpool.append(ex_ft)
+            # Progress reporting: keep printing while either thread group is still running
+            while ( (not self.check_all_audiofeed_thread_finished()) or (not self.check_all_featureextractor_thread_finished()) ):
+                nfeeder, nextract = self.count_running_threads()
+                print(f"[MTAFE Progress] Processed {len(self.features)}/{total_amount} (L:{self.audio_queue.qsize()}/W:{self.audio_path_queue.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
+                time.sleep(0.5) # Don't busy-wait at full speed
+        t_stop = time.perf_counter()
+        logging.info(f"[MTAFE] Processed {len(self.features)}/{total_amount} (L:{self.audio_queue.qsize() - 1}/W:{self.audio_path_queue.qsize() - 1} COMPLETE)")
+        delta_t = t_stop - t_start
+        total_features = sum(len(self.features[path]) for path in self.features)
+        logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")

View File

@@ -1,30 +0,0 @@
import logging
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
import multiprocessing
from dataset import random_audio_chunk
import audiopreprocessing
from time import sleep

origin_queue = multiprocessing.Queue()
target_queue = multiprocessing.Queue()

def worker(orig, targ):
    p = orig.get()
    #out = "PROCESSED" + str(p.absolute())
    out = audiopreprocessing.load_preprocessed_audio(p, 16000, True) # This will cause put to hang
    targ.put(out) # This will hang the process

if __name__ == "__main__":
    K = 2
    for p in random_audio_chunk(K):
        origin_queue.put(p)
    processes = [multiprocessing.Process(target=worker, args=(origin_queue, target_queue)) for _ in range(K)]
    for p in processes: p.start()
    # Joining before draining target_queue can deadlock: a child process will not
    # exit until everything it put on a multiprocessing.Queue has been flushed,
    # so the get() calls would need to happen BEFORE the join() calls.
    for p in processes: p.join()
    logging.critical("Successfully terminated all threads")
    for _ in range(K): print(target_queue.get())

View File

@@ -5,17 +5,18 @@ import mtafe
 from dataset import random_audio_chunk
 logging.info("Generating random audio path list")
-rdpl = random_audio_chunk(2)
+rdpl = random_audio_chunk(256)
 logging.info("Initializing MTAFE")
-mtafe.initialize_parameters(
+m = mtafe.mtafe(
     paudio_paths=rdpl,
-    pmax_audio_in_queue=4,
-    paudio_feeder_threads=2,
-    pfeature_extractor_threads=1,
+    pmax_audio_in_queue=8,
+    paudio_feeder_threads=8,
+    pfeature_extractor_threads=2,
     pdesired_sr=32000,
     pforce_mono=False,
     pchunk_length=15,
     pchunk_overlap=2
 )
-mtafe.test_feeder()
+#m.test_audio_feeder_worker()
+m.extract()

Binary file not shown.