some test data

2025-04-10 09:01:24 +02:00
parent a9d3d10da9
commit 6fc6df87b2
10 changed files with 25401 additions and 22798 deletions

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large

View File

@@ -0,0 +1,80 @@
import librosa
import numpy as np
from pathlib import Path

DEBUG = True

def resample_load(input_path: Path, target_sr: int = 16000, mono_audio: bool = False) -> np.ndarray:  # AI
    """Load an audio file and resample it to the target sample rate."""
    # Load audio file with its original sample rate
    if DEBUG: print("[resample_load] Loading audio", input_path)
    audio, orig_sr = librosa.load(input_path, sr=None, mono=mono_audio)
    # Resample if necessary
    if orig_sr != target_sr:
        if DEBUG: print("[resample_load] Resampling to", target_sr)
        audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
    return audio

def chunk_audio(audio: np.ndarray, sr: int, chunk_length: float = 10.0, overlap: float = 2.0) -> tuple[list[np.ndarray], list[float], int]:  # AI
    """
    Chunks an audio signal into overlapping segments. Only pass mono audio here.
    Args:
        audio: Loaded audio ndarray
        sr: Sample rate of the given audio
        chunk_length: Length of each chunk in seconds
        overlap: Overlap between consecutive chunks in seconds
    Returns:
        List of audio chunks, list of chunk start positions in seconds, and the given sample rate
    """
if DEBUG: print("[chunk_audio] Chunking audio")
# Calculate chunk size and hop length in samples
chunk_size = int(chunk_length * sr)
hop_length = int((chunk_length - overlap) * sr)
# Generate chunks
chunks = []
positions = []
k = 0
for i in range(0, len(audio) - chunk_size + 1, hop_length):
chunk = audio[i:i + chunk_size]
chunks.append(chunk)
positions.append(i / sr)
k += 1
if DEBUG: print("[chunk_audio] Chunked", k, end="\r")
if k == 0: # The full audio length is less than chunk_length
chunks = [audio]
positions = [0.0]
return chunks, positions, sr
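# Worked numbers (illustrative, not from the source): with sr=16000, chunk_length=10.0 and
# overlap=2.0, chunk_size = 160000 samples and hop_length = 128000 samples, so a 60-second
# file (960000 samples) yields floor((960000 - 160000) / 128000) + 1 = 7 chunks starting at
# 0.0, 8.0, 16.0, 24.0, 32.0, 40.0 and 48.0 seconds.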

def load_preprocessed_audio(
        path: Path,
        desired_sr: int,
        mono: bool = False,
        chunk_length: float = 15.0,
        overlap: float = 2.0) -> list[tuple[np.ndarray, float, int]]:
    """Load, resample, and chunk an audio file. Returns a list of
    (chunk, position_in_seconds, channel_id) tuples, where channel_id is -1 for
    mono audio and the zero-based channel index otherwise."""
    result = []
    # Load and resample audio
    audio = resample_load(path, desired_sr, mono)  # Stereo: 2D matrix, Mono: 1D array
    if mono or (audio.ndim == 1):
        # Chunk audio: mono (either mono was requested or the file itself is mono)
        chunks, positions, _ = chunk_audio(audio, desired_sr, chunk_length, overlap)
        assert len(chunks) == len(positions)
        result.extend(zip(chunks, positions, [-1] * len(chunks)))
        # (ndarray_chunk1, pos1, -1): first audio chunk, its position, -1 (mono channel indicator)
    else:
        # Chunk audio: stereo/multichannel, one channel at a time
        for channel_id, channel_audio in enumerate(audio):
            chunks, positions, _ = chunk_audio(channel_audio, desired_sr, chunk_length, overlap)
            assert len(chunks) == len(positions)
            result.extend(zip(chunks, positions, [channel_id] * len(chunks)))
            # (ndarray_chunk1, pos1, 0): first audio chunk, its position, 0 (channel 0)
    return result
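
A minimal usage sketch, assuming this file is the audiopreprocessing module imported by the loader below; the input filename is hypothetical, not from the commit:

from pathlib import Path
import audiopreprocessing

# Any librosa-readable file works; a stereo file yields chunks for channel 0, then channel 1
entries = audiopreprocessing.load_preprocessed_audio(Path("sample.wav"), desired_sr=16000)
for chunk, pos, channel_id in entries[:3]:
    print(chunk.shape, pos, channel_id)  # e.g. (240000,) 0.0 0 -- 15 s at 16000 Hz per chunk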

View File

@@ -1,9 +1,43 @@
import platform
import os
import pickle
import random
import threading
import time
import concurrent.futures
import numpy as np
from pathlib import Path
import audiopreprocessing
DEBUG = True

def serialize_dict_obj(path: Path, obj: dict) -> int:
    """Serializes a Python dictionary to a file via pickle.
    Args:
        path (Path): Path to store the file
        obj (dict): Dictionary object to serialize
    Returns:
        int: Size in bytes written
    """
    # Horrible practice, horrible security, but it will work for now
    with path.open("wb") as fp:
        pickle.dump(obj, fp)
        fp.seek(0, os.SEEK_END)
        size = fp.tell()
    return size
print("Reading local dataset directory structure...")
ASMRThreePath = Path("C:\\ASMRThree")
ASMRTwoPath = Path("D:\\ASMRTwo")
ASMROnePath = Path("E:\\ASMROne")
if platform.system() == 'Linux':
    ASMROnePath = Path('/mnt/Scratchpad/ASMROne')
    ASMRTwoPath = Path('/mnt/MyStuffz/ASMRTwo')
    ASMRThreePath = Path('/mnt/Windows11/ASMRThree')
size_one, size_two, size_three = 0, 0, 0
files_one, files_two, files_three = [], [], []
folders_one, folders_two, folders_three = [], [], []
@@ -78,4 +112,161 @@ for file in file_list:
        fileext_stat[f_ext]['Count'] = 1
        fileext_stat[f_ext]['List'] = [file]
        fileext_stat[f_ext]['ExtensionMass'] = file.stat().st_size  # Running total size of files with this extension
        fileext_stat[f_ext]['MediaType'] = fileExt2fileType[f_ext]

audio_paths = []
for extension in fileext_stat:  # I can't be bothered to convert this into a list comprehension
    if fileext_stat[extension]['MediaType'] == "Audio":
        audio_paths += fileext_stat[extension]['List']

def random_audio_chunk(n: int, seed: int = 177013) -> list[Path]:
    """Returns a random selection of audio files.
    Args:
        n (int): Number of files to return
        seed (int, optional): Seed for the RNG. Defaults to 177013.
    Returns:
        list[Path]: List of randomly selected audio paths (as Path objects)
    """
    random.seed(seed)
    # return random.choices(audio_paths, k=n)  # choices() can repeat elements, sample() cannot
    return random.sample(audio_paths, k=n)

class AudioFeatureExtractor:
    __audio_queue: list[                          # List of ...
        tuple[                                    # ... pairs of chunked audio and its path
            list[tuple[np.ndarray, float, int]],  # Chunked audio
            Path                                  # Path to the original audio
        ]
    ]                                             # List of chunked/resampled audio
    __feeder_future: concurrent.futures.Future
    __extractor_future: concurrent.futures.Future
    __audio_paths_list: list[Path]
    __max_audio_in_queue: int
    __queue_lock: threading.Lock
    __desired_sr: int
    __mono: bool
    __chunk_length: float
    __overlap: float
    __features: dict[Path, list[tuple[np.ndarray, float, int]]]
    # { audioPath:
    #       [(embedding, pos, channel), ...]
    # }

    def _embedding_inference(self, audio_ndarray: np.ndarray) -> np.ndarray:
        """Runs the embedding model on one audio chunk. Returns the embedding vector.
        Meant to be overridden in a subclass; this stub returns np.zeros(32). (Named with
        a single underscore because a double-underscore name would be mangled and could
        not be overridden.)
        Args:
            audio_ndarray (np.ndarray): One chunk of audio samples
        Returns:
            np.ndarray: Embedding vector for the chunk
        """
        return np.zeros(32)

    def __embedding_extract(self, audio: tuple[np.ndarray, float, int]) -> tuple[np.ndarray, float, int, np.ndarray]:
        """Receives a tuple of (audio, position, channel id) and appends the chunk's embedding to it.
        Args:
            audio (tuple[np.ndarray, float, int]): Tuple of audio, position, channel id
        Returns:
            tuple[np.ndarray, float, int, np.ndarray]: Audio, position, channel id, embedding vector
        """
        audio_chunk, pos, channel_id = audio
        return (audio_chunk, pos, channel_id, self._embedding_inference(audio_chunk))

    def __audio_queue_feeder(self):  # TODO: Upgrade to a multithreaded loader?
        """Internal thread function. Continuously preprocesses audio and loads it into
        the audio queue until audio_paths_list is exhausted.
        """
        while self.__audio_paths_list:  # While there are still Path elements in the path list
            if not (len(self.__audio_queue) < self.__max_audio_in_queue):
                if DEBUG: print("Audio Queue Thread: Queue full, feeder thread sleeping for 5 seconds")
                time.sleep(5)
            # While the queue is not full and paths remain (the second check prevents an
            # IndexError if the path list empties mid-loop)
            while len(self.__audio_queue) < self.__max_audio_in_queue and self.__audio_paths_list:
                new_audio_path = self.__audio_paths_list[0]
                new_audio = audiopreprocessing.load_preprocessed_audio(
                    new_audio_path,
                    self.__desired_sr,
                    self.__mono,
                    self.__chunk_length,
                    self.__overlap
                )
                with self.__queue_lock:
                    self.__audio_queue.append(
                        (new_audio, new_audio_path)
                    )
                pop_path = self.__audio_paths_list.pop(0)
                if DEBUG: print("Audio Queue Thread: Added new audio to queue", pop_path)
        if DEBUG: print("Audio Queue Thread: DONE. All audio files fed")

    def __audio_queue_feature_extractor(self):
        """Internal thread function. Takes audio off the audio queue and extracts an embedding
        vector for every audio chunk, storing the results in self.__features: keyed by the
        original audio's Path, each value a list of (embedding vector, position, channel id) tuples.
        """
        while self.__audio_paths_list or self.__audio_queue:  # While there is still audio to process
            if self.__audio_queue:  # If the audio queue is not empty
                with self.__queue_lock:
                    audio_to_process, audio_path = self.__audio_queue.pop(0)  # Get audio from the queue
                if DEBUG: print(f"Feature Extractor Thread: Extracting {len(audio_to_process)} features from audio", audio_path)
                for audio_chunk in audio_to_process:
                    same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk)
                    if audio_path not in self.__features:
                        # if DEBUG: print("Adding new vector to", audio_path.name)
                        self.__features[audio_path] = [(embedd_vect, timepos, channel_id)]
                    else:
                        # if DEBUG: print("Adding vector to", audio_path.name)
                        self.__features[audio_path].append(
                            (embedd_vect, timepos, channel_id)
                        )
            else:
                # If the audio queue is empty, wait for the feeder to catch up
                if DEBUG: print("Feature Extractor Thread: Queue empty, extractor thread sleeping for 5 seconds")
                time.sleep(5)
        if DEBUG: print("Feature Extractor Thread: DONE. Extracted all features from all audio files")

    def __init__(
            self,
            audio_paths_list: list[Path],
            max_audio_in_queue: int,
            desired_sr: int,
            mono: bool,
            chunk_length: float = 15.0,
            overlap: float = 2.0
    ):
        self.__audio_queue = []
        self.__audio_paths_list = list(audio_paths_list)  # Copy: the feeder consumes this list in place
        self.__max_audio_in_queue = max_audio_in_queue
        self.__queue_lock = threading.Lock()
        self.__desired_sr = desired_sr
        self.__mono = mono
        self.__chunk_length = chunk_length
        self.__overlap = overlap
        self.__features = {}

    @property
    def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
        return self.__features

    def extract(self):
        print("Starting feature extraction for", len(self.__audio_paths_list), "file(s)")
        total_amount = len(self.__audio_paths_list)
        t_start = time.perf_counter()
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            self.__feeder_future = executor.submit(self.__audio_queue_feeder)
            self.__extractor_future = executor.submit(self.__audio_queue_feature_extractor)
            while not (self.__feeder_future.done() and self.__extractor_future.done()):
                print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)})", end="\r")
                time.sleep(1)
            # Surface any exception raised inside the worker threads
            self.__feeder_future.result()
            self.__extractor_future.result()
        t_stop = time.perf_counter()
        print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)} COMPLETE)")
        delta_t = t_stop - t_start
        total_features = sum(len(vectors) for vectors in self.__features.values())
        print()
        print("Extraction completed")
        print(f"Took {delta_t:.2f} seconds. Added {total_features} vectors/embeddings")

View File

@@ -0,0 +1,3 @@
from dataset_files import AudioFeatureExtractor, random_audio_chunk
# 32 random files, queue capacity 16, resample to 32 kHz, keep stereo channels (mono=False)
afe = AudioFeatureExtractor(random_audio_chunk(32), 16, 32000, False)
afe.extract()
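
To persist the result, the extracted features dict can be handed to serialize_dict_obj from the same module. A minimal sketch; the output filename is an assumption, not from the commit:

from pathlib import Path
from dataset_files import serialize_dict_obj

# Hypothetical output path; pickles {Path: [(embedding, pos, channel_id), ...]}
written = serialize_dict_obj(Path("features.pkl"), afe.features)
print(f"Wrote {written} bytes")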