Compare commits: 6fc6df87b2 ... 37b6a3c5e7
2 Commits: 37b6a3c5e7, b855b7e255

(Two file diffs suppressed because they are too large and were not rendered.)
[modified file — header not captured; these hunks appear to belong to FeatureExtraction/audiopreprocessing.py]

@@ -3,18 +3,31 @@ import pickle
 import os
 import numpy as np
 from pathlib import Path
+import logging
 
-DEBUG=True
+logger = logging.getLogger(__name__)
+
+
+def triggerlog():
+    logger.critical("Testing: info")
 
 
 def resample_load(input_path : Path, target_sr : int = 16000, mono_audio : bool = False) -> np.ndarray: # AI
-    """Resample audio to target sample rate and save to output directory"""
+    """Loads the audio and resamples it to `target_sr`.
+
+    Args:
+        input_path (Path): pathlib.Path object pointing to the audio file
+        target_sr (int, optional): target sample rate to resample to. Defaults to 16000.
+        mono_audio (bool, optional): load the audio in mono mode. Defaults to False.
+
+    Returns:
+        np.ndarray: the loaded (and, if needed, resampled) audio
+    """
     # Load audio file with original sample rate
-    if DEBUG: print("[resample_load] Loading audio", input_path)
+    logger.info(f"[resample_load] Loading audio {input_path}")
     audio, orig_sr = librosa.load(input_path, sr=None, mono=mono_audio)
 
     # Resample if necessary
     if orig_sr != target_sr:
-        if DEBUG: print("[resample_load] Resampling to", target_sr)
+        logger.info(f"[resample_load] Resampling to {target_sr}")
        audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
 
     return audio
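A quick usage sketch for resample_load (the file name below is illustrative, not from the diff). With mono_audio=False, librosa.load preserves the channel layout: it returns a 1-D array for mono files and a (n_channels, n_samples) array for multichannel ones, which is what load_preprocessed_audio later relies on when it checks audio.ndim.

    from pathlib import Path
    # Hypothetical input file; target_sr and mono_audio mirror the defaults above
    audio = resample_load(Path("example.wav"), target_sr=16000, mono_audio=False)
    print(audio.ndim)  # 1 for a mono file, 2 (channels x samples) otherwise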
@@ -24,7 +37,7 @@ def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap
     Chunks audio file into overlapping segments. Only pass in mono audio here.
 
     Args:
-        audio_file: Loaded audio ndarray
+        audio: Loaded audio ndarray (one channel only)
         sr: Sample rate for the given audio file
         chunk_length: Length of each chunk in seconds
         overlap: Overlap between chunks in seconds
@@ -32,7 +45,7 @@ def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap
     Returns:
         List of audio chunks, list of chunk positions, and given sample rate
     """
-    if DEBUG: print("[chunk_audio] Chunking audio")
+    logger.info(f"[chunk_audio] Chunking audio ({len(audio) / sr}s)")
     # Calculate chunk size and hop length in samples
     chunk_size = int(chunk_length * sr)
     hop_length = int((chunk_length - overlap) * sr)
@@ -46,10 +59,12 @@ def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap
         chunks.append(chunk)
         positions.append(i / sr)
         k += 1
-        if DEBUG: print("[chunk_audio] Chunked", k, end="\r")
 
     if k == 0: # The full audio length is less than chunk_length
         chunks = [audio]
         positions = [0.0]
+        logger.info("[chunk_audio] Audio less than chunk_length. Returning original audio as chunk")
+    else:
+        logger.info(f"[chunk_audio] Audio is split into {k} chunks")
 
     return chunks, positions, sr
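To make the chunking arithmetic above concrete, a small worked example (illustrative numbers, not from the diff):

    sr = 16000
    chunk_length, overlap = 10.0, 2.0
    chunk_size = int(chunk_length * sr)              # 160000 samples per chunk
    hop_length = int((chunk_length - overlap) * sr)  # 128000 samples between chunk starts
    # A 30 s file (480000 samples) yields chunk starts at samples 0, 128000, 256000,
    # i.e. positions 0.0 s, 8.0 s and 16.0 s, so k == 3; a file shorter than 10 s
    # falls into the k == 0 branch and is returned whole.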
[modified file — header not captured; these hunks appear to belong to FeatureExtraction/dataset_files.py]

@@ -2,14 +2,15 @@ import platform
 import os
 import pickle
 import random
+import multiprocessing
 import threading
 import time
 import concurrent.futures
 import numpy as np
 from pathlib import Path
 import audiopreprocessing
+import logging
-DEBUG=True
+import queue
 
 def serialize_dict_obj(path : Path, object : dict) -> int:
     """Serializes Python Dictionary object to a file via Pickle.
@@ -27,7 +28,7 @@ def serialize_dict_obj(path : Path, object : dict) -> int:
         size = fp.tell()
     return size
 
-print("Reading local dataset directory structure...")
+logging.info("Reading local dataset directory structure...")
 
 ASMRThreePath = Path("C:\\ASMRThree")
 ASMRTwoPath = Path("D:\\ASMRTwo")
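One aside on the "horrible security" comment inside serialize_dict_obj: unpickling untrusted data can execute arbitrary code, so this format is only safe for files the pipeline wrote itself. A minimal sketch of the matching load side under that assumption (the helper name is hypothetical; it does not appear in the diff):

    import pickle
    from pathlib import Path

    def deserialize_dict_obj(path: Path) -> dict:
        # Only load pickles this pipeline created itself:
        # pickle.load() on untrusted input can run arbitrary code.
        with path.open("rb") as fp:
            return pickle.load(fp)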
@@ -133,61 +134,205 @@ def random_audio_chunk(n : int, seed : int = 177013) -> list[Path]:
     #return random.choices(audio_paths, k=n) # Contains repeated elements
     return random.sample(audio_paths, k=n)
 
-class AudioFeatureExtractor():
-    __audio_queue: list[ # List of ...
+# class AudioFeatureExtractor():
+#     __audio_queue: list[ # List of ...
+#         tuple[ # Pair of chunked audio and its path
+#             list[tuple[np.ndarray, float, int]], # Chunked audio
+#             Path # Path to original audio
+#         ]
+#     ] # List of Chunked/Resampled audio
+#     __feeder_future: concurrent.futures.Future
+#     __extractor_future: concurrent.futures.Future
+#     __audio_paths_list: list[Path]
+#     __max_audio_in_queue: int
+#     __queue_lock: threading.Lock
+#     __desired_sr: int
+#     __mono: bool
+#     __chunk_length: float
+#     __overlap: float
+#     __features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
+#     # { audioPath:
+#     #     [(embedding, pos, channel)...]
+#     # }
+
+#     def __embedding_inference(self, audio_ndarray: np.ndarray) -> np.ndarray:
+#         """Uses embedding model to inference an audio. Returns embedding vectors.
+#         Function to be overridden. Returns np.zeros(32).
+
+#         Args:
+#             audio_ndarray (np.ndarray):
+
+#         Returns:
+#             np.ndarray: _description_
+#         """
+#         return np.zeros(32)
+
+#     def __embedding_extract(self, audio: tuple[np.ndarray, float, int]) -> tuple[np.ndarray, float, int, np.ndarray]:
+#         """Receives a tuple of audio, position, and channel ID, then adds the embedding to the tuple
+
+#         Args:
+#             audio (tuple[np.ndarray, float, int]): tuple of audio, position, channel id
+
+#         Returns:
+#             tuple[np.ndarray, float, int, np.ndarray]: audio, position, channel id, embedding vector
+#         """
+#         audio_chunk, pos, channel_id = audio
+#         return (audio_chunk, pos, channel_id, self.__embedding_inference(audio_chunk))
+
+#     def __audio_queue_feeder(self): # TODO: Upgrade to multithreaded loader?
+#         """Internal thread function. Preprocess and load the audio continuously to
+#         audio_queue until the end of the audio_paths_list
+#         """
+#         while (self.__audio_paths_list): # While there are still Path elements in path list
+#             if (not (len(self.__audio_queue) < self.__max_audio_in_queue)):
+#                 logging.info("[AFE] [Audio Queue Thread]: Queue Full, feeder thread sleeping for 5 seconds")
+#                 time.sleep(5)
+#             while(len(self.__audio_queue) < self.__max_audio_in_queue): # While the audio queue is not full
+#                 new_audio_path = self.__audio_paths_list[0]
+#                 new_audio = audiopreprocessing.load_preprocessed_audio(
+#                     new_audio_path,
+#                     self.__desired_sr,
+#                     self.__mono,
+#                     self.__chunk_length,
+#                     self.__overlap
+#                 )
+#                 with self.__queue_lock:
+#                     self.__audio_queue.append(
+#                         (new_audio, new_audio_path)
+#                     )
+#                 pop_path = self.__audio_paths_list.pop(0)
+#                 logging.info(f"[AFE] [Audio Queue Thread]: Added new audio to queue {pop_path}")
+#         logging.info("[AFE] [Audio Queue Thread]: DONE. All audio files fed")
+
+#     def __audio_queue_feature_extractor(self):
+#         """Internal thread function. Get audio from audio queue. And extract embedding vector
+#         for all audio chunks. Stores the resulting embedding into self.__features.
+#         With Original Audio's Path as key, and list[tuple[np.ndarray, float, int]] (list of tuple of embedding vector, position, channel id)
+#         """
+#         while (self.__audio_paths_list or self.__audio_queue): # While there are still audio to be processed
+#             if (self.__audio_queue): # If audio queue is not empty
+#                 with self.__queue_lock:
+#                     audio_to_process, audio_path = self.__audio_queue.pop(0) # Get audio from queue
+#                 logging.info(f"[AFE] [Feature Extractor Thread]: Extracting {len(audio_to_process)} features from audio {audio_path}")
+#                 for audio_chunk in audio_to_process:
+#                     same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk)
+#                     if (audio_path not in self.__features.keys()):
+#                         #if DEBUG: print("Adding new vector to", audio_path.name)
+#                         self.__features[audio_path] = [(embedd_vect, timepos, channel_id)]
+#                     else:
+#                         #if DEBUG: print("Adding vector to", audio_path.name)
+#                         self.__features[audio_path].append(
+#                             (embedd_vect, timepos, channel_id)
+#                         )
+#             else:
+#                 logging.info("[AFE] [Feature Extractor Thread]: Queue Empty, extractor thread sleeping for 5 seconds") # If audio queue is empty, wait
+#                 time.sleep(5)
+#         logging.info("[AFE] [Feature Extractor Thread]: DONE. Extracted all features from all audio files")
+
+#     def __init__(
+#         self,
+#         audio_paths_list: list[Path],
+#         max_audio_in_queue: int,
+#         desired_sr: int,
+#         mono: bool,
+#         chunk_length: float = 15.0,
+#         overlap: float = 2.0
+#     ):
+#         self.__audio_queue = []
+#         self.__audio_paths_list = audio_paths_list
+#         self.__max_audio_in_queue = max_audio_in_queue
+#         self.__queue_lock = threading.Lock()
+#         self.__desired_sr = desired_sr
+#         self.__mono = mono
+#         self.__chunk_length = chunk_length
+#         self.__overlap = overlap
+#         self.__features = {}
+
+#     @property
+#     def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
+#         return self.__features
+
+#     def extract(self):
+#         print("Starting feature extraction for", len(self.__audio_paths_list), "file(s)")
+#         total_amount = len(self.__audio_paths_list)
+#         t_start = time.perf_counter()
+#         with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+#             self.__feeder_future = executor.submit(self.__audio_queue_feeder)
+#             self.__extractor_future = executor.submit(self.__audio_queue_feature_extractor)
+#             while (self.__feeder_future.running() or self.__extractor_future.running()):
+#                 print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W{len(self.__audio_paths_list)})", end="\r")
+#                 time.sleep(1)
+
+#         t_stop = time.perf_counter()
+#         print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)} COMPLETE)")
+#         delta_t = t_stop - t_start
+#         total_features = sum( [len(self.__features[path]) for path in self.__features] )
+#         print()
+#         print("Extraction completed")
+#         print(f"Took {delta_t} seconds. Added {total_features} vectors/embeddings")
+
+class MultiThreadedAudioFeatureExtractor():
+    # This is the third time I am rewriting this, please send help. Multithreaded apps are pure hell to develop and debug
+    # After testing: this will hang at the last audio, precisely at preprocessing audio. I suspect that the GIL hit the performance
+    # so much to the point that the preprocessing routine cannot get any share of the CPU execution cycle
+    __audio_queue: queue.Queue[ # List of ...
         tuple[ # Pair of chunked audio and its path
-            list[tuple[np.ndarray, float, int]], # Chunked audio
+            list[tuple[np.ndarray, float, int]], # Chunked audio: list of (ndarray, time position of chunk relative to original audio, channel_id)
             Path # Path to original audio
         ]
     ] # List of Chunked/Resampled audio
-    __feeder_future: concurrent.futures.Future
-    __extractor_future: concurrent.futures.Future
-    __audio_paths_list: list[Path]
-    __max_audio_in_queue: int
-    __queue_lock: threading.Lock
-    __desired_sr: int
-    __mono: bool
-    __chunk_length: float
+    __audio_feeder_threads: int # Amount of audio feeder threads
+    __feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
+    __audio_paths_list: queue.Queue[Path] # Path list to audio
+    __max_audio_in_queue: int # Maximum audio in queue
+    __audio_feeder_barrier: threading.Barrier # Synchronization barrier for all audio feeder threads
+    # Audio Feeder parameters
+    __desired_sr: int # Desired Sample Rate (Resampling)
+    __mono: bool # Force load audio in mono mode
+    __chunk_length: float # Audio chunk length
     __overlap: float
-    __features: dict[Path, list[tuple[np.ndarray, float, int]]]
-    # { audioPath:
-    #     [(embedding, pos, channel)...]
+    # Result
+    __features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
+    __features_lock: threading.Lock
+    # __features: { audioPath:
+    #     [(embedding1, pos1, channel1),
+    #      (embedding2, pos2, channel1)]
+    #     ...
     # }
+
+    # Runtime
+    __audio_feeder_threadpool: list[concurrent.futures.Future]
+    __feature_extractor_threadpool: list[concurrent.futures.Future]
 
-    def __embedding_inference(self, audio_ndarray: np.ndarray) -> np.ndarray:
-        """Uses embedding model to inference an audio. Returns embedding vectors.
-        Function to be overridden. Returns np.zeros(32).
+    def __audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
+        """Receives a list of audio chunks, extracts embeddings for all of them, and returns the result as a list of tuples (embedding, time, channel_id).
 
         Args:
-            audio_ndarray (np.ndarray):
+            audio (list[tuple[np.ndarray, float, int]]): list of audio chunks
 
         Returns:
-            np.ndarray: _description_
+            list[tuple[np.ndarray, float, int]]: List of (embedding vector, timepos, channel id)
         """
-        return np.zeros(32)
+        features = []
+        for audio_chunk in audio:
+            chunk, timepos, channel_id = audio_chunk
+            zero = np.zeros(32)
+            features.append( (zero, timepos, channel_id) )
+            time.sleep(0.01) # Simulate effort, change to simulate spent seconds in each audio file
+        return features
+        # To be overridden
 
-    def __embedding_extract(self, audio: tuple[np.ndarray, float, int]) -> tuple[np.ndarray, float, int, np.ndarray]:
-        """Receives a tuple of audio, position, and channel ID, then adds the embedding to the tuple
-
-        Args:
-            audio (tuple[np.ndarray, float, int]): tuple of audio, position, channel id
-
-        Returns:
-            tuple[np.ndarray, float, int, np.ndarray]: audio, position, channel id, embedding vector
-        """
-        audio_chunk, pos, channel_id = audio
-        return (audio_chunk, pos, channel_id, self.__embedding_inference(audio_chunk))
-
-    def __audio_queue_feeder(self): # TODO: Upgrade to multithreaded loader?
-        """Internal thread function. Preprocess and load the audio continuously to
-        audio_queue until the end of the audio_paths_list
-        """
-        while (self.__audio_paths_list): # While there are still Path elements in path list
-            if (not (len(self.__audio_queue) < self.__max_audio_in_queue)):
-                if DEBUG: print("Audio Queue Thread: Queue Full, feeder thread sleeping for 5 seconds")
-                time.sleep(5)
-            while(len(self.__audio_queue) < self.__max_audio_in_queue): # While the audio queue is not full
-                new_audio_path = self.__audio_paths_list[0]
+    def __audio_feeder_thread(self, thread_id: int, barrier: threading.Barrier):
+        try:
+            while True:
+                # Attempt to get audio path from audio path queue
+                new_audio_path = self.__audio_paths_list.get()
+                # Check thread exit condition (if the queue returns None, the audio path queue is now empty and the thread should end itself)
+                if (new_audio_path is None):
+                    self.__audio_paths_list.put(new_audio_path) # Put None back to notify other audio feeder threads
+                    # "You are already dead"
+                    break # If ETSISI sees this, they will surely expel me from the school
+                # Now that the audio path queue is not empty, try preprocessing an audio
+                logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
                 new_audio = audiopreprocessing.load_preprocessed_audio(
                     new_audio_path,
                     self.__desired_sr,
@@ -195,78 +340,176 @@ class AudioFeatureExtractor():
                     self.__chunk_length,
                     self.__overlap
                 )
-                with self.__queue_lock:
-                    self.__audio_queue.append(
-                        (new_audio, new_audio_path)
-                    )
-                pop_path = self.__audio_paths_list.pop(0)
-                if DEBUG: print("Audio Queue Thread: Added new audio to queue", pop_path)
-        if DEBUG: print("Audio Queue Thread: DONE. All audio files fed")
+                self.__audio_queue.put((new_audio, new_audio_path)) # In theory, this should block this audio feeder thread when the audio queue is full
+                logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
+            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads to finish")
+            barrier.wait()
+            if (thread_id == 0):
+                self.__audio_queue.put(None) # None to signal audio_queue has no more elements to process
+            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
+        except Exception as e:
+            logging.error(f"[MTAFE] [Audio Feeder {thread_id}] An exception occurred! Committing seppuku!")
+            logging.exception(e)
+            return
 
-    def __audio_queue_feature_extractor(self):
-        """Internal thread function. Get audio from audio queue. And extract embedding vector
-        for all audio chunks. Stores the resulting embedding into self.__features.
-        With Original Audio's Path as key, and list[tuple[np.ndarray, float, int]] (list of tuple of embedding vector, position, channel id)
-        """
-        while (self.__audio_paths_list or self.__audio_queue): # While there are still audio to be processed
-            if (self.__audio_queue): # If audio queue is not empty
-                with self.__queue_lock:
-                    audio_to_process, audio_path = self.__audio_queue.pop(0) # Get audio from queue
-                if DEBUG: print(f"Feature Extractor Thread: Extracting {len(audio_to_process)} features from audio", audio_path)
-                for audio_chunk in audio_to_process:
-                    same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk)
-                    if (audio_path not in self.__features.keys()):
-                        #if DEBUG: print("Adding new vector to", audio_path.name)
-                        self.__features[audio_path] = [(embedd_vect, timepos, channel_id)]
-                    else:
-                        #if DEBUG: print("Adding vector to", audio_path.name)
-                        self.__features[audio_path].append(
-                            (embedd_vect, timepos, channel_id)
-                        )
-            else:
-                if DEBUG: print("Feature Extractor Thread: Queue Empty, extractor thread sleeping for 5 seconds") # If audio queue is empty, wait
-                time.sleep(5)
-        if DEBUG: print("Feature Extractor Thread: DONE. Extracted all features from all audio files")
+        # while (not self.__audio_paths_list.empty()):
+        #     if (not self.__audio_queue.full()):
+        #         # Feed audio
+        #         new_audio_path = self.__audio_paths_list.get()
+        #         self.__audio_paths_list.task_done()
+        #         logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
+        #         new_audio = audiopreprocessing.load_preprocessed_audio(
+        #             new_audio_path,
+        #             self.__desired_sr,
+        #             self.__mono,
+        #             self.__chunk_length,
+        #             self.__overlap
+        #         )
+        #         self.__audio_queue.put((new_audio, new_audio_path))
+        #         logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
+        # logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
 
-    def __init__(
-        self,
-        audio_paths_list: list[Path],
-        max_audio_in_queue: int,
-        desired_sr: int,
-        mono: bool,
-        chunk_length: float = 15.0,
-        overlap: float = 2.0
-    ):
-        self.__audio_queue = []
-        self.__audio_paths_list = audio_paths_list
-        self.__max_audio_in_queue = max_audio_in_queue
-        self.__queue_lock = threading.Lock()
-        self.__desired_sr = desired_sr
-        self.__mono = mono
-        self.__chunk_length = chunk_length
-        self.__overlap = overlap
-        self.__features = {}
+    #def testfeedthread(self, nthreads):
+    #    t1 = threading.Thread(target=self.__audio_feeder_thread, args=(1,))
+    #    t2 = threading.Thread(target=self.__audio_feeder_thread, args=(2,))
+    #    t1.start(); t2.start()
+    #    #with self.__audio_feed_condition:
+    #    #    self.__audio_feed_condition.notify_all()
+    #    t1.join(); t2.join()
+    #    with concurrent.futures.ThreadPoolExecutor(max_workers=nthreads) as executor:
+    #        for i in range(nthreads):
+    #            ft = executor.submit(self.__audio_feeder_thread, i)
+    #            self.__audio_loader_threadpool.append(ft)
+
+    def __check_all_audiofeed_thread_finished(self) -> bool:
+        for ft in self.__audio_feeder_threadpool:
+            if ft.running():
+                return False
+        return True
+
+    def __check_all_featureextractor_thread_finished(self) -> bool:
+        for ft in self.__feature_extractor_threadpool:
+            if ft.running():
+                return False
+        return True
+
+    def __feature_extractor_thread(self, thread_id):
+        while True:
+            # Attempt to get next audio chunks to process
+            next_audio_tuple = self.__audio_queue.get()
+            # Check thread exit condition
+            if (next_audio_tuple is None):
+                self.__audio_queue.put(next_audio_tuple) # Put the None back to notify other threads
+                break # unalive urself
+            else: # Assuming we got more tuples
+                current_audio_to_process, current_audio_path = next_audio_tuple # Deconstruct tuple
+                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}")
+                features_to_add = self.__audio_inference_embedding(current_audio_to_process)
+                with self.__features_lock:
+                    self.__features[current_audio_path] = features_to_add
+                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {current_audio_path} w/ {len(features_to_add)} features")
+        logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
+
+        # while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
+        #     if (not self.__audio_queue.empty()):
+        #         audio_to_process, audio_path = self.__audio_queue.get()
+        #         self.__audio_queue.task_done()
+        #         logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
+        #         features_to_add = self.__audio_inference_embedding(audio_to_process)
+        #         logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
+        #         with self.__features_lock:
+        #             self.__features[audio_path] = features_to_add
+        #         #with self.__audio_feed_condition: self.__audio_feed_condition.notify_all()
+        #         logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {audio_path} w/ {len(features_to_add)} features")
+        #     else:
+        #         if (not self.__check_all_audiofeed_thread_finished()):
+        #             with self.__audio_feed_condition:
+        #                 logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Audio queue empty: waiting")
+        #                 self.__audio_feed_condition.wait(10)
+        #                 self.__audio_feed_condition.wait_for(lambda: not self.__audio_queue.empty())
+
+    def __count_running_threads(self) -> tuple[int, int]:
+        running_extractors = 0
+        running_feeders = 0
+        for ft in self.__feature_extractor_threadpool:
+            if ft.running(): running_extractors += 1
+        for ft in self.__audio_feeder_threadpool:
+            if ft.running(): running_feeders += 1
+        return (running_feeders, running_extractors)
 
     @property
     def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
         return self.__features
 
     def extract(self):
-        print("Starting feature extraction for", len(self.__audio_paths_list), "file(s)")
-        total_amount = len(self.__audio_paths_list)
-        t_start = time.perf_counter()
-        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
-            self.__feeder_future = executor.submit(self.__audio_queue_feeder)
-            self.__extractor_future = executor.submit(self.__audio_queue_feature_extractor)
-            while (self.__feeder_future.running() or self.__extractor_future.running()):
-                print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W{len(self.__audio_paths_list)})", end="\r")
-                time.sleep(1)
+        total_amount = self.__audio_paths_list.qsize() - 1 # Account for None to indicate queue end
+        logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
+        t_start = time.perf_counter() # Timer
+        with concurrent.futures.ProcessPoolExecutor(max_workers=(self.__audio_feeder_threads + self.__feature_extractor_threads)) as executor:
+            # Audio feeder threads
+            for i in range(self.__audio_feeder_threads):
+                logging.info(f"[MTAFE] Started audio feeder thread {i}")
+                ld_ft = executor.submit(self.__audio_feeder_thread, i, self.__audio_feeder_barrier)
+                self.__audio_feeder_threadpool.append(ld_ft)
+            # Feature extractor threads
+            for i in range(self.__feature_extractor_threads):
+                logging.info(f"[MTAFE] Started feature extractor thread {i}")
+                ex_ft = executor.submit(self.__feature_extractor_thread, i)
+                self.__feature_extractor_threadpool.append(ex_ft)
+            # Progress checking
+            while ( (not self.__check_all_audiofeed_thread_finished()) and (not self.__check_all_featureextractor_thread_finished()) ):
+                nfeeder, nextract = self.__count_running_threads()
+                print(f"[MTAFE Progress] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
 
         t_stop = time.perf_counter()
-        print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)} COMPLETE)")
+        logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize() - 1}/W:{self.__audio_paths_list.qsize() - 1} COMPLETE)")
         delta_t = t_stop - t_start
         total_features = sum( [len(self.__features[path]) for path in self.__features] )
-        print()
-        print("Extraction completed")
-        print(f"Took {delta_t} seconds. Added {total_features} vectors/embeddings")
+        logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")
 
+    def __init__(
+        self,
+        audio_paths: list[Path],
+        max_audio_in_queue: int = 16,
+        audio_feeder_threads: int = 8,
+        feature_extractor_threads: int = 8,
+        desired_sr: int = 32000,
+        force_mono: bool = False,
+        chunk_length: float = 15.0,
+        chunk_overlap: float = 2.0,
+    ):
+        # Check if the paths passed in are all valid and add them to queue
+        self.__audio_paths_list = multiprocessing.Queue()
+        for p in audio_paths:
+            if not p.is_file():
+                raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
+            else:
+                self.__audio_paths_list.put(p)
+        self.__audio_paths_list.put(None) # To signal to the producer that the audio path list is empty, since Queue.empty() is unreliable
+
+        logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize() - 1} files")
+
+        # Set up private attributes
+        ## Audio preprocessing parameters
+        self.__desired_sr = desired_sr
+        self.__mono = force_mono
+        self.__chunk_length = chunk_length
+        self.__overlap = chunk_overlap
+
+        ## Extractor/Feeder settings
+        self.__max_audio_in_queue = max_audio_in_queue
+        self.__audio_feeder_threads = audio_feeder_threads
+        self.__feature_extractor_threads = feature_extractor_threads
+
+        ## Set up runtime conditions
+        self.__audio_queue = multiprocessing.Queue(maxsize=self.__max_audio_in_queue)
+        self.__features = {}
+        self.__features_lock = multiprocessing.Lock()
+        self.__audio_feeder_barrier = multiprocessing.Barrier(self.__audio_feeder_threads)
+        self.__audio_feeder_threadpool = []
+        self.__feature_extractor_threadpool = []
+
+        logging.info(f"[MTAFE] [Constructor] Extraction parameters: {desired_sr}Hz, Mono: {force_mono}, Divide into {chunk_length}s chunks with {chunk_overlap}s of overlap")
+        logging.info(f"[MTAFE] [Constructor] Using {audio_feeder_threads} threads for preprocessing audio and {feature_extractor_threads} threads for feature extraction. Max queue size of {max_audio_in_queue} files")
+
+    # More audio embeddings specific code below (To be overridden)
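The shutdown protocol in the class above is worth spelling out: every feeder that pops the None sentinel puts it straight back so its sibling feeders also see it, then all feeders meet at the barrier, and only thread 0 forwards a single None into the audio queue; the extractor threads replay the same put-it-back trick on their side. A minimal sketch of that pattern in isolation (generic names, plain threads instead of the ProcessPoolExecutor used above):

    import queue
    import threading

    def worker(q: queue.Queue) -> None:
        while True:
            item = q.get()
            if item is None:   # sentinel: no more work
                q.put(None)    # put it back so sibling workers also stop
                break
            print(item)        # stand-in for the real per-item work

    q = queue.Queue()
    for item in range(10):
        q.put(item)
    q.put(None)                # one sentinel is enough thanks to the put-back trick
    threads = [threading.Thread(target=worker, args=(q,)) for _ in range(4)]
    for t in threads: t.start()
    for t in threads: t.join()

One more detail: the double-underscore attributes are name-mangled (e.g. __features becomes _MultiThreadedAudioFeatureExtractor__features), so a subclass cannot simply override __audio_inference_embedding despite the "To be overridden" comments; that is presumably why mtafe_panns.py below re-implements the whole class instead of subclassing.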
FeatureExtraction/mtafe.py (new file, 8 lines)
@@ -0,0 +1,8 @@
+import dataset_files
+import multiprocessing
+import logging
+import numpy as np
+import threading
+import queue
+from pathlib import Path
+
FeatureExtraction/mtafe_panns.py (new file, 193 lines)
@@ -0,0 +1,193 @@
+from dataset_files import MultiThreadedAudioFeatureExtractor
+from pathlib import Path
+from panns_inference import AudioTagging
+import logging
+import numpy as np
+import queue
+import concurrent.futures
+import threading
+import time
+import audiopreprocessing
+#import torch
+#import gc
+
+class mtafe_panns():
+    __audio_queue: queue.Queue[ # List of ...
+        tuple[ # Pair of chunked audio and its path
+            list[tuple[np.ndarray, float, int]], # Chunked audio
+            Path # Path to original audio
+        ]
+    ] # List of Chunked/Resampled audio
+    __audio_loader_threads: int # Amount of audio feeder threads
+    __feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
+    __audio_paths_list: queue.Queue[Path] # Path list to audio
+    __max_audio_in_queue: int # Maximum audio in queue
+    __desired_sr: int
+    __mono: bool
+    __chunk_length: float
+    __overlap: float
+    __features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
+    __features_lock: threading.Lock
+    __audio_loader_threadpool: list[concurrent.futures.Future]
+    __feature_extractor_threadpool: list[concurrent.futures.Future]
+    __at: AudioTagging
+    __batch_size: int
+
+    def __init__(self,
+                 audio_paths: list[Path],
+                 max_audio_in_queue: int = 16,
+                 audio_feeder_threads: int = 8,
+                 feature_extractor_threads: int = 8,
+                 desired_sr: int = 32000,
+                 force_mono: bool = False,
+                 chunk_length: float = 15.0,
+                 chunk_overlap: float = 2.0,
+                 batch_size: int = 20
+    ):
+        # Check if the paths passed in are all valid and add them to queue
+        self.__audio_paths_list = queue.Queue()
+        for p in audio_paths:
+            if not p.is_file():
+                raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
+            else:
+                self.__audio_paths_list.put(p)
+        #self.__audio_paths_list.task_done()
+
+        logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize()} files")
+
+        # Set up private attributes
+        ## Audio preprocessing parameters
+        self.__desired_sr = desired_sr
+        self.__mono = force_mono
+        self.__chunk_length = chunk_length
+        self.__overlap = chunk_overlap
+
+        ## Extractor/Feeder settings
+        self.__max_audio_in_queue = max_audio_in_queue
+        self.__audio_loader_threads = audio_feeder_threads
+        self.__feature_extractor_threads = feature_extractor_threads
+
+        ## Set up runtime conditions
+        self.__audio_queue = queue.Queue(maxsize=max_audio_in_queue)
+        self.__features = {}
+        self.__features_lock = threading.Lock()
+        self.__audio_loader_threadpool = []
+        self.__feature_extractor_threadpool = []
+
+        logging.info(f"[MTAFE] [Constructor] Extraction parameters: {desired_sr}Hz, Mono: {force_mono}, Divide into {chunk_length}s chunks with {chunk_overlap}s of overlap")
+        logging.info(f"[MTAFE] [Constructor] Using {audio_feeder_threads} threads for preprocessing audio and {feature_extractor_threads} threads for feature extraction. Max queue size of {max_audio_in_queue} files")
+
+        logging.info(f"[MTAFE] [Constructor] Initializing PANNs")
+        logging.info(f"[MTAFE] [Constructor] Inferencing with batch size {batch_size}")
+        self.__at = AudioTagging(checkpoint_path=None, device='cuda')
+        self.__batch_size = batch_size
+
+    def __chunks(self, lst, n):
+        # Stolen straight from Stackoverflow
+        """Yield successive n-sized chunks from lst."""
+        for i in range(0, len(lst), n):
+            yield lst[i:i + n]
+
+    def __audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
+        audio_chunk_list = []
+        timepos_list = []
+        channel_id_list = []
+        embedding_list = []
+
+        # Split into equal sized lists
+        for audio_chunk, timepos, channel in audio:
+            audio_chunk_list.append(audio_chunk)
+            timepos_list.append(timepos)
+            channel_id_list.append(channel)
+
+        # Convert audio_chunk_list into numpy array
+        audio_chunk_list = np.array(audio_chunk_list)
+
+        #logging.info("[MTAFE] [PANNs] Inferencing...")
+        try:
+            for i, batch in enumerate(self.__chunks(audio_chunk_list, self.__batch_size)):
+                (clipwise_output, embedding) = self.__at.inference(batch)
+                for vect in embedding: # vect: np.ndarray
+                    embedding_list.append(vect)
+                logging.info(f"[MTAFE] [PANNs] Inferenced batch {i}")
+
+            assert len(audio_chunk_list) == len(timepos_list) == len(channel_id_list) == len(embedding_list)
+        except Exception as e:
+            logging.critical("[MTAFE] [PANNs] ERROR! INFERENCE FAILED!!! OR LIST SIZE MISMATCH")
+            logging.critical(e)
+            embedding_list = [None for _ in audio_chunk_list] # Clearing embedding_list and filling it with None
+        return list(zip(embedding_list, timepos_list, channel_id_list)) # (embedding, timepos, channel_id), matching the tuple order used in __features
+    def __audio_feeder_thread(self, thread_id):
+        while (not self.__audio_paths_list.empty()):
+            new_audio_path = self.__audio_paths_list.get()
+            self.__audio_paths_list.task_done()
+            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
+            new_audio = audiopreprocessing.load_preprocessed_audio(
+                new_audio_path,
+                self.__desired_sr,
+                self.__mono,
+                self.__chunk_length,
+                self.__overlap
+            )
+            self.__audio_queue.put((new_audio, new_audio_path))
+            logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
+        logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
+
+    def __check_all_audiofeed_thread_finished(self) -> bool:
+        for ft in self.__audio_loader_threadpool:
+            if ft.running():
+                return False
+        return True
+
+    def __check_all_featureextractor_thread_finished(self) -> bool:
+        for ft in self.__feature_extractor_threadpool:
+            if ft.running():
+                return False
+        return True
+
+    def __feature_extractor_thread(self, thread_id):
+        while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
+            if (not self.__audio_queue.empty()):
+                audio_to_process, audio_path = self.__audio_queue.get()
+                self.__audio_queue.task_done()
+                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
+                features_to_add = self.__audio_inference_embedding(audio_to_process)
+                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
+                with self.__features_lock:
+                    self.__features[audio_path] = features_to_add
+                logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {audio_path} w/ {len(features_to_add)} features")
+        logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
+
+    def __count_running_threads(self) -> tuple[int, int]:
+        running_extractors = 0
+        running_feeders = 0
+        for ft in self.__feature_extractor_threadpool:
+            if ft.running(): running_extractors += 1
+        for ft in self.__audio_loader_threadpool:
+            if ft.running(): running_feeders += 1
+        return (running_feeders, running_extractors)
+
+    @property
+    def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
+        return self.__features
+
+    def extract(self):
+        total_amount = self.__audio_paths_list.qsize()
+        logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
+        t_start = time.perf_counter()
+        with concurrent.futures.ThreadPoolExecutor(max_workers=(self.__audio_loader_threads + self.__feature_extractor_threads)) as executor:
+            for i in range(self.__audio_loader_threads):
+                ld_ft = executor.submit(self.__audio_feeder_thread, i)
+                self.__audio_loader_threadpool.append(ld_ft)
+            for i in range(self.__feature_extractor_threads):
+                ld_ft = executor.submit(self.__feature_extractor_thread, i)
+                self.__feature_extractor_threadpool.append(ld_ft)
+            while ( (not self.__check_all_audiofeed_thread_finished()) and (not self.__check_all_featureextractor_thread_finished()) ):
+                nfeeder, nextract = self.__count_running_threads()
+                print(f"[MTAFE Progress] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
+
+        t_stop = time.perf_counter()
+        logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()} COMPLETE)")
+        delta_t = t_stop - t_start
+        total_features = sum( [len(self.__features[path]) for path in self.__features] )
+        logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")
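For reference, a short sketch of how the resulting features dict could be consumed once extract() returns (variable names are illustrative; it assumes a working PANNs/CUDA setup as in test_panns.py below, and the tuple layout follows load_preprocessed_audio and __audio_inference_embedding above):

    from dataset_files import random_audio_chunk
    from mtafe_panns import mtafe_panns

    extractor = mtafe_panns(audio_paths=random_audio_chunk(4))
    extractor.extract()
    for audio_path, rows in extractor.features.items():
        for embedding, timepos, channel_id in rows:
            # embedding: PANNs embedding vector (None if inference failed),
            # timepos: chunk start in seconds, channel_id: source channel (-1 = mono)
            print(audio_path.name, timepos, channel_id)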
[modified file — header not captured; a driver script exercising the extractor]

@@ -1,3 +1,17 @@
-from dataset_files import AudioFeatureExtractor, random_audio_chunk
-afe = AudioFeatureExtractor(random_audio_chunk(32), 16, 32000, False)
-afe.extract()
+import logging
+from audiopreprocessing import triggerlog
+#logger = logging.getLogger(__name__)
+logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
+
+from dataset_files import MultiThreadedAudioFeatureExtractor, random_audio_chunk
+mtafe = MultiThreadedAudioFeatureExtractor(
+    audio_paths=random_audio_chunk(8),
+    max_audio_in_queue=8,
+    audio_feeder_threads=8,
+    feature_extractor_threads=1,
+    desired_sr=32000,
+    force_mono=False,
+    chunk_length=15,
+    chunk_overlap=2
+)
+mtafe.extract()
FeatureExtraction/test_mtafe.py (new file, 17 lines)
@@ -0,0 +1,17 @@
+#import mtafe
+import logging
+#import dataset_files
+logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.DEBUG)
+
+logging.info("Running tests")
+# m = mtafe.mtafe(
+#     audio_paths=dataset_files.random_audio_chunk(2),
+#     max_audio_in_queue=8,
+#     audio_feeder_threads=8,
+#     feature_extractor_threads=1,
+#     desired_sr=32000,
+#     force_mono=False,
+#     chunk_length=15,
+#     chunk_overlap=2
+# )
+# m.run()
FeatureExtraction/test_panns.py (new file, 24 lines)
@@ -0,0 +1,24 @@
+import logging
+from audiopreprocessing import triggerlog
+#logger = logging.getLogger(__name__)
+import sys
+logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO, handlers=[logging.FileHandler('test_panns.log'), logging.StreamHandler(sys.stdout)])
+from pathlib import Path
+from mtafe_panns import mtafe_panns
+from dataset_files import random_audio_chunk, serialize_dict_obj
+
+mtafe = mtafe_panns(
+    audio_paths=random_audio_chunk(4),
+    max_audio_in_queue=4,
+    audio_feeder_threads=4,
+    feature_extractor_threads=1,
+    desired_sr=32000,
+    force_mono=False,
+    chunk_length=15,
+    chunk_overlap=2,
+    batch_size=32
+)
+mtafe.extract()
+
+print("Saving inferenced results to file...")
+p = Path('./test_panns.pkl')
+serialize_dict_obj(p, mtafe.features)
mtafe_lab/audiopreprocessing.py (new file, 95 lines)
@@ -0,0 +1,95 @@
+import librosa
+import pickle
+import os
+import numpy as np
+from pathlib import Path
+import logging
+
+logger = logging.getLogger(__name__)
+
+def triggerlog():
+    logger.critical("Testing: info")
+
+def resample_load(input_path : Path, target_sr : int = 16000, mono_audio : bool = False) -> np.ndarray: # AI
+    """Loads the audio and resamples it to `target_sr`.
+
+    Args:
+        input_path (Path): pathlib.Path object pointing to the audio file
+        target_sr (int, optional): target sample rate to resample to. Defaults to 16000.
+        mono_audio (bool, optional): load the audio in mono mode. Defaults to False.
+
+    Returns:
+        np.ndarray: the loaded (and, if needed, resampled) audio
+    """
+    # Load audio file with original sample rate
+    logger.info(f"[resample_load] Loading audio {input_path}")
+    audio, orig_sr = librosa.load(input_path, sr=None, mono=mono_audio)
+
+    # Resample if necessary
+    if orig_sr != target_sr:
+        logger.info(f"[resample_load] Resampling to {target_sr}")
+        audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
+
+    return audio
+
+def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap: float = 2.0) -> tuple[list[np.ndarray], list[float], int]: # AI
+    """
+    Chunks audio file into overlapping segments. Only pass in mono audio here.
+
+    Args:
+        audio: Loaded audio ndarray (one channel only)
+        sr: Sample rate for the given audio file
+        chunk_length: Length of each chunk in seconds
+        overlap: Overlap between chunks in seconds
+
+    Returns:
+        List of audio chunks, list of chunk positions, and given sample rate
+    """
+    logger.info(f"[chunk_audio] Chunking audio ({len(audio) / sr}s)")
+    # Calculate chunk size and hop length in samples
+    chunk_size = int(chunk_length * sr)
+    hop_length = int((chunk_length - overlap) * sr)
+
+    # Generate chunks
+    chunks = []
+    positions = []
+    k = 0
+    for i in range(0, len(audio) - chunk_size + 1, hop_length):
+        chunk = audio[i:i + chunk_size]
+        chunks.append(chunk)
+        positions.append(i / sr)
+        k += 1
+    if k == 0: # The full audio length is less than chunk_length
+        chunks = [audio]
+        positions = [0.0]
+        logger.info("[chunk_audio] Audio less than chunk_length. Returning original audio as chunk")
+    else:
+        logger.info(f"[chunk_audio] Audio is split into {k} chunks")
+
+    return chunks, positions, sr
+
+def load_preprocessed_audio(
+        path: Path,
+        desired_sr: int,
+        mono: bool = False,
+        chunk_length: float = 15.0,
+        overlap: float = 2.0) -> list[tuple[np.ndarray, float, int]]:
+
+    result = []
+    # Load and resample audio
+    audio = resample_load(path, desired_sr, mono) # Stereo 2D matrix, Mono 1D array
+    if mono or (audio.ndim == 1):
+        # Chunk audio: mono (or the audio file loaded is itself mono)
+        chunks, positions, _ = chunk_audio(audio, desired_sr, chunk_length, overlap)
+        assert len(chunks) == len(positions)
+        result.extend(zip(chunks, positions, [-1 for _ in range(len(chunks))]))
+        # (ndarray_chunk1, pos1, -1): first audio chunk, position 1, -1 (mono channel indicator)
+    else:
+        # Chunk audio: stereo/multichannel
+        for channel_id, channel_audio in enumerate(audio):
+            chunks, positions, _ = chunk_audio(channel_audio, desired_sr, chunk_length, overlap)
+            assert len(chunks) == len(positions)
+            result.extend(zip(chunks, positions, [channel_id for _ in range(len(chunks))]))
+            # (ndarray_chunk1, pos1, 0): first audio chunk, position 1, 0 (channel 0)
+    logger.info(f"[load_preprocessed_audio] Loaded audio {path} ({desired_sr}Hz, Chunk {chunk_length}s with overlap {overlap}s) MONO:{mono}")
+    return result
mtafe_lab/dataset.py (new file, 135 lines)
@@ -0,0 +1,135 @@
+import platform
+import os
+import pickle
+import random
+import multiprocessing
+import threading
+import time
+import concurrent.futures
+import numpy as np
+from pathlib import Path
+import audiopreprocessing
+import logging
+import queue
+
+def serialize_dict_obj(path : Path, object : dict) -> int:
+    """Serializes Python Dictionary object to a file via Pickle.
+
+    Args:
+        path (Path): Path to store the file
+        object (dict): Dictionary object to serialize
+    Returns:
+        int: size in bytes written
+    """
+    # Horrible practice, horrible security, but it will work for now
+    with path.open("wb") as fp:
+        pickle.dump(object, fp)
+        fp.seek(0, os.SEEK_END)
+        size = fp.tell()
+    return size
+
+logging.info("Reading local dataset directory structure...")
+
+ASMRThreePath = Path("C:\\ASMRThree")
+ASMRTwoPath = Path("D:\\ASMRTwo")
+ASMROnePath = Path("E:\\ASMROne")
+
+if (platform.system() == 'Linux'):
+    ASMROnePath = Path('/mnt/Scratchpad/ASMROne')
+    ASMRTwoPath = Path('/mnt/MyStuffz/ASMRTwo')
+    ASMRThreePath = Path('/mnt/Windows11/ASMRThree')
+
+size_one, size_two, size_three = 0, 0, 0
+files_one, files_two, files_three = [], [], []
+folders_one, folders_two, folders_three = [], [], []
+
+# Statistic calculation for ASMROne
+for root, dirs, files in ASMROnePath.walk(): # Root will iterate through all folders
+    if root.absolute() != ASMROnePath.absolute(): # Skip root of ASMROnePath
+        folders_one.append(root) # Add folder to list
+    for fname in files: # Iterate through all files in current root
+        file = root/fname # Get file path
+        assert file.is_file()
+        files_one.append(file)
+        size_one += file.stat().st_size # Get file size
+
+# Statistic calculation for ASMRTwo
+for root, dirs, files in ASMRTwoPath.walk(): # Root will iterate through all folders
+    if root.absolute() != ASMRTwoPath.absolute(): # Skip root of ASMRTwoPath
+        folders_two.append(root) # Add folder to list
+    for fname in files: # Iterate through all files in current root
+        file = root/fname # Get file path
+        assert file.is_file()
+        files_two.append(file)
+        size_two += file.stat().st_size # Get file size
+
+# Statistic calculation for ASMRThree
+for root, dirs, files in ASMRThreePath.walk(): # Root will iterate through all folders
+    if root.absolute() != ASMRThreePath.absolute(): # Skip root of ASMRThreePath
+        folders_three.append(root) # Add folder to list
+    for fname in files: # Iterate through all files in current root
+        file = root/fname # Get file path
+        assert file.is_file()
+        files_three.append(file)
+        size_three += file.stat().st_size # Get file size
+
+DataSubsetPaths = [ASMROnePath, ASMRTwoPath, ASMRThreePath]
+DLSiteWorksPaths = []
+# Collect ASMR Works (RJ ID, Paths)
+for ASMRSubsetPath in DataSubsetPaths:
+    for WorkPaths in ASMRSubsetPath.iterdir():
+        DLSiteWorksPaths.append(WorkPaths)
+
+fileExt2fileType = {
+    ".TXT": "Document",
+    ".WAV": "Audio",
+    ".MP3": "Audio",
+    ".PNG": "Image",
+    ".JPG": "Image",
+    ".VTT": "Subtitle",
+    ".PDF": "Document",
+    ".FLAC": "Audio",
+    ".MP4": "Video",
+    ".LRC": "Subtitle",
+    ".SRT": "Subtitle",
+    ".JPEG": "Image",
+    ".ASS": "Subtitle",
+    "": "NO EXTENSION",
+    ".M4A": "Audio",
+    ".MKV": "Video"
+}
+fileext_stat = {}
+file_list = files_one + files_two + files_three
+file_list_count = len(file_list)
+
+for file in file_list:
+    f_ext = file.suffix.upper()
+    if (f_ext in fileext_stat.keys()):
+        fileext_stat[f_ext]['Count'] += 1
+        fileext_stat[f_ext]['List'].append(file)
+        fileext_stat[f_ext]['ExtensionMass'] += file.stat().st_size
+    else:
+        fileext_stat[f_ext] = {}
+        fileext_stat[f_ext]['Count'] = 1
+        fileext_stat[f_ext]['List'] = [file]
+        fileext_stat[f_ext]['ExtensionMass'] = file.stat().st_size # The total sum of sizes of the same file extension
+        fileext_stat[f_ext]['MediaType'] = fileExt2fileType[f_ext]
+
+audio_paths = []
+for extension in fileext_stat: # I can't be bothered to convert this into a list comprehension
+    if fileext_stat[extension]['MediaType'] == "Audio":
+        audio_paths += fileext_stat[extension]['List']
+
+def random_audio_chunk(n : int, seed : int = 177013) -> list[Path]:
+    """Returns a random selection of audio files
+
+    Args:
+        n (int): Amount of files to return
+        seed (int, optional): Seed for RNG. Defaults to 177013.
+
+    Returns:
+        list[Path]: List of randomly selected audio paths (using Path object)
+    """
+    random.seed(seed)
+    #return random.choices(audio_paths, k=n) # Contains repeated elements
+    return random.sample(audio_paths, k=n)
mtafe_lab/mtafe.py (new file, 32 lines)
@@ -0,0 +1,32 @@
+import logging
+logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
+
+import multiprocessing
+import multiprocessing.process
+import dataset
+import audiopreprocessing
+from pathlib import Path
+
+def copy_worker(origin_queue, target_queue):
+    p = origin_queue.get()
+    logging.info(f"Processing: {p}")
+    l = audiopreprocessing.load_preprocessed_audio(p, 32000, True)
+    print("Preprocess complete, putting it into queue")
+    target_queue.put(l) # Even on a small scale test, the process will always hang here (see the note after test_mp.py below)
+
+if __name__ == "__main__":
+    audio_path_queue = multiprocessing.Queue()
+    audio_queue = multiprocessing.Queue()
+
+    rand_paths = dataset.random_audio_chunk(1)
+    for p in rand_paths:
+        audio_path_queue.put(p)
+
+    print("Files queued")
+
+    processes = [multiprocessing.Process(target=copy_worker, args=(audio_path_queue, audio_queue)) for _ in range(1)]
+    for p in processes: p.start()
+    for p in processes: p.join()
+
+    print("Joined")
+    #for _ in range(1): print(audio_queue.get())
mtafe_lab/test_mp.py (new file, 30 lines)
@@ -0,0 +1,30 @@
+import logging
+logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
+
+import multiprocessing
+from dataset import random_audio_chunk
+import audiopreprocessing
+from time import sleep
+
+origin_queue = multiprocessing.Queue()
+target_queue = multiprocessing.Queue()
+
+def worker(orig, targ):
+    p = orig.get()
+    #out = "PROCESSED" + str(p.absolute())
+    out = audiopreprocessing.load_preprocessed_audio(p, 16000, True) # This will cause put to hang
+    targ.put(out) # This will hang the process
+
+if __name__ == "__main__":
+    K = 2
+
+    for p in random_audio_chunk(K):
+        origin_queue.put(p)
+
+    processes = [multiprocessing.Process(target=worker, args=(origin_queue, target_queue)) for _ in range(K)]
+    for p in processes: p.start()
+    for p in processes: p.join()
+
+    logging.critical("Successfully terminated all processes")
+
+    for _ in range(K): print(target_queue.get())
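The hang flagged in copy_worker and worker above is a documented multiprocessing pitfall rather than a GIL starvation issue: a child process that has put a large object on a multiprocessing.Queue cannot exit until its feeder thread has flushed the buffered data into the pipe, so calling join() on the children before the parent has drained the queue deadlocks (see "Joining processes that use queues" in the Python multiprocessing docs). A minimal sketch of the fix, draining before joining:

    import multiprocessing

    def worker(targ: multiprocessing.Queue) -> None:
        targ.put(list(range(1_000_000)))  # large payload sits in the feeder buffer

    if __name__ == "__main__":
        q = multiprocessing.Queue()
        procs = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(2)]
        for p in procs: p.start()
        results = [q.get() for _ in procs]  # drain BEFORE join to avoid the deadlock
        for p in procs: p.join()
        print(len(results))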
mtafe_lab/test_mtafe.py (new file, 21 lines)
@@ -0,0 +1,21 @@
+import logging
+logging.basicConfig(format="%(asctime)s/%(levelname)s: [%(module)s] %(message)s", level=logging.INFO)
+
+import mtafe
+from dataset import random_audio_chunk
+
+logging.info("Generating random audio path list")
+rdpl = random_audio_chunk(2)
+
+logging.info("Initializing MTAFE")
+mtafe.initialize_parameters(
+    paudio_paths=rdpl,
+    pmax_audio_in_queue=4,
+    paudio_feeder_threads=2,
+    pfeature_extractor_threads=1,
+    pdesired_sr=32000,
+    pforce_mono=False,
+    pchunk_length=15,
+    pchunk_overlap=2
+)
+mtafe.test_feeder()