Compare commits
1 Commits
37b6a3c5e7
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
b14a0a2a17
|
15
DLSiteFSearchObsidian/Implementation attempt for MTAFE.md
Normal file
15
DLSiteFSearchObsidian/Implementation attempt for MTAFE.md
Normal file
@@ -0,0 +1,15 @@
|
||||
My implementation attempt for a Multi-Threaded Audio Feature Extractor... my attempt ended in misery.
|
||||
|
||||
My vision is a program that is multi-threaded, that will do audio pre-processing and feature extraction in different threads. There should be `i` threads that will do pre-processing on all given audio file paths, and there should be `j` threads that will do feature extraction. If the audio pre-processing pipeline is single-threaded, it will pose a bottleneck to the entire program. But the feature extractor itself is also a bottleneck, since all audio embedding extractor rely on GPU inference, the feature extraction process must be single-threaded on my computer.
|
||||
|
||||
I was trying to adapt the program for multiple threads for audio pre-processing AND multi-threaded for feature extraction (for beefier GPU that can handle more inference threads)
|
||||
|
||||
Unfortunately... All my attempts has ended in misery, my multi-threaded code is littered with performance issues and deadlocks. Python isn't exactly the best language for multi-threaded code due to the existence of GIL. I am trying to implement a multi-producer, multi-consumer model here. The best attempt I was able to do will hang for a long time waiting for the producer (audio feeder) to pre-process the audio, and put it on the shared queue. It will lock up for a really long time, but after that, it will process everything in light speed. But when it's nearing the end, there is a great chance that the program will deadlock itself. I wasn't able to debug, and the profile didn't really yield any result that are useful to me.
|
||||
|
||||
At one point I even relied on AI, and I still wasn't getting a consistent result, the AI generated a code that was significantly faster, with less deadlock, but has the issue of skipping audio files due to them not being pre-processed in time. I could implement additional logic to catch processing errors, and retry if possible. But I am really burnt out, and I would look for better alternatives.
|
||||
|
||||
The next thing I am going to try is to separate this program into two, this program attempts to do pre-processing AND feature extraction in the same time. I would split the process into two. One program (preferably multi-threaded) that will do all the audio pre-processing (resampling, chunking, etc.), and it will output the pre-processed audio into a serialized pickle file, or any other serialization formats.
|
||||
I can see various issues with this approach, the most important of which is space, I am basically taking all of those audio files (which is NOT a small amount), and I am re-encoding it, without any compression. Even though I have decided to lower the audio's bit-rate (fro, the typical 48000 Hz or 192000 Hz to just 32000 Hz, or in specific embedding extraction models: 8000 Hz or 16000 Hz), this will still take up a lot of space.
|
||||
Also the pickle won't be the best format for storing all of those audio, safety issue is one of them, but the alternative of encoding each chunk into FLAC/MP3 compressed format, will be very heavy on the file system. Even though I do have a SSD. I am uncertain if the filesystem, handling hundred of thousands of audio chunk files will have a hit on the performance and the life of the SSD.
|
||||
|
||||
But at least this will be a lot easier to implement.
|
||||
@@ -1,32 +1,269 @@
|
||||
import logging
|
||||
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
|
||||
#logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
|
||||
|
||||
import multiprocessing
|
||||
import multiprocessing.process
|
||||
import dataset
|
||||
import numpy as np
|
||||
import audiopreprocessing
|
||||
import threading
|
||||
import queue
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, Future
|
||||
from pathlib import Path
|
||||
|
||||
def copy_worker(origin_queue, target_queue):
|
||||
p = origin_queue.get()
|
||||
logging.info(f"Processing: {p}")
|
||||
l = audiopreprocessing.load_preprocessed_audio(p, 32000, True)
|
||||
print("Preprocess complete, putting it into queue")
|
||||
target_queue.put(l) # Even on a small scale test, the process will always hang here
|
||||
class mtafe:
|
||||
# Input
|
||||
audio_path_queue: queue.Queue[Path] # List of audio paths to preprocess
|
||||
# Feeder/Extractor/Queue threading options
|
||||
audio_feeder_threads: int # Amount of audio feeder threads
|
||||
feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
|
||||
max_audio_in_queue: int # Maximum audio in queue
|
||||
# Audio preprocessing parameters
|
||||
desired_sr: int # Desired Sample Rate (Resampling)
|
||||
mono: bool # Force load audio in mono mode
|
||||
chunk_length: float # Audio chunk length
|
||||
overlap: float # Audio chunk overlap
|
||||
# Runtime
|
||||
audio_queue: queue.Queue[ # List of ...
|
||||
tuple[ # Pair of chunked audio and its path
|
||||
list[tuple[np.ndarray, float, int]], # Chunked audio list of (ndarray, time position of chunk relative to original audio, channel_id)
|
||||
Path # Path to original audio
|
||||
]
|
||||
] # Listed of Chunked/Resampled audio
|
||||
audio_feeder_threadpool: list[Future]
|
||||
feature_extractor_threadpool: list[Future]
|
||||
features_lock: threading.Lock
|
||||
audio_feeder_barrier: threading.Barrier # Synchronization barrier for all audio feeder threads
|
||||
# Output
|
||||
features: dict[Path, list[tuple[np.ndarray, float, int]]]
|
||||
|
||||
if __name__ == "__main__":
|
||||
audio_path_queue = multiprocessing.Queue()
|
||||
audio_queue = multiprocessing.Queue()
|
||||
def __init__(
|
||||
self,
|
||||
paudio_paths: list[Path],
|
||||
pmax_audio_in_queue: int = 16,
|
||||
paudio_feeder_threads: int = 8,
|
||||
pfeature_extractor_threads: int = 8,
|
||||
pdesired_sr: int = 32000,
|
||||
pforce_mono: bool = False,
|
||||
pchunk_length: float = 15.0,
|
||||
pchunk_overlap: float = 2.0
|
||||
):
|
||||
# Check if the paths passed in are all valid and add them to queue
|
||||
self.audio_path_queue = queue.Queue()
|
||||
for p in paudio_paths:
|
||||
if not p.is_file():
|
||||
raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
|
||||
else:
|
||||
self.audio_path_queue.put(p)
|
||||
self.audio_path_queue.put(None) # To signal to the producer that the audio path list is empty, since Queue.empty() is unreliable
|
||||
|
||||
rand_paths = dataset.random_audio_chunk(1)
|
||||
for p in rand_paths:
|
||||
audio_path_queue.put(p)
|
||||
logging.info(f"[MTAFE] [Constructor] Queued {self.audio_path_queue.qsize() - 1} files")
|
||||
|
||||
print("Files queued")
|
||||
# Set up private attributes
|
||||
## Audio preprocessing parameters
|
||||
self.desired_sr = pdesired_sr
|
||||
self.mono = pforce_mono
|
||||
self.chunk_length = pchunk_length
|
||||
self.overlap = pchunk_overlap
|
||||
|
||||
processes = [multiprocessing.Process(target=copy_worker, args=(audio_path_queue, audio_queue)) for _ in range(1)]
|
||||
for p in processes: p.start()
|
||||
for p in processes: p.join()
|
||||
## Extractor/Feeder settings
|
||||
self.max_audio_in_queue = pmax_audio_in_queue
|
||||
self.audio_feeder_threads = paudio_feeder_threads
|
||||
self.feature_extractor_threads = pfeature_extractor_threads
|
||||
|
||||
print("Joined")
|
||||
#for _ in range(1): print(audio_queue.get())
|
||||
## Set up runtime conditions
|
||||
self.audio_queue = queue.Queue(maxsize=self.max_audio_in_queue)
|
||||
self.features = {}
|
||||
self.features_lock = threading.Lock()
|
||||
self.audio_feeder_barrier = threading.Barrier(self.audio_feeder_threads)
|
||||
self.audio_feeder_threadpool = []
|
||||
self.feature_extractor_threadpool = []
|
||||
|
||||
logging.info(f"[MTAFE] [Constructor] Extraction parameters: {pdesired_sr}Hz, Mono: {pforce_mono}, Divide into {pchunk_length}s chunks with {pchunk_overlap}s of overlap")
|
||||
logging.info(f"[MTAFE] [Constructor] Using {paudio_feeder_threads} threads for preprocessing audio and {pfeature_extractor_threads} threads for feature extraction. Max queue size of {pmax_audio_in_queue} files")
|
||||
|
||||
def audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
|
||||
"""Receives a list of audio chunks, and then extracts embeddings for all audio chunks, returns the resulting embedding as a list of tuples(embedding, time, channel_id)
|
||||
|
||||
Args:
|
||||
audio (list[tuple[np.ndarray, float, int]]): list of audio chunks
|
||||
|
||||
Returns:
|
||||
list[tuple[np.ndarray, float, int]]: List of (embedding vector, timepos, channel id)
|
||||
"""
|
||||
features = []
|
||||
for audio_chunk in audio:
|
||||
audio, timepos, channel_id = audio_chunk
|
||||
zero = np.zeros(32)
|
||||
features.append( (zero, timepos, channel_id) )
|
||||
time.sleep(1.5) # Simulate effort, change to simulate spent seconds in each audio file
|
||||
return features
|
||||
# To be overridden
|
||||
|
||||
def audio_feeder_worker(self, thread_id: int, barrier: threading.Barrier): # AI
|
||||
try:
|
||||
while True:
|
||||
# Add timeout to prevent blocking indefinitely
|
||||
try:
|
||||
new_audio_path = self.audio_path_queue.get(timeout=10)
|
||||
except queue.Empty:
|
||||
logging.warning(f"[MTAFE] [Audio Feeder {thread_id}] Queue get timeout")
|
||||
continue
|
||||
|
||||
if new_audio_path is None:
|
||||
self.audio_path_queue.put(new_audio_path) # Put None back
|
||||
break
|
||||
|
||||
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
|
||||
|
||||
try:
|
||||
new_audio = audiopreprocessing.load_preprocessed_audio(
|
||||
new_audio_path,
|
||||
self.desired_sr,
|
||||
self.mono,
|
||||
self.chunk_length,
|
||||
self.overlap
|
||||
)
|
||||
|
||||
# Add timeout to prevent deadlock on full queue
|
||||
try:
|
||||
self.audio_queue.put((new_audio, new_audio_path), timeout=30)
|
||||
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
|
||||
except queue.Full:
|
||||
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Queue full, skipping {new_audio_path}")
|
||||
continue
|
||||
except Exception as e:
|
||||
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Error processing {new_audio_path}: {str(e)}")
|
||||
continue
|
||||
|
||||
# Add barrier timeout to prevent indefinite wait
|
||||
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads")
|
||||
try:
|
||||
barrier.wait(timeout=60)
|
||||
except threading.BrokenBarrierError:
|
||||
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Barrier broken")
|
||||
|
||||
if thread_id == 0:
|
||||
self.audio_queue.put(None) # Signal end
|
||||
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
|
||||
except Exception as e:
|
||||
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] Fatal exception: {str(e)}")
|
||||
logging.exception(e)
|
||||
# Ensure barrier can progress even if a thread fails
|
||||
try:
|
||||
barrier.abort()
|
||||
except:
|
||||
pass
|
||||
# Ensure sentinel is added even if threads fail
|
||||
if thread_id == 0:
|
||||
try:
|
||||
self.audio_queue.put(None, timeout=5)
|
||||
except:
|
||||
pass
|
||||
|
||||
# def audio_feeder_worker(self, thread_id: int, barrier: threading.Barrier):
|
||||
# try:
|
||||
# while True:
|
||||
# # Attempt to get audio path from audio path queue
|
||||
# new_audio_path = self.audio_path_queue.get()
|
||||
# # Check thread exit condition (If the queue returns None, that means the audio path queue is now empty and the thread should end itself)
|
||||
# if (new_audio_path is None):
|
||||
# self.audio_path_queue.put(new_audio_path) # Put None back to notify other audio feeder threads
|
||||
# break # Break out of the infinite loop
|
||||
# # Audio path queue is not empty:
|
||||
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
|
||||
# new_audio = audiopreprocessing.load_preprocessed_audio(
|
||||
# new_audio_path,
|
||||
# self.desired_sr,
|
||||
# self.mono,
|
||||
# self.chunk_length,
|
||||
# self.overlap
|
||||
# )
|
||||
# self.audio_queue.put((new_audio, new_audio_path))
|
||||
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
|
||||
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads to finish")
|
||||
# barrier.wait()
|
||||
# if (thread_id == 0):
|
||||
# self.audio_queue.put(None) # None to signal audio_queue has no more elements to process
|
||||
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
|
||||
# except Exception as e:
|
||||
# logging.error(f"[MTAFE] [Audio Feeder {thread_id}] An exception occurred! Committing seppuku!")
|
||||
# logging.exception(e)
|
||||
# return
|
||||
|
||||
def feature_extractor_worker(self, thread_id: int):
|
||||
while True:
|
||||
# Attempt to get next audio chunks to process
|
||||
next_audio_tuple = self.audio_queue.get()
|
||||
# Check thread exit condition
|
||||
if (next_audio_tuple is None):
|
||||
self.audio_queue.put(next_audio_tuple) # Put the None back to notify other threads
|
||||
break # unalive urself
|
||||
else: # Assuming we got more tuples
|
||||
current_audio_to_process, current_audio_path = next_audio_tuple # Deconstruct tuple
|
||||
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}")
|
||||
features_to_add = self.audio_inference_embedding(current_audio_to_process)
|
||||
with self.features_lock:
|
||||
self.features[current_audio_path] = features_to_add
|
||||
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {current_audio_path} w/ {len(features_to_add)} features")
|
||||
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
|
||||
|
||||
def test_audio_feeder_worker(self):
|
||||
total_file_amount = self.audio_path_queue.qsize() - 1
|
||||
logging.info("[MTAFE] [test_audio_feeder_worker] Spinning up new threads...")
|
||||
with ThreadPoolExecutor(max_workers=self.audio_feeder_threads) as executor:
|
||||
for i in range(self.audio_feeder_threads):
|
||||
ld_ft = executor.submit(self.audio_feeder_worker, i, self.audio_feeder_barrier)
|
||||
self.audio_feeder_threadpool.append(ld_ft)
|
||||
logging.info(f"[MTAFE] [test_audio_feeder_worker] Launched audio feeder {i}")
|
||||
for i in range(total_file_amount):
|
||||
_, p = self.audio_queue.get()
|
||||
time.sleep(0.25)
|
||||
logging.info(f"[MTAFE] [test_audio_feeder_worker] Popped: {p}")
|
||||
logging.info("[MTAFE] [test_audio_feeder_worker] All audio feeder worker joined!")
|
||||
#logging.info(f"[MTAFE] [test_audio_feeder_worker] Current audio queue size: {self.audio_queue.qsize()}")
|
||||
|
||||
def count_running_threads(self) -> tuple[int, int]:
|
||||
running_extractors = 0
|
||||
running_feeders = 0
|
||||
for ft in self.feature_extractor_threadpool:
|
||||
if ft.running(): running_extractors += 1
|
||||
for ft in self.audio_feeder_threadpool:
|
||||
if ft.running(): running_feeders += 1
|
||||
return (running_feeders, running_extractors)
|
||||
|
||||
def check_all_audiofeed_thread_finished(self) -> bool:
|
||||
for ft in self.audio_feeder_threadpool:
|
||||
if ft.running():
|
||||
return False
|
||||
return True
|
||||
|
||||
def check_all_featureextractor_thread_finished(self) -> bool:
|
||||
for ft in self.feature_extractor_threadpool:
|
||||
if ft.running():
|
||||
return False
|
||||
return True
|
||||
|
||||
def extract(self):
|
||||
total_amount = self.audio_path_queue.qsize() - 1 # Account for None to indicate queue end
|
||||
logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
|
||||
t_start = time.perf_counter() # Timer
|
||||
with ThreadPoolExecutor(max_workers=(self.audio_feeder_threads + self.feature_extractor_threads)) as executor:
|
||||
# Audio feeder threads
|
||||
for i in range(self.audio_feeder_threads):
|
||||
logging.info(f"[MTAFE] Started audio feeder thread {i}")
|
||||
ld_ft = executor.submit(self.audio_feeder_worker, i, self.audio_feeder_barrier)
|
||||
self.audio_feeder_threadpool.append(ld_ft)
|
||||
# Feature extractor threads
|
||||
for i in range(self.feature_extractor_threads):
|
||||
logging.info(f"[MTAFE] Started feature extractor thread {i}")
|
||||
ex_ft = executor.submit(self.feature_extractor_worker, i)
|
||||
self.feature_extractor_threadpool.append(ex_ft)
|
||||
# Progress checking
|
||||
while ( (not self.check_all_audiofeed_thread_finished()) and (not self.check_all_featureextractor_thread_finished()) ):
|
||||
nfeeder, nextract = self.count_running_threads()
|
||||
print(f"[MTAFE Progress] Processed {len(self.features)}/{total_amount} (L:{self.audio_queue.qsize()}/W:{self.audio_path_queue.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
|
||||
t_stop = time.perf_counter()
|
||||
logging.info(f"[MTAFE] Processed {len(self.features)}/{total_amount} (L:{self.audio_queue.qsize() - 1}/W:{self.audio_path_queue.qsize() - 1} COMPLETE)")
|
||||
delta_t = t_stop - t_start
|
||||
total_features = sum( [len(self.features[path]) for path in self.features] )
|
||||
logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")
|
||||
@@ -1,30 +0,0 @@
|
||||
import logging
|
||||
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
|
||||
|
||||
import multiprocessing
|
||||
from dataset import random_audio_chunk
|
||||
import audiopreprocessing
|
||||
from time import sleep
|
||||
|
||||
origin_queue = multiprocessing.Queue()
|
||||
target_queue = multiprocessing.Queue()
|
||||
|
||||
def worker(orig, targ):
|
||||
p = orig.get()
|
||||
#out = "PROCESSED" + str(p.absolute())
|
||||
out = audiopreprocessing.load_preprocessed_audio(p, 16000, True) # This will cause put to hang
|
||||
targ.put(out) # This will hang the process
|
||||
|
||||
if __name__ == "__main__":
|
||||
K = 2
|
||||
|
||||
for p in random_audio_chunk(K):
|
||||
origin_queue.put(p)
|
||||
|
||||
processes = [multiprocessing.Process(target=worker, args=(origin_queue, target_queue)) for _ in range(K)]
|
||||
for p in processes: p.start()
|
||||
for p in processes: p.join()
|
||||
|
||||
logging.critical("Successfully terminated all threads")
|
||||
|
||||
for _ in range(K): print(target_queue.get())
|
||||
@@ -5,17 +5,18 @@ import mtafe
|
||||
from dataset import random_audio_chunk
|
||||
|
||||
logging.info("Generating random audio path list")
|
||||
rdpl = random_audio_chunk(2)
|
||||
rdpl = random_audio_chunk(256)
|
||||
|
||||
logging.info("Initializing MTAFE")
|
||||
mtafe.initialize_parameters(
|
||||
m = mtafe.mtafe(
|
||||
paudio_paths=rdpl,
|
||||
pmax_audio_in_queue=4,
|
||||
paudio_feeder_threads=2,
|
||||
pfeature_extractor_threads=1,
|
||||
pmax_audio_in_queue=8,
|
||||
paudio_feeder_threads=8,
|
||||
pfeature_extractor_threads=2,
|
||||
pdesired_sr=32000,
|
||||
pforce_mono=False,
|
||||
pchunk_length=15,
|
||||
pchunk_overlap=2
|
||||
)
|
||||
mtafe.test_feeder()
|
||||
#m.test_audio_feeder_worker()
|
||||
m.extract()
|
||||
BIN
mtafe_lab/testmtafeprofile.txt
Normal file
BIN
mtafe_lab/testmtafeprofile.txt
Normal file
Binary file not shown.
Reference in New Issue
Block a user