Compare commits

..

2 Commits

14 changed files with 9069 additions and 10789 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -3,18 +3,31 @@ import pickle
import os import os
import numpy as np import numpy as np
from pathlib import Path from pathlib import Path
import logging
DEBUG=True logger = logging.getLogger(__name__)
def triggerlog():
logger.critical("Testing: info")
def resample_load(input_path : Path, target_sr : int = 16000, mono_audio : bool = False) -> np.ndarray: # AI def resample_load(input_path : Path, target_sr : int = 16000, mono_audio : bool = False) -> np.ndarray: # AI
"""Resample audio to target sample rate and save to output directory""" """Load and resamples the audio into `target_sr`.
Args:
input_path (Path): pathlib.Path object to audio file
target_sr (int, optional): Target Sample Rate to resample. Defaults to 16000.
mono_audio (bool, optional): Load the audio in mono mode. Defaults to False.
Returns:
np.ndarray: _description_
"""
# Load audio file with original sample rate # Load audio file with original sample rate
if DEBUG: print("[resample_load] Loading audio", input_path) logger.info(f"[resample_load] Loading audio {input_path}")
audio, orig_sr = librosa.load(input_path, sr=None, mono=mono_audio) audio, orig_sr = librosa.load(input_path, sr=None, mono=mono_audio)
# Resample if necessary # Resample if necessary
if orig_sr != target_sr: if orig_sr != target_sr:
if DEBUG: print("[resample_load] Resampling to", target_sr) logger.info(f"[resample_load] Resampling to {target_sr}")
audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr) audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
return audio return audio
@@ -24,7 +37,7 @@ def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap
Chunks audio file into overlapping segments. Only pass in mono audio here. Chunks audio file into overlapping segments. Only pass in mono audio here.
Args: Args:
audio_file: Loaded audio ndarray audio_file: Loaded audio ndarray (one channel only)
sr: Sample rate for the given audio file sr: Sample rate for the given audio file
chunk_length: Length of each chunk in seconds chunk_length: Length of each chunk in seconds
overlap: Overlap between chunks in seconds overlap: Overlap between chunks in seconds
@@ -32,7 +45,7 @@ def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap
Returns: Returns:
List of audio chunks, list of chunk positions, and given sample rate List of audio chunks, list of chunk positions, and given sample rate
""" """
if DEBUG: print("[chunk_audio] Chunking audio") logger.info(f"[chunk_audio] Chunking audio ({len(audio) / sr}s)")
# Calculate chunk size and hop length in samples # Calculate chunk size and hop length in samples
chunk_size = int(chunk_length * sr) chunk_size = int(chunk_length * sr)
hop_length = int((chunk_length - overlap) * sr) hop_length = int((chunk_length - overlap) * sr)
@@ -46,10 +59,12 @@ def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap
chunks.append(chunk) chunks.append(chunk)
positions.append(i / sr) positions.append(i / sr)
k += 1 k += 1
if DEBUG: print("[chunk_audio] Chunked", k, end="\r")
if k == 0: # The full audio length is less than chunk_length if k == 0: # The full audio length is less than chunk_length
chunks = [audio] chunks = [audio]
positions = [0.0] positions = [0.0]
logger.info(f"[chunk_audio] Audio less than chunk_length. Returning original audio as chunk\r")
else:
logger.info(f"[chunk_audio] Audio is split into {k} chunks")
return chunks, positions, sr return chunks, positions, sr

View File

