processing framework
This commit is contained in:
@@ -8,8 +8,8 @@ import concurrent.futures
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
import audiopreprocessing
|
||||
|
||||
DEBUG=True
|
||||
import logging
|
||||
import queue
|
||||
|
||||
def serialize_dict_obj(path : Path, object : dict) -> int:
|
||||
"""Serializes Python Dictionary object to a file via Pickle.
|
||||
@@ -27,7 +27,7 @@ def serialize_dict_obj(path : Path, object : dict) -> int:
|
||||
size = fp.tell()
|
||||
return size
|
||||
|
||||
print("Reading local dataset directory structure...")
|
||||
logging.info("Reading local dataset directory structure...")
|
||||
|
||||
ASMRThreePath = Path("C:\\ASMRThree")
|
||||
ASMRTwoPath = Path("D:\\ASMRTwo")
|
||||
@@ -149,7 +149,7 @@ class AudioFeatureExtractor():
|
||||
__mono: bool
|
||||
__chunk_length: float
|
||||
__overlap: float
|
||||
__features: dict[Path, list[tuple[np.ndarray, float, int]]]
|
||||
__features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
|
||||
# { audioPath:
|
||||
# [(embedding, pos, channel)...]
|
||||
# }
|
||||
@@ -184,7 +184,7 @@ class AudioFeatureExtractor():
|
||||
"""
|
||||
while (self.__audio_paths_list): # While there are still Path elements in path list
|
||||
if (not (len(self.__audio_queue) < self.__max_audio_in_queue)):
|
||||
if DEBUG: print("Audio Queue Thread: Queue Full, feeder thread sleeping for 5 seconds")
|
||||
logging.info("[AFE] [Audio Queue Thread]: Queue Full, feeder thread sleeping for 5 seconds")
|
||||
time.sleep(5)
|
||||
while(len(self.__audio_queue) < self.__max_audio_in_queue): # While the audio queue is not full
|
||||
new_audio_path = self.__audio_paths_list[0]
|
||||
@@ -200,8 +200,8 @@ class AudioFeatureExtractor():
|
||||
(new_audio, new_audio_path)
|
||||
)
|
||||
pop_path = self.__audio_paths_list.pop(0)
|
||||
if DEBUG: print("Audio Queue Thread: Added new audio to queue", pop_path)
|
||||
if DEBUG: print("Audio Queue Thread: DONE. All audio files fed")
|
||||
logging.info(f"[AFE] [Audio Queue Thread]: Added new audio to queue {pop_path}")
|
||||
logging.info("[AFE] [Audio Queue Thread]: DONE. All audio files fed")
|
||||
|
||||
def __audio_queue_feature_extractor(self):
|
||||
"""Internal thread function. Get audio from audio queue. And extract embedding vector
|
||||
@@ -212,7 +212,7 @@ class AudioFeatureExtractor():
|
||||
if (self.__audio_queue): # If audio queue is not empty
|
||||
with self.__queue_lock:
|
||||
audio_to_process, audio_path = self.__audio_queue.pop(0) # Get audio from queue
|
||||
if DEBUG: print(f"Feature Extractor Thread: Extracting {len(audio_to_process)} features from audio", audio_path)
|
||||
logging.info(f"[AFE] [Feature Extractor Thread]: Extracting {len(audio_to_process)} features from audio {audio_path}")
|
||||
for audio_chunk in audio_to_process:
|
||||
same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk)
|
||||
if (audio_path not in self.__features.keys()):
|
||||
@@ -224,9 +224,9 @@ class AudioFeatureExtractor():
|
||||
(embedd_vect, timepos, channel_id)
|
||||
)
|
||||
else:
|
||||
if DEBUG: print("Feature Extractor Thread: Queue Empty, extractor thread sleeping for 5 seconds") # If audio queue is empty, wait
|
||||
logging.info("[AFE] [Feature Extractor Thread]: Queue Empty, extractor thread sleeping for 5 seconds") # If audio queue is empty, wait
|
||||
time.sleep(5)
|
||||
if DEBUG: print("Feature Extractor Thread: DONE. Extracted all features from all audio files")
|
||||
logging.info("[AFE] [Feature Extractor Thread]: DONE. Extracted all features from all audio files")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -269,4 +269,203 @@ class AudioFeatureExtractor():
|
||||
print()
|
||||
print("Extraction completed")
|
||||
print(f"Took {delta_t} seconds. Added {total_features} vectors/embeddings")
|
||||
|
||||
|
||||
class MultiThreadedAudioFeatureExtractor():
|
||||
__audio_queue: queue.Queue[ # List of ...
|
||||
tuple[ # Pair of chunked audio and its path
|
||||
list[tuple[np.ndarray, float, int]], # Chunked audio
|
||||
Path # Path to original audio
|
||||
]
|
||||
] # Listed of Chunked/Resampled audio
|
||||
__audio_loader_threads: int # Amount of audio feeder threads
|
||||
__feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
|
||||
__audio_paths_list: queue.Queue[Path] # Path list to audio
|
||||
__max_audio_in_queue: int # Maximum audio in queue
|
||||
# Audio Feeeder parameter
|
||||
__desired_sr: int # Desired Sample Rate (Resampling)
|
||||
__mono: bool # Force load audio in mono mode
|
||||
__chunk_length: float # Audio chunk length
|
||||
__overlap: float
|
||||
# Result
|
||||
__features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
|
||||
__features_lock: threading.Lock
|
||||
# __features: { audioPath:
|
||||
# [(embedding1, pos1, channel1),
|
||||
# (embedding2, pos2, channel1)]
|
||||
# ...
|
||||
# }
|
||||
# Runtime
|
||||
__audio_loader_threadpool: list[concurrent.futures.Future]
|
||||
__feature_extractor_threadpool: list[concurrent.futures.Future]
|
||||
__audio_feed_condition: threading.Condition
|
||||
|
||||
def __audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
|
||||
"""Receives a list of audio chunks, and then extracts embeddings for all audio chunks, returns the resulting embedding as a list of tuples(embedding, time, channel_id)
|
||||
|
||||
Args:
|
||||
audio (list[tuple[np.ndarray, float, int]]): list of audio chunks
|
||||
|
||||
Returns:
|
||||
list[tuple[np.ndarray, float, int]]: List of (embedding vector, timepos, channel id)
|
||||
"""
|
||||
features = []
|
||||
for audio_chunk in audio:
|
||||
audio, timepos, channel_id = audio_chunk
|
||||
zero = np.zeros(32)
|
||||
features.append( (zero, timepos, channel_id) )
|
||||
time.sleep(0.01)
|
||||
return features
|
||||
# To be overridden
|
||||
|
||||
def __audio_feeder_thread(self, thread_id):
|
||||
# If there is still audio in paths list
|
||||
# Is the audio queue not full?
|
||||
while (not self.__audio_paths_list.empty()):
|
||||
if (not self.__audio_queue.full()):
|
||||
# Feed audio
|
||||
new_audio_path = self.__audio_paths_list.get()
|
||||
self.__audio_paths_list.task_done()
|
||||
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
|
||||
new_audio = audiopreprocessing.load_preprocessed_audio(
|
||||
new_audio_path,
|
||||
self.__desired_sr,
|
||||
self.__mono,
|
||||
self.__chunk_length,
|
||||
self.__overlap
|
||||
)
|
||||
self.__audio_queue.put((new_audio, new_audio_path))
|
||||
#self.__audio_queue.task_done()
|
||||
#with self.__audio_feed_condition: self.__audio_feed_condition.notify_all()
|
||||
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
|
||||
#else:
|
||||
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Audio queue full ({self.__audio_queue.qsize()} <= {self.__max_audio_in_queue} FALSE): waiting")
|
||||
# with self.__audio_feed_condition:
|
||||
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Audio queue full: waiting")
|
||||
# self.__audio_feed_condition.wait_for(lambda: not self.__audio_queue.qsize() <= self.__max_audio_in_queue) # This consumes way too much CPU power
|
||||
# self.__audio_feed_condition.wait(10)
|
||||
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
|
||||
|
||||
#def testfeedthread(self, nthreads):
|
||||
# t1 = threading.Thread(target=self.__audio_feeder_thread, args=(1,))
|
||||
# t2 = threading.Thread(target=self.__audio_feeder_thread, args=(2,))
|
||||
# t1.start(); t2.start()
|
||||
# #with self.__audio_feed_condition:
|
||||
# # self.__audio_feed_condition.notify_all()
|
||||
# t1.join(); t2.join()
|
||||
# with concurrent.futures.ThreadPoolExecutor(max_workers=nthreads) as executor:
|
||||
# for i in range(nthreads):
|
||||
# ft = executor.submit(self.__audio_feeder_thread, i)
|
||||
# self.__audio_loader_threadpool.append(ft)
|
||||
|
||||
def __check_all_audiofeed_thread_finished(self) -> bool:
|
||||
for ft in self.__audio_loader_threadpool:
|
||||
if ft.running():
|
||||
return False
|
||||
return True
|
||||
|
||||
def __check_all_featureextractor_thread_finished(self) -> bool:
|
||||
for ft in self.__feature_extractor_threadpool:
|
||||
if ft.running():
|
||||
return False
|
||||
return True
|
||||
|
||||
def __feature_extractor_thread(self, thread_id):
|
||||
while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
|
||||
if (not self.__audio_queue.empty()):
|
||||
audio_to_process, audio_path = self.__audio_queue.get()
|
||||
self.__audio_queue.task_done()
|
||||
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
|
||||
features_to_add = self.__audio_inference_embedding(audio_to_process)
|
||||
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
|
||||
with self.__features_lock:
|
||||
self.__features[audio_path] = features_to_add
|
||||
#with self.__audio_feed_condition: self.__audio_feed_condition.notify_all()
|
||||
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {audio_path} w/ {len(features_to_add)} features")
|
||||
#else:
|
||||
# if (not self.__check_all_audiofeed_thread_finished()):
|
||||
# with self.__audio_feed_condition:
|
||||
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Audio queue empty: waiting")
|
||||
# self.__audio_feed_condition.wait(10)
|
||||
# self.__audio_feed_condition.wait_for(lambda: not self.__audio_queue.empty())
|
||||
|
||||
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
|
||||
|
||||
def __count_running_threads(self) -> tuple[int, int]:
|
||||
running_extractors = 0
|
||||
running_feeders = 0
|
||||
for ft in self.__feature_extractor_threadpool:
|
||||
if ft.running(): running_extractors += 1
|
||||
for ft in self.__audio_loader_threadpool:
|
||||
if ft.running(): running_feeders += 1
|
||||
return (running_feeders, running_extractors)
|
||||
|
||||
@property
|
||||
def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
|
||||
return self.__features
|
||||
|
||||
def extract(self):
|
||||
total_amount = self.__audio_paths_list.qsize()
|
||||
logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
|
||||
t_start = time.perf_counter()
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=(self.__audio_loader_threads + self.__feature_extractor_threads)) as executor:
|
||||
for i in range(self.__audio_loader_threads):
|
||||
ld_ft = executor.submit(self.__audio_feeder_thread, i)
|
||||
self.__audio_loader_threadpool.append(ld_ft)
|
||||
for i in range(self.__feature_extractor_threads):
|
||||
ld_ft = executor.submit(self.__feature_extractor_thread, i)
|
||||
self.__feature_extractor_threadpool.append(ld_ft)
|
||||
while ( (not self.__check_all_audiofeed_thread_finished()) and (not self.__check_all_featureextractor_thread_finished()) ):
|
||||
nfeeder, nextract = self.__count_running_threads()
|
||||
print(f"[MTAFE Progress] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
|
||||
t_stop = time.perf_counter()
|
||||
logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()} COMPLETE)")
|
||||
delta_t = t_stop - t_start
|
||||
total_features = sum( [len(self.__features[path]) for path in self.__features] )
|
||||
logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
audio_paths: list[Path],
|
||||
max_audio_in_queue: int = 16,
|
||||
audio_feeder_threads: int = 8,
|
||||
feature_extractor_threads: int = 8,
|
||||
desired_sr: int = 32000,
|
||||
force_mono: bool = False,
|
||||
chunk_length: float = 15.0,
|
||||
chunk_overlap: float = 2.0,
|
||||
):
|
||||
# Check if the paths passed in are all valid and add them to queue
|
||||
self.__audio_paths_list = queue.Queue()
|
||||
for p in audio_paths:
|
||||
if not p.is_file():
|
||||
raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
|
||||
else:
|
||||
self.__audio_paths_list.put(p)
|
||||
#self.__audio_paths_list.task_done()
|
||||
|
||||
logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize()} files")
|
||||
|
||||
# Set up private attributes
|
||||
## Audio preprocessing parameters
|
||||
self.__desired_sr = desired_sr
|
||||
self.__mono = force_mono
|
||||
self.__chunk_length = chunk_length
|
||||
self.__overlap = chunk_overlap
|
||||
|
||||
## Extractor/Feeder settings
|
||||
self.__max_audio_in_queue = max_audio_in_queue
|
||||
self.__audio_loader_threads = audio_feeder_threads
|
||||
self.__feature_extractor_threads = feature_extractor_threads
|
||||
|
||||
## Set up runtime conditions
|
||||
self.__audio_queue = queue.Queue()
|
||||
self.__features = {}
|
||||
self.__features_lock = threading.Lock()
|
||||
self.__audio_loader_threadpool = []
|
||||
self.__feature_extractor_threadpool = []
|
||||
self.__audio_feed_condition = threading.Condition()
|
||||
|
||||
logging.info(f"[MTAFE] [Constructor] Extraction parameters: {desired_sr}Hz, Mono: {force_mono}, Divide into {chunk_length}s chunks with {chunk_overlap}s of overlap")
|
||||
logging.info(f"[MTAFE] [Constructor] Using {audio_feeder_threads} threads for preprocessing audio and {feature_extractor_threads} threads for feature extraction. Max queue size of {max_audio_in_queue} files")
|
||||
|
||||
# More audio embeddings specific code below (To be overridden)
|
||||
Reference in New Issue
Block a user