Files
DLSiteFSearch/FeatureExtraction/dataset_files.py

515 lines
25 KiB
Python

import platform
import os
import pickle
import random
import multiprocessing
import threading
import time
import concurrent.futures
import numpy as np
from pathlib import Path
import audiopreprocessing
import logging
import queue
def serialize_dict_obj(path : Path, object : dict) -> int:
"""Serializes Python Dictionary object to a file via Pickle.
Args:
path (Path): Path to store the file
object (dict): Dictionary object to serialize
Returns:
int: size in bytes written
"""
# Horrible practice, horrible security, but it will work for now
with path.open("wb") as fp:
pickle.dump(object, fp)
fp.seek(0, os.SEEK_END)
size = fp.tell()
return size
logging.info("Reading local dataset directory structure...")
ASMRThreePath = Path("C:\\ASMRThree")
ASMRTwoPath = Path("D:\\ASMRTwo")
ASMROnePath = Path("E:\\ASMROne")
if (platform.system() == 'Linux'):
ASMROnePath = Path('/mnt/Scratchpad/ASMROne')
ASMRTwoPath = Path('/mnt/MyStuffz/ASMRTwo')
ASMRThreePath = Path('/mnt/Windows11/ASMRThree')
size_one, size_two, size_three = 0, 0, 0
files_one, files_two, files_three = [], [], []
folders_one, folders_two, folders_three = [], [], []
# Statistic calculation for ASMROne
for root, dirs, files in ASMROnePath.walk(): # Root will iterate through all folders
if root.absolute() != ASMROnePath.absolute(): # Skip root of ASMROnePath
folders_one.append(root) # Add folder to list
for fname in files: # Iterate through all files in current root
file = root/fname # Get file path
assert file.is_file()
files_one.append(file)
size_one += file.stat().st_size # Get file size
# Statistic calculation for ASMRTwo
for root, dirs, files in ASMRTwoPath.walk(): # Root will iterate through all folders
if root.absolute() != ASMRTwoPath.absolute(): # Skip root of ASMRTwoPath
folders_two.append(root) # Add folder to list
for fname in files: # Iterate through all files in current root
file = root/fname # Get file path
assert file.is_file()
files_two.append(file)
size_two += file.stat().st_size # Get file size
# Statistic calculation for ASMRThree
for root, dirs, files in ASMRThreePath.walk(): # Root will iterate through all folders
if root.absolute() != ASMRThreePath.absolute(): # Skip root of ASMRThreePath
folders_three.append(root) # Add folder to list
for fname in files: # Iterate through all files in current root
file = root/fname # Get file path
assert file.is_file()
files_three.append(file)
size_three += file.stat().st_size # Get file size
DataSubsetPaths = [ASMROnePath, ASMRTwoPath, ASMRThreePath]
DLSiteWorksPaths = []
# Collect ASMR Works (RJ ID, Paths)
for ASMRSubsetPath in DataSubsetPaths:
for WorkPaths in ASMRSubsetPath.iterdir():
DLSiteWorksPaths.append(WorkPaths)
fileExt2fileType = {
".TXT": "Document",
".WAV": "Audio",
".MP3": "Audio",
".PNG": "Image",
".JPG": "Image",
".VTT": "Subtitle",
".PDF": "Document",
".FLAC": "Audio",
".MP4": "Video",
".LRC": "Subtitle",
".SRT": "Subtitle",
".JPEG": "Image",
".ASS": "Subtitle",
"": "NO EXTENSION",
".M4A": "Audio",
".MKV": "Video"
}
fileext_stat = {}
file_list = files_one + files_two + files_three
file_list_count = len(file_list)
for file in file_list:
f_ext = file.suffix.upper()
if (f_ext in fileext_stat.keys()):
fileext_stat[f_ext]['Count'] += 1
fileext_stat[f_ext]['List'].append(file)
fileext_stat[f_ext]['ExtensionMass'] += file.stat().st_size
else:
fileext_stat[f_ext] = {}
fileext_stat[f_ext]['Count'] = 1
fileext_stat[f_ext]['List'] = [file]
fileext_stat[f_ext]['ExtensionMass'] = file.stat().st_size # The total sum of sizes of the same file extension
fileext_stat[f_ext]['MediaType'] = fileExt2fileType[f_ext]
audio_paths = []
for extension in fileext_stat: # I can't be bothered to convert this into a list compresion
if fileext_stat[extension]['MediaType'] == "Audio":
audio_paths += fileext_stat[extension]['List']
def random_audio_chunk(n : int, seed : int = 177013) -> list[Path]:
"""Returns a random selection of audio files
Args:
n (int): Amount of files to return
seed (int, optional): Seed for RNG. Defaults to 177013.
Returns:
list[Path]: List of randomly selected audio paths (using Path object)
"""
random.seed(seed)
#return random.choices(audio_paths, k=n) # Contains repeated elements
return random.sample(audio_paths, k=n)
# class AudioFeatureExtractor():
# __audio_queue: list[ # List of ...
# tuple[ # Pair of chunked audio and its path
# list[tuple[np.ndarray, float, int]], # Chunked audio
# Path # Path to original audio
# ]
# ] # Listed of Chunked/Resampled audio
# __feeder_future: concurrent.futures.Future
# __extractor_future: concurrent.futures.Future
# __audio_paths_list: list[Path]
# __max_audio_in_queue: int
# __queue_lock: threading.Lock
# __desired_sr: int
# __mono: bool
# __chunk_length: float
# __overlap: float
# __features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
# # { audioPath:
# # [(embedding, pos, channel)...]
# # }
# def __embedding_inference(self, audio_ndarray: np.ndarray) -> np.ndarray:
# """Uses embedding model to inference an audio. Returns embedding vectors.
# Function to be overrided. Returns np.zeros(32).
# Args:
# audio_ndarray (np.ndarray):
# Returns:
# np.ndarray: _description_
# """
# return np.zeros(32)
# def __embedding_extract(self, audio: tuple[np.ndarray, float, int]) -> tuple[np.ndarray, float, int, np.ndarray]:
# """Receives a tuple of audio, position, and channel ID, then adding the embedding to the tuple
# Args:
# audio (tuple[np.ndarray, float, int]): tuple of audio, position, channel id
# Returns:
# tuple[np.ndarray, float, int, np.ndarray]: audio, position, channel id, embedding vector
# """
# audio_chunk, pos, channel_id = audio
# return (audio_chunk, pos, channel_id, self.__embedding_inference(audio_chunk))
# def __audio_queue_feeder(self): # TODO: Upgrade to multithreaded loader?
# """Internal thread function. Preprocess and load the audio continuously to
# audio_queue until the end of the audio_paths_list
# """
# while (self.__audio_paths_list): # While there are still Path elements in path list
# if (not (len(self.__audio_queue) < self.__max_audio_in_queue)):
# logging.info("[AFE] [Audio Queue Thread]: Queue Full, feeder thread sleeping for 5 seconds")
# time.sleep(5)
# while(len(self.__audio_queue) < self.__max_audio_in_queue): # While the audio queue is not full
# new_audio_path = self.__audio_paths_list[0]
# new_audio = audiopreprocessing.load_preprocessed_audio(
# new_audio_path,
# self.__desired_sr,
# self.__mono,
# self.__chunk_length,
# self.__overlap
# )
# with self.__queue_lock:
# self.__audio_queue.append(
# (new_audio, new_audio_path)
# )
# pop_path = self.__audio_paths_list.pop(0)
# logging.info(f"[AFE] [Audio Queue Thread]: Added new audio to queue {pop_path}")
# logging.info("[AFE] [Audio Queue Thread]: DONE. All audio files fed")
# def __audio_queue_feature_extractor(self):
# """Internal thread function. Get audio from audio queue. And extract embedding vector
# for all audio chunks. Stores the resulting embedding into self.__features.
# With Original Audio's Path as key, and list[tuple[np.ndarray, float, int]] (list of tuple of embedding vector, position, channel id)
# """
# while (self.__audio_paths_list or self.__audio_queue): # While there are still audio to be processed
# if (self.__audio_queue): # If audio queue is not empty
# with self.__queue_lock:
# audio_to_process, audio_path = self.__audio_queue.pop(0) # Get audio from queue
# logging.info(f"[AFE] [Feature Extractor Thread]: Extracting {len(audio_to_process)} features from audio {audio_path}")
# for audio_chunk in audio_to_process:
# same_audio_chunk, timepos, channel_id, embedd_vect = self.__embedding_extract(audio_chunk)
# if (audio_path not in self.__features.keys()):
# #if DEBUG: print("Adding new vector to", audio_path.name)
# self.__features[audio_path] = [(embedd_vect, timepos, channel_id)]
# else:
# #if DEBUG: print("Adding vector to", audio_path.name)
# self.__features[audio_path].append(
# (embedd_vect, timepos, channel_id)
# )
# else:
# logging.info("[AFE] [Feature Extractor Thread]: Queue Empty, extractor thread sleeping for 5 seconds") # If audio queue is empty, wait
# time.sleep(5)
# logging.info("[AFE] [Feature Extractor Thread]: DONE. Extracted all features from all audio files")
# def __init__(
# self,
# audio_paths_list: list[Path],
# max_audio_in_queue: int,
# desired_sr: int,
# mono: bool,
# chunk_length: float = 15.0,
# overlap: float = 2.0
# ):
# self.__audio_queue = []
# self.__audio_paths_list = audio_paths_list
# self.__max_audio_in_queue = max_audio_in_queue
# self.__queue_lock = threading.Lock()
# self.__desired_sr = desired_sr
# self.__mono = mono
# self.__chunk_length = chunk_length
# self.__overlap = overlap
# self.__features = {}
# @property
# def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
# return self.__features
# def extract(self):
# print("Starting feature extraction for", len(self.__audio_paths_list), "file(s)")
# total_amount = len(self.__audio_paths_list)
# t_start = time.perf_counter()
# with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# self.__feeder_future = executor.submit(self.__audio_queue_feeder)
# self.__extractor_future = executor.submit(self.__audio_queue_feature_extractor)
# while (self.__feeder_future.running() or self.__extractor_future.running()):
# print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W{len(self.__audio_paths_list)})", end="\r")
# time.sleep(1)
# t_stop = time.perf_counter()
# print(f"Processed {len(self.__features)}/{total_amount} (L:{len(self.__audio_queue)}/W:{len(self.__audio_paths_list)} COMPLETE)")
# delta_t = t_stop - t_start
# total_features = sum( [len(self.__features[path]) for path in self.__features] )
# print()
# print("Extraction completed")
# print(f"Took {delta_t} seconds. Added {total_features} vectors/embeddings")
class MultiThreadedAudioFeatureExtractor():
# This is the third time I am rewriting this, please send help. Multithreaded apps is pure hell to develop and debug
# After testing: this will hang at the last audio, precisely at preprocessing audio. I suspect that GIL hit the performance
# so much to the point that the preprocessing routine cannot get any share of the CPU execution cycle
__audio_queue: queue.Queue[ # List of ...
tuple[ # Pair of chunked audio and its path
list[tuple[np.ndarray, float, int]], # Chunked audio list of (ndarray, time position of chunk relative to original audio, channel_id)
Path # Path to original audio
]
] # Listed of Chunked/Resampled audio
__audio_feeder_threads: int # Amount of audio feeder threads
__feature_extractor_threads: int # Amount of feature extractor threads (if the method allows)
__audio_paths_list: queue.Queue[Path] # Path list to audio
__max_audio_in_queue: int # Maximum audio in queue
__audio_feeder_barrier: threading.Barrier # Synchronization barrier for all audio feeder threads
# Audio Feeder parameter
__desired_sr: int # Desired Sample Rate (Resampling)
__mono: bool # Force load audio in mono mode
__chunk_length: float # Audio chunk length
__overlap: float
# Result
__features: dict[Path, list[tuple[np.ndarray, float, int]]] # This is a crime, I know
__features_lock: threading.Lock
# __features: { audioPath:
# [(embedding1, pos1, channel1),
# (embedding2, pos2, channel1)]
# ...
# }
# Runtime
__audio_feeder_threadpool: list[concurrent.futures.Future]
__feature_extractor_threadpool: list[concurrent.futures.Future]
def __audio_inference_embedding(self, audio: list[tuple[np.ndarray, float, int]]) -> list[tuple[np.ndarray, float, int]]:
"""Receives a list of audio chunks, and then extracts embeddings for all audio chunks, returns the resulting embedding as a list of tuples(embedding, time, channel_id)
Args:
audio (list[tuple[np.ndarray, float, int]]): list of audio chunks
Returns:
list[tuple[np.ndarray, float, int]]: List of (embedding vector, timepos, channel id)
"""
features = []
for audio_chunk in audio:
audio, timepos, channel_id = audio_chunk
zero = np.zeros(32)
features.append( (zero, timepos, channel_id) )
time.sleep(0.01) # Simulate effort, change to simulate spent seconds in each audio file
return features
# To be overridden
def __audio_feeder_thread(self, thread_id: int, barrier: threading.Barrier):
try:
while True:
# Attempt to get audio path from audio path queue
new_audio_path = self.__audio_paths_list.get()
# Check thread exit condition (If the queue returns None, that means the audio path queue is now empty and the thread should end itself)
if (new_audio_path is None):
self.__audio_paths_list.put(new_audio_path) # Put None back to notify other audio feeder threads
# Omae wa mou shindeiru
break # Si la ETSISI ve esto seguramente me echarán de la escuela
# Now that the audio path queue is not empty, try preprocessing an audio
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
new_audio = audiopreprocessing.load_preprocessed_audio(
new_audio_path,
self.__desired_sr,
self.__mono,
self.__chunk_length,
self.__overlap
)
self.__audio_queue.put((new_audio, new_audio_path)) # In theory, this should block this audio feeder thread when the audio queue is full
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
logging.info("[MTAFE] [Audio Feeder {thread_id}] Waiting for other threads to finish")
barrier.wait()
if (thread_id == 0):
self.__audio_queue.put(None) # None to signal audio_queue has no more elements to process
logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
except Exception as e:
logging.error(f"[MTAFE] [Audio Feeder {thread_id}] An exception occurred! Committing seppuku!")
logging.exception(e)
return
# while (not self.__audio_paths_list.empty()):
# if (not self.__audio_queue.full()):
# # Feed audio
# new_audio_path = self.__audio_paths_list.get()
# self.__audio_paths_list.task_done()
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Preprocess: {new_audio_path.absolute()}")
# new_audio = audiopreprocessing.load_preprocessed_audio(
# new_audio_path,
# self.__desired_sr,
# self.__mono,
# self.__chunk_length,
# self.__overlap
# )
# self.__audio_queue.put((new_audio, new_audio_path))
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Feed: {new_audio_path.absolute()}")
# logging.info(f"[MTAFE] [Audio Feeder {thread_id}] Thread finished!")
#def testfeedthread(self, nthreads):
# t1 = threading.Thread(target=self.__audio_feeder_thread, args=(1,))
# t2 = threading.Thread(target=self.__audio_feeder_thread, args=(2,))
# t1.start(); t2.start()
# #with self.__audio_feed_condition:
# # self.__audio_feed_condition.notify_all()
# t1.join(); t2.join()
# with concurrent.futures.ThreadPoolExecutor(max_workers=nthreads) as executor:
# for i in range(nthreads):
# ft = executor.submit(self.__audio_feeder_thread, i)
# self.__audio_loader_threadpool.append(ft)
def __check_all_audiofeed_thread_finished(self) -> bool:
for ft in self.__audio_feeder_threadpool:
if ft.running():
return False
return True
def __check_all_featureextractor_thread_finished(self) -> bool:
for ft in self.__feature_extractor_threadpool:
if ft.running():
return False
return True
def __feature_extractor_thread(self, thread_id):
while True:
# Attempt to get next audio chunks to process
next_audio_tuple = self.__audio_queue.get()
# Check thread exit condition
if (next_audio_tuple is None):
self.__audio_queue.put(next_audio_tuple) # Put the None back to notify other threads
break # unalive urself
else: # Assuming we got more tuples
current_audio_to_process, current_audio_path = next_audio_tuple # Deconstruct tuple
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {current_audio_path}")
features_to_add = self.__audio_inference_embedding(current_audio_to_process)
with self.__features_lock:
self.__features[current_audio_path] = features_to_add
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {current_audio_path} w/ {len(features_to_add)} features")
logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Thread finished!")
# while (not self.__check_all_audiofeed_thread_finished() or not self.__audio_queue.empty()):
# if (not self.__audio_queue.empty()):
# audio_to_process, audio_path = self.__audio_queue.get()
# self.__audio_queue.task_done()
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracting: {audio_path}")
# features_to_add = self.__audio_inference_embedding(audio_to_process)
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Extracted: {len(features_to_add)} features")
# with self.__features_lock:
# self.__features[audio_path] = features_to_add
# #with self.__audio_feed_condition: self.__audio_feed_condition.notify_all()
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Feature Extraction complete for {audio_path} w/ {len(features_to_add)} features")
#else:
# if (not self.__check_all_audiofeed_thread_finished()):
# with self.__audio_feed_condition:
# logging.info(f"[MTAFE] [Feature Extractor {thread_id}] Audio queue empty: waiting")
# self.__audio_feed_condition.wait(10)
# self.__audio_feed_condition.wait_for(lambda: not self.__audio_queue.empty())
def __count_running_threads(self) -> tuple[int, int]:
running_extractors = 0
running_feeders = 0
for ft in self.__feature_extractor_threadpool:
if ft.running(): running_extractors += 1
for ft in self.__audio_feeder_threadpool:
if ft.running(): running_feeders += 1
return (running_feeders, running_extractors)
@property
def features(self) -> dict[Path, list[tuple[np.ndarray, float, int]]]:
return self.__features
def extract(self):
total_amount = self.__audio_paths_list.qsize() - 1 # Account for None to indicate queue end
logging.info(f"[MTAFE] [Main] Starting feature extraction for {total_amount} file(s)")
t_start = time.perf_counter() # Timer
with concurrent.futures.ProcessPoolExecutor(max_workers=(self.__audio_feeder_threads + self.__feature_extractor_threads)) as executor:
# Audio feeder threads
for i in range(self.__audio_feeder_threads):
logging.info(f"[MTAFE] Started audio feeder thread {i}")
ld_ft = executor.submit(self.__audio_feeder_thread, i, self.__audio_feeder_barrier)
self.__audio_feeder_threadpool.append(ld_ft)
# Feature extractor threads
for i in range(self.__feature_extractor_threads):
logging.info(f"[MTAFE] Started feature extractor thread {i}")
ex_ft = executor.submit(self.__feature_extractor_thread, i)
self.__feature_extractor_threadpool.append(ex_ft)
# Progress checking
while ( (not self.__check_all_audiofeed_thread_finished()) and (not self.__check_all_featureextractor_thread_finished()) ):
nfeeder, nextract = self.__count_running_threads()
print(f"[MTAFE Progress] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize()}/W:{self.__audio_paths_list.qsize()}, LD:{nfeeder}/EXT:{nextract})", end="\r")
t_stop = time.perf_counter()
logging.info(f"[MTAFE] Processed {len(self.__features)}/{total_amount} (L:{self.__audio_queue.qsize() - 1}/W:{self.__audio_paths_list.qsize() - 1} COMPLETE)")
delta_t = t_stop - t_start
total_features = sum( [len(self.__features[path]) for path in self.__features] )
logging.info(f"[MTAFE] Extraction complete. Took {delta_t} seconds. Added {total_features} vectors/embeddings")
def __init__(
self,
audio_paths: list[Path],
max_audio_in_queue: int = 16,
audio_feeder_threads: int = 8,
feature_extractor_threads: int = 8,
desired_sr: int = 32000,
force_mono: bool = False,
chunk_length: float = 15.0,
chunk_overlap: float = 2.0,
):
# Check if the paths passed in are all valid and add them to queue
self.__audio_paths_list = multiprocessing.Queue()
for p in audio_paths:
if not p.is_file():
raise Exception(f"Path '{p.absolute()}' is NOT a valid file!")
else:
self.__audio_paths_list.put(p)
self.__audio_paths_list.put(None) # To signal to the producer that the audio path list is empty, since Queue.empty() is unreliable
logging.info(f"[MTAFE] [Constructor] Queued {self.__audio_paths_list.qsize() - 1} files")
# Set up private attributes
## Audio preprocessing parameters
self.__desired_sr = desired_sr
self.__mono = force_mono
self.__chunk_length = chunk_length
self.__overlap = chunk_overlap
## Extractor/Feeder settings
self.__max_audio_in_queue = max_audio_in_queue
self.__audio_feeder_threads = audio_feeder_threads
self.__feature_extractor_threads = feature_extractor_threads
## Set up runtime conditions
self.__audio_queue = multiprocessing.Queue(maxsize=self.__max_audio_in_queue)
self.__features = {}
self.__features_lock = multiprocessing.Lock()
self.__audio_feeder_barrier = multiprocessing.Barrier(self.__audio_feeder_threads)
self.__audio_feeder_threadpool = []
self.__feature_extractor_threadpool = []
logging.info(f"[MTAFE] [Constructor] Extraction parameters: {desired_sr}Hz, Mono: {force_mono}, Divide into {chunk_length}s chunks with {chunk_overlap}s of overlap")
logging.info(f"[MTAFE] [Constructor] Using {audio_feeder_threads} threads for preprocessing audio and {feature_extractor_threads} threads for feature extraction. Max queue size of {max_audio_in_queue} files")
# More audio embeddings specific code below (To be overridden)