@@ -2,14 +2,15 @@ import platform
import os import os
import pickle import pickle
import random import random
import multiprocessing
import threading import threading
import time import time
import concurrent.futures import concurrent.futures
import numpy as np import numpy as np
from pathlib import Path from pathlib import Path
import audiopreprocessing import audiopreprocessing
import logging
DEBUG=True import queue
def serialize_dict_obj(path : Path, object : dict) -> int: def serialize_dict_obj(path : Path, object : dict) -> int:
"""Serializes Python Dictionary object to a file via Pickle. """Serializes Python Dictionary object to a file via Pickle.
@@ -27,7 +28,7 @@ def serialize_dict_obj(path : Path, object : dict) -> int:
size = fp.tell() size = fp.tell()
return size return size
print("Reading local dataset directory structure...") logging.info("Reading local dataset directory structure...")
ASMRThreePath = Path("C:\\ASMRThree") ASMRThreePath = Path("C:\\ASMRThree")
ASMRTwoPath = Path("D:\\ASMRTwo") ASMRTwoPath = Path("D:\\ASMRTwo")
@@ -133,61 +134,205 @@ def random_audio_chunk(n : int, seed : int = 177013) -> list[Path]:
#return random.choices(audio_paths, k=n) # Contains repeated elements #return random.choices(audio_paths, k=n) # Contains repeated elements
return random.sample(audio_paths, k=n) return random.sample(audio_paths, k=n)
class AudioFeatureExtractor(): # class AudioFeatureExtractor():
__audio_queue: list[ # List of ... # __audio_queue: list[ # List of ...
# tuple[ # Pair of chunked audio and its path
# list[tuple[np.ndarray, float, int]], # Chunked audio
# Path # Path to original audio
# ]
# ] # Listed of Chunked/Resampled audio
# __feeder_future: concurrent.futures.Future
# __extractor_future: concurrent.futures.Future
# __audio_paths_list: list[Path]
# __max_audio_in_queue: int
# __queue_lock: threading.Lock
# __desired_sr: int
# __mono: bool
# __chunk_length: float
# __overlap: float
# __features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
# # { audioPath:
# # [(embedding, pos, channel)...]
# # }
# def __embedding_inference(self, audio_ndarray: np.ndarray) -> np.ndarray:
# """Uses embedding model to inference an audio. Returns embedding vectors.
# Function to be overrided. Returns np.zeros(32).
# Args:
# audio_ndarray (np.ndarray):
# Returns:
# np.ndarray: _description_
# """
# return np.zeros(32)
# def __embedding_extract(self, audio: tuple[np.ndarray, float, int]) -> tuple[np.ndarray, float, int, np.ndarray]:
# """Receives a tuple of audio, position, and channel ID, then adding the embedding to the tuple
# Args:
# audio (tuple[np.ndarray, float, int]): tuple of audio, position, channel id
# Returns:
# tuple[np.ndarray, float, int, np.ndarray]: audio, position, channel id, embedding vector
# """
# audio_chunk, pos, channel_id = audio
# return (audio_chunk, pos, channel_id, self.__embedding_inference(audio_chunk))
# def __audio_queue_feeder(self): # TODO: Upgrade to multithreaded loader?
# """Internal thread function. Preprocess and load the audio continuously to
# audio_queue until the end of the audio_paths_list
# """
# while (self.__audio_paths_list): # While there are still Path elements in path list
# if (not (len(self.__audio_queue) < self.__max_audio_in_queue)):
# logging.info("[AFE] [Audio Queue Thread]: Queue Full, feeder thread sleeping for 5 seconds")
# time.sleep(5)
# while(len(self.__audio_queue) < self.__max_audio_in_queue): # While the audio queue is not full
# new_audio_path = self.__audio_paths_list[0]
# new_audio = audiopreprocessing.load_preprocessed_audio(
# new_audio_path,
# self.__desired_sr,
# self.__mono,
# self.__chunk_length,
# self.__overlap
# )
# with self.__queue_lock:
# self.__audio_queue.append(
# (new_audio, new_audio_path)
# )
# pop_path = self.__audio_paths_list.pop(0)
# logging.info(f"[AFE] [Audio Queue Thread]: Added new audio to queue {pop_path}")
# logging.info("[AFE] [Audio Queue Thread]: DONE. All audio files fed")
# def __audio_queue_feature_extractor(self):
# """Internal thread function. Get audio from audio queue. And extract embedding vector
# for all audio chunks. Stores the resulting embedding into self.__features.
# With Original Audio's Path as key, and list[tuple[np.ndarray, float, int]] (list of tuple of embedding vector, position, channel id)
# """
# while (self.__audio_paths_list or self.__audio_queue): # While there are still audio to be processed
# if (self.__audio_queue): # If audio queue is not empty
# with self.__queue_lock:
# audio_to_process, audio_path = self.__audio_queue.pop(0) # Get audio from queue
# logging.info(f"[AFE] [Feature Extractor Thread]: Extracting {len(audio_to_process)} features from audio {audio_path}")
# for audio_chunk in audio_to_process:
# same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk)
# if (audio_path not in self.__features.keys()):
# #if DEBUG: print("Adding new vector to", audio_path.name)
# self.__features[audio_path] = [(embedd_vect, timepos, channel_id)]
# else:
# #if DEBUG: print("Adding vector to", audio_path.name)
# self.__features[audio_path].append(
# (embedd_vect, timepos, channel_id)
# )
# else:
# logging.info("[AFE] [Feature Extractor Thread]: Queue Empty, extractor thread sleeping for 5 seconds") # If audio queue is empty, wait
# time.sleep(5)
# logging.info("[AFE] [Feature Extractor Thread]: DONE. Extracted all features from all audio files")
# def __init__(
# self,
# audio_paths_list: list[Path],
# max_audio_in_queue: int,
# desired_sr: int,
# mono: bool,
# chunk_length: float = 15.0,
# overlap: float = 2.0
# ):
# self.__audio_queue = []
# self.__audio_paths_list = audio_paths_list
# self.__max_audio_in_queue = max_audio_in_queue
# self.__queue_lock = threading.Lock()
# self.__desired_sr = desired_sr
# self.__mono = mono
# self.__chunk_length = chunk_length
# self.__overlap = overlap
# self.__features = {}
# @property
# def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
# return self.__features
# def extract(self):
# print("Starting feature extraction for", len(self.__audio_paths_list), "file(s)")
# total_amount = len(self.__audio_paths_list)
# t_start = time.perf_counter()
# with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# self.__feeder_future = executor.submit(self.__audio_queue_feeder)
# self.__extractor_future = executor.submit(self.__audio_queue_feature_extractor)
# while (self.__feeder_future.running() or self.__extractor_future.running()):
# print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W{len(self.__audio_paths_list)})", end="\r")
# time.sleep(1)
# t_stop = time.perf_counter()
# print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)} COMPLETE)")
# delta_t = t_stop - t_start
# total_features = sum( [len(self.__features[path]) for path in self.__features] )
# print()
# print("Extraction completed")
# print(f"Took {delta_t} seconds. Added {total_features} vectors/embeddings")
class MultiThreadedAudioFeatureExtractor():
# This is the third time I am rewriting this, please send help. Multithreaded apps is pure hell to develop and debug
# After testing: this will hang at the last audio, precisely at preprocessing audio. I suspect that GIL hit the performance
# so much to the point that the preprocessing routine cannot get any share of the CPU execution cycle
__audio_queue: queue.Queue[ # List of ...
tuple[ # Pair of chunked audio and its path tuple[ # Pair of chunked audio and its path
list[tuple[np.ndarray, float, int]], # Chunked audio list[tuple[np.ndarray, float, int]], # Chunked audio list of (ndarray, time position of chunk relative to original audio, channel_id)
Path # Path to original audio Path # Path to original audio
] ]
] # Listed of Chunked/Resampled audio ] # Listed of Chunked/Resampled audio
__feeder_future: concurrent.futures.Future __audio_feeder_threads: int # Amount of audio feeder threads
__extractor_future: concurrent.futures.Future __feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
__audio_paths_list: list[Path] __audio_paths_list: queue.Queue[Path] # Path list to audio
__max_audio_in_queue: int __max_audio_in_queue: int # Maximum audio in queue
__queue_lock: threading.Lock __audio_feeder_barrier: threading.Barrier # Synchronization barrier for all audio feeder threads
__desired_sr: int # Audio Feeder parameter
__mono: bool __desired_sr: int # Desired Sample Rate (Resampling)
__chunk_length: float __mono: bool # Force load audio in mono mode
__chunk_length: float # Audio chunk length
__overlap: float __overlap: float
__features: dict[Path, list[tuple[np.ndarray, float, int]]] # Result
# { audioPath: __features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
# [(embedding, pos, channel)...] __features_lock: threading.Lock
# __features: { audioPath:
# [(embedding1, pos1, channel1),
# (embedding2, pos2, channel1)]
# ...
# } # }
# Runtime
__audio_feeder_threadpool: list[concurrent.futures.Future]
__feature_extractor_threadpool: list[concurrent.futures.Future]
def __embedding_inference(self, audio_ndarray: np.ndarray) -> np.ndarray: def __audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
"""Uses embedding model to inference an audio. Returns embedding vectors. """Receives a list of audio chunks, and then extracts embeddings for all audio chunks, returns the resulting embedding as a list of tuples(embedding, time, channel_id)
Function to be overrided. Returns np.zeros(32).
Args: Args:
audio_ndarray (np.ndarray): audio (list[tuple[np.ndarray, float, int]]): list of audio chunks
Returns: Returns:
np.ndarray: _description_ list[tuple[np.ndarray, float, int]]: List of (embedding vector, timepos, channel id)
""" """
return np.zeros(32) features = []
for audio_chunk in audio:
audio, timepos, channel_id = audio_chunk
zero = np.zeros(32)
features.append( (zero, timepos, channel_id) )
time.sleep(0.01) # Simulate effort, change to simulate spent seconds in each audio file
return features
# To be overridden
def __embedding_extract(self, audio: tuple[np.ndarray, float, int]) -> tuple[np.ndarray, float, int, np.ndarray]: def __audio_feeder_thread(self, thread_id: int, barrier: threading.Barrier):
"""Receives a tuple of audio, position, and channel ID, then adding the embedding to the tuple try:
while True:
Args: # Attempt to get audio path from audio path queue
audio (tuple[np.ndarray, float, int]): tuple of audio, position, channel id new_audio_path = self.__audio_paths_list.get()
# Check thread exit condition (If the queue returns None, that means the audio path queue is now empty and the thread should end itself)
Returns: if (new_audio_path is None):
tuple[np.ndarray, float, int, np.ndarray]: audio, position, channel id, embedding vector self.__audio_paths_list.put(new_audio_path) # Put None back to notify other audio feeder threads
""" # Omae wa mou shindeiru
audio_chunk, pos, channel_id = audio break # Si la ETSISI ve esto seguramente me echarán de la escuela
return (audio_chunk, pos, channel_id, self.__embedding_inference(audio_chunk)) # Now that the audio path queue is not empty, try preprocessing an audio
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
def __audio_queue_feeder(self): # TODO: Upgrade to multithreaded loader?
"""Internal thread function. Preprocess and load the audio continuously to
audio_queue until the end of the audio_paths_list
"""
while (self.__audio_paths_list): # While there are still Path elements in path list
if (not (len(self.__audio_queue) < self.__max_audio_in_queue)):
if DEBUG: print("Audio Queue Thread: Queue Full, feeder thread sleeping for 5 seconds")
time.sleep(5)
while(len(self.__audio_queue) < self.__max_audio_in_queue): # While the audio queue is not full
new_audio_path = self.__audio_paths_list[0]
new_audio = audiopreprocessing.load_preprocessed_audio( new_audio = audiopreprocessing.load_preprocessed_audio(
new_audio_path, new_audio_path,
self.__desired_sr, self.__desired_sr,
@@ -195,78 +340,176 @@ class AudioFeatureExtractor():
self.__chunk_length, self.__chunk_length,
self.__overlap self.__overlap
) )
with self.__queue_lock: self.__audio_queue.put((new_audio, new_audio_path)) # In theory, this should block this audio feeder thread when the audio queue is full
self.__audio_queue.append( logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
(new_audio, new_audio_path) logging.info("[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads to finish")
) barrier.wait()
pop_path = self.__audio_paths_list.pop(0) if (thread_id == 0):
if DEBUG: print("Audio Queue Thread: Added new audio to queue", pop_path) self.__audio_queue.put(None) # None to signal audio_queue has no more elements to process
if DEBUG: print("Audio Queue Thread: DONE. All audio files fed") logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
except Exception as e:
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] An exception occurred! Committing seppuku!")
logging.exception(e)
return
def __audio_queue_feature_extractor(self): # while (not self.__audio_paths_list.empty()):
"""Internal thread function. Get audio from audio queue. And extract embedding vector # if (not self.__audio_queue.full()):
for all audio chunks. Stores the resulting embedding into self.__features. # # Feed audio
With Original Audio's Path as key, and list[tuple[np.ndarray, float, int]] (list of tuple of embedding vector, position, channel id) # new_audio_path = self.__audio_paths_list.get()
""" # self.__audio_paths_list.task_done()
while (self.__audio_paths_list or self.__audio_queue): # While there are still audio to be processed # logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
if (self.__audio_queue): # If audio queue is not empty # new_audio = audiopreprocessing.load_preprocessed_audio(
with self.__queue_lock: # new_audio_path,
audio_to_process, audio_path = self.__audio_queue.pop(0) # Get audio from queue # self.__desired_sr,
if DEBUG: print(f"Feature Extractor Thread: Extracting {len(audio_to_process)} features from audio", audio_path) # self.__mono,
for audio_chunk in audio_to_process: # self.__chunk_length,
same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk) # self.__overlap
if (audio_path not in self.__features.keys()): # )
#if DEBUG: print("Adding new vector to", audio_path.name) # self.__audio_queue.put((new_audio, new_audio_path))
self.__features[audio_path] = [(embedd_vect, timepos, channel_id)] # logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
else: # logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
#if DEBUG: print("Adding vector to", audio_path.name)
self.__features[audio_path].append(
(embedd_vect, timepos, channel_id)
)
else:
if DEBUG: print("Feature Extractor Thread: Queue Empty, extractor thread sleeping for 5 seconds") # If audio queue is empty, wait
time.sleep(5)
if DEBUG: print("Feature Extractor Thread: DONE. Extracted all features from all audio files")
def __init__( #def testfeedthread(self, nthreads):
self, # t1 = threading.Thread(target=self.__audio_feeder_thread, args=(1,))
audio_paths_list: list[Path], # t2 = threading.Thread(target=self.__audio_feeder_thread, args=(2,))
max_audio_in_queue: int, # t1.start(); t2.start()
desired_sr: int, # #with self.__audio_feed_condition:
mono: bool, # # self.__audio_feed_condition.notify_all()
chunk_length: float = 15.0, # t1.join(); t2.join()
overlap: float = 2.0 # with concurrent.futures.ThreadPoolExecutor(max_workers=nthreads) as executor:
): # for i in range(nthreads):
self.__audio_queue = [] # ft = executor.submit(self.__audio_feeder_thread, i)
self.__audio_paths_list = audio_paths_list # self.__audio_loader_threadpool.append(ft)
self.__max_audio_in_queue = max_audio_in_queue
self.__queue_lock = threading.Lock() def __check_all_audiofeed_thread_finished(self) -> bool:
self.__desired_sr = desired_sr for ft in self.__audio_feeder_threadpool:
self.__mono = mono if ft.running():
self.__chunk_length = chunk_length return False
self.__overlap = overlap return True
self.__features = {}
def __check_all_featureextractor_thread_finished(self) -> bool:
for ft in self.__feature_extractor_threadpool:
if ft.running():
return False
return True
def __feature_extractor_thread(self, thread_id):
while True:
# Attempt to get next audio chunks to process
next_audio_tuple = self.__audio_queue.get()
# Check thread exit condition
if (next_audio_tuple is None):
self.__audio_queue.put(next_audio_tuple) # Put the None back to notify other threads
break # unalive urself
else: # Assuming we got more tuples
current_audio_to_process, current_audio_path = next_audio_tuple # Deconstruct tuple
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}")
features_to_add = self.__audio_inference_embedding(current_audio_to_process)
with self.__features_lock:
self.__features[current_audio_path] = features_to_add
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {current_audio_path} w/ {len(features_to_add)} features")
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
# while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
# if (not self.__audio_queue.empty()):
# audio_to_process, audio_path = self.__audio_queue.get()
# self.__audio_queue.task_done()
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
# features_to_add = self.__audio_inference_embedding(audio_to_process)
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
# with self.__features_lock:
# self.__features[audio_path] = features_to_add
# #with self.__audio_feed_condition: self.__audio_feed_condition.notify_all()
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {audio_path} w/ {len(features_to_add)} features")
#else:
# if (not self.__check_all_audiofeed_thread_finished()):
# with self.__audio_feed_condition:
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Audio queue empty: waiting")
# self.__audio_feed_condition.wait(10)
# self.__audio_feed_condition.wait_for(lambda: not self.__audio_queue.empty())
def __count_running_threads(self) -> tuple[int, int]:
running_extractors = 0
running_feeders = 0
for ft in self.__feature_extractor_threadpool:
if ft.running(): running_extractors += 1
for ft in self.__audio_feeder_threadpool:
if ft.running(): running_feeders += 1
return (running_feeders, running_extractors)
@property @property
def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]: def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
return self.__features return self.__features
def extract(self): def extract(self):
print("Starting feature extraction for", len(self.__audio_paths_list), "file(s)") total_amount = self.__audio_paths_list.qsize() - 1 # Account for None to indicate queue end
total_amount = len(self.__audio_paths_list) logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
t_start = time.perf_counter() t_start = time.perf_counter() # Timer
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: with concurrent.futures.ProcessPoolExecutor(max_workers=(self.__audio_feeder_threads + self.__feature_extractor_threads)) as executor:
self.__feeder_future = executor.submit(self.__audio_queue_feeder) # Audio feeder threads
self.__extractor_future = executor.submit(self.__audio_queue_feature_extractor) for i in range(self.__audio_feeder_threads):
while (self.__feeder_future.running() or self.__extractor_future.running()): logging.info(f"[MTAFE] Started audio feeder thread {i}")
print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W{len(self.__audio_paths_list)})", end="\r") ld_ft = executor.submit(self.__audio_feeder_thread, i, self.__audio_feeder_barrier)
time.sleep(1) self.__audio_feeder_threadpool.append(ld_ft)
# Feature extractor threads
for i in range(self.__feature_extractor_threads):
logging.info(f"[MTAFE] Started feature extractor thread {i}")
ex_ft = executor.submit(self.__feature_extractor_thread, i)
self.__feature_extractor_threadpool.append(ex_ft)
# Progress checking
while ( (not self.__check_all_audiofeed_thread_finished()) and (not self.__check_all_featureextractor_thread_finished()) ):
nfeeder, nextract = self.__count_running_threads()
print(f"[MTAFE Progress] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
t_stop = time.perf_counter() t_stop = time.perf_counter()
print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)} COMPLETE)") logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize() - 1}/W:{self.__audio_paths_list.qsize() - 1} COMPLETE)")
delta_t = t_stop - t_start delta_t = t_stop - t_start
total_features = sum( [len(self.__features[path]) for path in self.__features] ) total_features = sum( [len(self.__features[path]) for path in self.__features] )
print() logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")
print("Extraction completed")
print(f"Took {delta_t} seconds. Added {total_features} vectors/embeddings")
def __init__(
self,
audio_paths: list[Path],
max_audio_in_queue: int = 16,
audio_feeder_threads: int = 8,
feature_extractor_threads: int = 8,
desired_sr: int = 32000,
force_mono: bool = False,
chunk_length: float = 15.0,
chunk_overlap: float = 2.0,
):
# Check if the paths passed in are all valid and add them to queue
self.__audio_paths_list = multiprocessing.Queue()
for p in audio_paths:
if not p.is_file():
raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
else:
self.__audio_paths_list.put(p)
self.__audio_paths_list.put(None) # To signal to the producer that the audio path list is empty, since Queue.empty() is unreliable
logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize() - 1} files")
# Set up private attributes
## Audio preprocessing parameters
self.__desired_sr = desired_sr
self.__mono = force_mono
self.__chunk_length = chunk_length
self.__overlap = chunk_overlap
## Extractor/Feeder settings
self.__max_audio_in_queue = max_audio_in_queue
self.__audio_feeder_threads = audio_feeder_threads
self.__feature_extractor_threads = feature_extractor_threads
## Set up runtime conditions
self.__audio_queue = multiprocessing.Queue(maxsize=self.__max_audio_in_queue)
self.__features = {}
self.__features_lock = multiprocessing.Lock()
self.__audio_feeder_barrier = multiprocessing.Barrier(self.__audio_feeder_threads)
self.__audio_feeder_threadpool = []
self.__feature_extractor_threadpool = []
logging.info(f"[MTAFE] [Constructor] Extraction parameters: {desired_sr}Hz, Mono: {force_mono}, Divide into {chunk_length}s chunks with {chunk_overlap}s of overlap")
logging.info(f"[MTAFE] [Constructor] Using {audio_feeder_threads} threads for preprocessing audio and {feature_extractor_threads} threads for feature extraction. Max queue size of {max_audio_in_queue} files")
# More audio embeddings specific code below (To be overridden)

View File

@@ -0,0 +1,8 @@
import dataset_files
import multiprocessing
import logging
import numpy as np
import threading
import queue
from pathlib import Path

View File

@@ -0,0 +1,193 @@
from dataset_files import MultiThreadedAudioFeatureExtractor
from pathlib import Path
from panns_inference import AudioTagging
import logging
import numpy as np
import queue
import concurrent.futures
import threading
import time
import audiopreprocessing
#import torch
#import gc
class mtafe_panns():
__audio_queue: queue.Queue[ # List of ...
tuple[ # Pair of chunked audio and its path
list[tuple[np.ndarray, float, int]], # Chunked audio
Path # Path to original audio
]
] # Listed of Chunked/Resampled audio
__audio_loader_threads: int # Amount of audio feeder threads
__feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
__audio_paths_list: queue.Queue[Path] # Path list to audio
__max_audio_in_queue: int # Maximum audio in queue
__desired_sr: int
__mono: bool
__chunk_length: float
__overlap: float
__features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
__features_lock: threading.Lock
__audio_loader_threadpool: list[concurrent.futures.Future]
__feature_extractor_threadpool: list[concurrent.futures.Future]
__at: AudioTagging
__batch_size: int
def __init__(self,
audio_paths: list[Path],
max_audio_in_queue: int = 16,
audio_feeder_threads: int = 8,
feature_extractor_threads: int = 8,
desired_sr: int = 32000,
force_mono: bool = False,
chunk_length: float = 15.0,
chunk_overlap: float = 2.0,
batch_size: int = 20
):
# Check if the paths passed in are all valid and add them to queue
self.__audio_paths_list = queue.Queue()
for p in audio_paths:
if not p.is_file():
raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
else:
self.__audio_paths_list.put(p)
#self.__audio_paths_list.task_done()
logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize()} files")
# Set up private attributes
## Audio preprocessing parameters
self.__desired_sr = desired_sr
self.__mono = force_mono
self.__chunk_length = chunk_length
self.__overlap = chunk_overlap
## Extractor/Feeder settings
self.__max_audio_in_queue = max_audio_in_queue
self.__audio_loader_threads = audio_feeder_threads
self.__feature_extractor_threads = feature_extractor_threads
## Set up runtime conditions
self.__audio_queue = queue.Queue(maxsize=max_audio_in_queue)
self.__features = {}
self.__features_lock = threading.Lock()
self.__audio_loader_threadpool = []
self.__feature_extractor_threadpool = []
logging.info(f"[MTAFE] [Constructor] Extraction parameters: {desired_sr}Hz, Mono: {force_mono}, Divide into {chunk_length}s chunks with {chunk_overlap}s of overlap")
logging.info(f"[MTAFE] [Constructor] Using {audio_feeder_threads} threads for preprocessing audio and {feature_extractor_threads} threads for feature extraction. Max queue size of {max_audio_in_queue} files")
logging.info(f"[MTAFE] [Constructor] Initializing PANNs")
logging.info(f"[MTAFE] [Constructor] Inferencing with batch size {batch_size}")
self.__at = AudioTagging(checkpoint_path=None, device='cuda')
self.__batch_size = batch_size
def __chunks(self, lst, n):
# Stolen straight from Stackoverflow
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
def __audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
audio_chunk_list = []
timepos_list = []
channel_id_list = []
embedding_list = []
# Split into equal sized list
for audio_chunk, timepos, channel in audio:
audio_chunk_list.append(audio_chunk)
timepos_list.append(timepos)
channel_id_list.append(channel)
# Convert audio_chunk_list into numpy array
audio_chunk_list = np.array(audio_chunk_list)
#logging.info("[MTAFE] [PANNs] Inferencing...")
try:
for i, batch in enumerate(self.__chunks(audio_chunk_list, self.__batch_size)):
(clipwise_output, embedding) = self.__at.inference(batch)
for vect in embedding: # vect: np.ndarray
embedding_list.append(vect)
logging.info(f"[MTAFE] [PANNs] Inferenced batch {i}")
assert len(audio_chunk_list) == len(timepos_list) == len(channel_id_list) == len(embedding_list)
except Exception as e:
logging.critical("[MTAFE] [PANNs] ERROR! INFERENCE FAILED!!! OR LIST SIZE MISMATCH")
logging.critical(e)
embedding_list = [None for _ in audio_chunk_list] # Clearing embedding_list and filling it with None
return list(zip(embedding_list, channel_id_list, embedding_list))
def __audio_feeder_thread(self, thread_id):
while (not self.__audio_paths_list.empty()):
new_audio_path = self.__audio_paths_list.get()
self.__audio_paths_list.task_done()
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
new_audio = audiopreprocessing.load_preprocessed_audio(
new_audio_path,
self.__desired_sr,
self.__mono,
self.__chunk_length,
self.__overlap
)
self.__audio_queue.put((new_audio, new_audio_path))
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
def __check_all_audiofeed_thread_finished(self) -> bool:
for ft in self.__audio_loader_threadpool:
if ft.running():
return False
return True
def __check_all_featureextractor_thread_finished(self) -> bool:
for ft in self.__feature_extractor_threadpool:
if ft.running():
return False
return True
def __feature_extractor_thread(self, thread_id):
while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
if (not self.__audio_queue.empty()):
audio_to_process, audio_path = self.__audio_queue.get()
self.__audio_queue.task_done()
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
features_to_add = self.__audio_inference_embedding(audio_to_process)
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
with self.__features_lock:
self.__features[audio_path] = features_to_add
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {audio_path} w/ {len(features_to_add)} features")
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
def __count_running_threads(self) -> tuple[int, int]:
running_extractors = 0
running_feeders = 0
for ft in self.__feature_extractor_threadpool:
if ft.running(): running_extractors += 1
for ft in self.__audio_loader_threadpool:
if ft.running(): running_feeders += 1
return (running_feeders, running_extractors)
@property
def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
return self.__features
def extract(self):
total_amount = self.__audio_paths_list.qsize()
logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
t_start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=(self.__audio_loader_threads + self.__feature_extractor_threads)) as executor:
for i in range(self.__audio_loader_threads):
ld_ft = executor.submit(self.__audio_feeder_thread, i)
self.__audio_loader_threadpool.append(ld_ft)
for i in range(self.__feature_extractor_threads):
ld_ft = executor.submit(self.__feature_extractor_thread, i)
self.__feature_extractor_threadpool.append(ld_ft)
while ( (not self.__check_all_audiofeed_thread_finished()) and (not self.__check_all_featureextractor_thread_finished()) ):
nfeeder, nextract = self.__count_running_threads()
print(f"[MTAFE Progress] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
t_stop = time.perf_counter()
logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()} COMPLETE)")
delta_t = t_stop - t_start
total_features = sum( [len(self.__features[path]) for path in self.__features] )
logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")

View File

@@ -1,3 +1,17 @@
from dataset_files import AudioFeatureExtractor, random_audio_chunk import logging
afe = AudioFeatureExtractor(random_audio_chunk(32), 16, 32000, False) from audiopreprocessing import triggerlog
afe.extract() #logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
from dataset_files import MultiThreadedAudioFeatureExtractor, random_audio_chunk
mtafe = MultiThreadedAudioFeatureExtractor(
audio_paths=random_audio_chunk(8),
max_audio_in_queue=8,
audio_feeder_threads=8,
feature_extractor_threads=1,
desired_sr=32000,
force_mono=False,
chunk_length=15,
chunk_overlap=2
)
mtafe.extract()

View File

@@ -0,0 +1,17 @@
#import mtafe
import logging
#import dataset_files
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.DEBUG)
logging.info("Running tests")
# m = mtafe.mtafe(
# audio_paths=dataset_files.random_audio_chunk(2),
# max_audio_in_queue=8,
# audio_feeder_threads=8,
# feature_extractor_threads=1,
# desired_sr=32000,
# force_mono=False,
# chunk_length=15,
# chunk_overlap=2
# )
# m.run()

View File

@@ -0,0 +1,24 @@
import logging
from audiopreprocessing import triggerlog
#logger = logging.getLogger(__name__)
import sys
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO, handlers=[logging.FileHandler('test_panns.log'), logging.StreamHandler(sys.stdout)])
from pathlib import Path
from mtafe_panns import mtafe_panns
from dataset_files import random_audio_chunk, serialize_dict_obj
mtafe = mtafe_panns(
audio_paths=random_audio_chunk(4),
max_audio_in_queue=4,
audio_feeder_threads=4,
feature_extractor_threads=1,
desired_sr=32000,
force_mono=False,
chunk_length=15,
chunk_overlap=2,
batch_size=32
)
mtafe.extract()
print("Saving inferenced results to file...")
p = Path('./test_panns.pkl')
serialize_dict_obj(p, mtafe.features)

View File

@@ -0,0 +1,95 @@
import librosa
import pickle
import os
import numpy as np
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def triggerlog():
logger.critical("Testing: info")
def resample_load(input_path : Path, target_sr : int = 16000, mono_audio : bool = False) -> np.ndarray: # AI
"""Load and resamples the audio into `target_sr`.
Args:
input_path (Path): pathlib.Path object to audio file
target_sr (int, optional): Target Sample Rate to resample. Defaults to 16000.
mono_audio (bool, optional): Load the audio in mono mode. Defaults to False.
Returns:
np.ndarray: _description_
"""
# Load audio file with original sample rate
logger.info(f"[resample_load] Loading audio {input_path}")
audio, orig_sr = librosa.load(input_path, sr=None, mono=mono_audio)
# Resample if necessary
if orig_sr != target_sr:
logger.info(f"[resample_load] Resampling to {target_sr}")
audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
return audio
def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap: float = 2.0) -> tuple[list[np.ndarray], list[float], int]: # AI
"""
Chunks audio file into overlapping segments. Only pass in mono audio here.
Args:
audio_file: Loaded audio ndarray (one channel only)
sr: Sample rate for the given audio file
chunk_length: Length of each chunk in seconds
overlap: Overlap between chunks in seconds
Returns:
List of audio chunks, list of chunk positions, and given sample rate
"""
logger.info(f"[chunk_audio] Chunking audio ({len(audio) / sr}s)")
# Calculate chunk size and hop length in samples
chunk_size = int(chunk_length * sr)
hop_length = int((chunk_length - overlap) * sr)
# Generate chunks
chunks = []
positions = []
k = 0
for i in range(0, len(audio) - chunk_size + 1, hop_length):
chunk = audio[i:i + chunk_size]
chunks.append(chunk)
positions.append(i / sr)
k += 1
if k == 0: # The full audio length is less than chunk_length
chunks = [audio]
positions = [0.0]
logger.info(f"[chunk_audio] Audio less than chunk_length. Returning original audio as chunk\r")
else:
logger.info(f"[chunk_audio] Audio is split into {k} chunks")
return chunks, positions, sr
def load_preprocessed_audio(
path: Path,
desired_sr: int,
mono: bool = False,
chunk_length: float = 15.0,
overlap: float = 2.0) -> list[tuple[np.ndarray, float, int]]:
result = []
# Load and resample audio
audio = resample_load(path, desired_sr, mono) # Stereo 2D matrix, Mono 1D array
if mono or (audio.ndim == 1):
# Chunk audio: mono (or the audio file loaded in itself is mono)
chunks, positions, _ = chunk_audio(audio, desired_sr, chunk_length, overlap)
assert len(chunks) == len(positions)
result.extend(zip(chunks, positions, [-1 for _ in range(len(chunks))]))
# (ndarray_chunk1, pos1, -1): first audio chunk, position1, -1 (Mono channel indicator)
else:
# Chunk audio: stereo/multichannel
for channel_id, channel_audio in enumerate(audio):
chunks, positions, _ = chunk_audio(channel_audio, desired_sr, chunk_length, overlap)
assert len(chunks) == len(positions)
result.extend(zip(chunks, positions, [channel_id for _ in range(len(chunks))]))
# (ndarray_chunk1, pos1, 0): first audio chunk, position1, 0 (channel 0)
logging.info(f"[load_preprocessed_audio] Loaded audio {path} ({desired_sr}Hz, Chunk {chunk_length}s with overlap {overlap}s) MONO:{mono}")
return result

135
mtafe_lab/dataset.py Normal file
View File

@@ -0,0 +1,135 @@
import platform
import os
import pickle
import random
import multiprocessing
import threading
import time
import concurrent.futures
import numpy as np
from pathlib import Path
import audiopreprocessing
import logging
import queue
def serialize_dict_obj(path : Path, object : dict) -> int:
"""Serializes Python Dictionary object to a file via Pickle.
Args:
path (Path): Path to store the file
object (dict): Dictionary object to serialize
Returns:
int: size in bytes written
"""
# Horrible practice, horrible security, but it will work for now
with path.open("wb") as fp:
pickle.dump(object, fp)
fp.seek(0, os.SEEK_END)
size = fp.tell()
return size
logging.info("Reading local dataset directory structure...")
ASMRThreePath = Path("C:\\ASMRThree")
ASMRTwoPath = Path("D:\\ASMRTwo")
ASMROnePath = Path("E:\\ASMROne")
if (platform.system() == 'Linux'):
ASMROnePath = Path('/mnt/Scratchpad/ASMROne')
ASMRTwoPath = Path('/mnt/MyStuffz/ASMRTwo')
ASMRThreePath = Path('/mnt/Windows11/ASMRThree')
size_one, size_two, size_three = 0, 0, 0
files_one, files_two, files_three = [], [], []
folders_one, folders_two, folders_three = [], [], []
# Statistic calculation for ASMROne
for root, dirs, files in ASMROnePath.walk(): # Root will iterate through all folders
if root.absolute() != ASMROnePath.absolute(): # Skip root of ASMROnePath
folders_one.append(root) # Add folder to list
for fname in files: # Iterate through all files in current root
file = root/fname # Get file path
assert file.is_file()
files_one.append(file)
size_one += file.stat().st_size # Get file size
# Statistic calculation for ASMRTwo
for root, dirs, files in ASMRTwoPath.walk(): # Root will iterate through all folders
if root.absolute() != ASMRTwoPath.absolute(): # Skip root of ASMRTwoPath
folders_two.append(root) # Add folder to list
for fname in files: # Iterate through all files in current root
file = root/fname # Get file path
assert file.is_file()
files_two.append(file)
size_two += file.stat().st_size # Get file size
# Statistic calculation for ASMRThree
for root, dirs, files in ASMRThreePath.walk(): # Root will iterate through all folders
if root.absolute() != ASMRThreePath.absolute(): # Skip root of ASMRThreePath
folders_three.append(root) # Add folder to list
for fname in files: # Iterate through all files in current root
file = root/fname # Get file path
assert file.is_file()
files_three.append(file)
size_three += file.stat().st_size # Get file size
DataSubsetPaths = [ASMROnePath, ASMRTwoPath, ASMRThreePath]
DLSiteWorksPaths = []
# Collect ASMR Works (RJ ID, Paths)
for ASMRSubsetPath in DataSubsetPaths:
for WorkPaths in ASMRSubsetPath.iterdir():
DLSiteWorksPaths.append(WorkPaths)
fileExt2fileType = {
".TXT": "Document",
".WAV": "Audio",
".MP3": "Audio",
".PNG": "Image",
".JPG": "Image",
".VTT": "Subtitle",
".PDF": "Document",
".FLAC": "Audio",
".MP4": "Video",
".LRC": "Subtitle",
".SRT": "Subtitle",
".JPEG": "Image",
".ASS": "Subtitle",
"": "NO EXTENSION",
".M4A": "Audio",
".MKV": "Video"
}
fileext_stat = {}
file_list = files_one + files_two + files_three
file_list_count = len(file_list)
for file in file_list:
f_ext = file.suffix.upper()
if (f_ext in fileext_stat.keys()):
fileext_stat[f_ext]['Count'] += 1
fileext_stat[f_ext]['List'].append(file)
fileext_stat[f_ext]['ExtensionMass'] += file.stat().st_size
else:
fileext_stat[f_ext] = {}
fileext_stat[f_ext]['Count'] = 1
fileext_stat[f_ext]['List'] = [file]
fileext_stat[f_ext]['ExtensionMass'] = file.stat().st_size # The total sum of sizes of the same file extension
fileext_stat[f_ext]['MediaType'] = fileExt2fileType[f_ext]
audio_paths = []
for extension in fileext_stat: # I can't be bothered to convert this into a list compresion
if fileext_stat[extension]['MediaType'] == "Audio":
audio_paths += fileext_stat[extension]['List']
def random_audio_chunk(n : int, seed : int = 177013) -> list[Path]:
"""Returns a random selection of audio files
Args:
n (int): Amount of files to return
seed (int, optional): Seed for RNG. Defaults to 177013.
Returns:
list[Path]: List of randomly selected audio paths (using Path object)
"""
random.seed(seed)
#return random.choices(audio_paths, k=n) # Contains repeated elements
return random.sample(audio_paths, k=n)

32
mtafe_lab/mtafe.py Normal file
View File

@@ -0,0 +1,32 @@
import logging
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
import multiprocessing
import multiprocessing.process
import dataset
import audiopreprocessing
from pathlib import Path
def copy_worker(origin_queue, target_queue):
p = origin_queue.get()
logging.info(f"Processing: {p}")
l = audiopreprocessing.load_preprocessed_audio(p, 32000, True)
print("Preprocess complete, putting it into queue")
target_queue.put(l) # Even on a small scale test, the process will always hang here
if __name__ == "__main__":
audio_path_queue = multiprocessing.Queue()
audio_queue = multiprocessing.Queue()
rand_paths = dataset.random_audio_chunk(1)
for p in rand_paths:
audio_path_queue.put(p)
print("Files queued")
processes = [multiprocessing.Process(target=copy_worker, args=(audio_path_queue, audio_queue)) for _ in range(1)]
for p in processes: p.start()
for p in processes: p.join()
print("Joined")
#for _ in range(1): print(audio_queue.get())

30
mtafe_lab/test_mp.py Normal file
View File

@@ -0,0 +1,30 @@
import logging
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
import multiprocessing
from dataset import random_audio_chunk
import audiopreprocessing
from time import sleep
origin_queue = multiprocessing.Queue()
target_queue = multiprocessing.Queue()
def worker(orig, targ):
p = orig.get()
#out = "PROCESSED" + str(p.absolute())
out = audiopreprocessing.load_preprocessed_audio(p, 16000, True) # This will cause put to hang
targ.put(out) # This will hang the process
if __name__ == "__main__":
K = 2
for p in random_audio_chunk(K):
origin_queue.put(p)
processes = [multiprocessing.Process(target=worker, args=(origin_queue, target_queue)) for _ in range(K)]
for p in processes: p.start()
for p in processes: p.join()
logging.critical("Successfully terminated all threads")
for _ in range(K): print(target_queue.get())

21
mtafe_lab/test_mtafe.py Normal file
View File

@@ -0,0 +1,21 @@
import logging
logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
import mtafe
from dataset import random_audio_chunk
logging.info("Generating random audio path list")
rdpl = random_audio_chunk(2)
logging.info("Initializing MTAFE")
mtafe.initialize_parameters(
paudio_paths=rdpl,
pmax_audio_in_queue=4,
paudio_feeder_threads=2,
pfeature_extractor_threads=1,
pdesired_sr=32000,
pforce_mono=False,
pchunk_length=15,
pchunk_overlap=2
)
mtafe.test_feeder()