import concurrent.futures
import logging
import multiprocessing
import os
import pickle
import platform
import queue
import random
import threading
import time
from pathlib import Path

import numpy as np

import audiopreprocessing


def serialize_dict_obj(path: Path, object: dict) -> int:
    """Serializes Python Dictionary object to a file via Pickle.

    Args:
        path (Path): Path to store the file
        object (dict): Dictionary object to serialize

    Returns:
        int: size in bytes written
    """
    # NOTE: pickle is not a safe interchange format. Files written here must
    # only ever be loaded back from trusted locations.
    with path.open("wb") as fp:
        pickle.dump(object, fp)
        # Jump to the end of the file so tell() reports the full size.
        fp.seek(0, os.SEEK_END)
        size = fp.tell()
    return size


def _scan_subset(subset_root: Path) -> tuple[list[Path], list[Path], int]:
    """Walks one dataset subset and collects its folders, files and size.

    Args:
        subset_root (Path): Top-level directory of the subset

    Returns:
        tuple[list[Path], list[Path], int]: Folders found below the subset
        root (the root itself is excluded), every file found (including
        files directly inside the root), and the sum of the file sizes
        in bytes.
    """
    folders: list[Path] = []
    found_files: list[Path] = []
    total_size = 0
    subset_root_abs = subset_root.absolute()  # Loop invariant, compute once
    for root, _dirs, fnames in subset_root.walk():
        if root.absolute() != subset_root_abs:  # Skip the subset root itself
            folders.append(root)
        for fname in fnames:
            file = root / fname
            # Sanity check: walk() should only hand us regular files here.
            assert file.is_file()
            found_files.append(file)
            total_size += file.stat().st_size
    return folders, found_files, total_size


logging.info("Reading local dataset directory structure...")

# Dataset locations: Windows drive letters by default, mount points on Linux.
ASMRThreePath = Path("C:\\ASMRThree")
ASMRTwoPath = Path("D:\\ASMRTwo")
ASMROnePath = Path("E:\\ASMROne")
if platform.system() == 'Linux':
    ASMROnePath = Path('/mnt/Scratchpad/ASMROne')
    ASMRTwoPath = Path('/mnt/MyStuffz/ASMRTwo')
    ASMRThreePath = Path('/mnt/Windows11/ASMRThree')

# Statistic calculation for each subset (folders, files, total size in bytes)
folders_one, files_one, size_one = _scan_subset(ASMROnePath)
folders_two, files_two, size_two = _scan_subset(ASMRTwoPath)
folders_three, files_three, size_three = _scan_subset(ASMRThreePath)

DataSubsetPaths = [ASMROnePath, ASMRTwoPath, ASMRThreePath]
DLSiteWorksPaths = []
# Collect ASMR Works (RJ ID, Paths): every direct child of a subset root
for ASMRSubsetPath in DataSubsetPaths:
    DLSiteWorksPaths.extend(ASMRSubsetPath.iterdir())

# Upper-cased file suffix -> media category
fileExt2fileType = {
    ".TXT": "Document",
    ".WAV": "Audio",
    ".MP3": "Audio",
    ".PNG": "Image",
    ".JPG": "Image",
    ".VTT": "Subtitle",
    ".PDF": "Document",
    ".FLAC": "Audio",
    ".MP4": "Video",
    ".LRC": "Subtitle",
    ".SRT": "Subtitle",
    ".JPEG": "Image",
    ".ASS": "Subtitle",
    "": "NO EXTENSION",
    ".M4A": "Audio",
    ".MKV": "Video",
}

# Per-extension statistics, keyed by upper-cased suffix. Each entry holds:
#   'Count'         - number of files with that extension
#   'List'          - the file paths themselves
#   'ExtensionMass' - total size in bytes of those files
#   'MediaType'     - category looked up in fileExt2fileType
fileext_stat = {}
file_list = files_one + files_two + files_three
file_list_count = len(file_list)

for file in file_list:
    f_ext = file.suffix.upper()
    f_size = file.stat().st_size
    if f_ext in fileext_stat:
        entry = fileext_stat[f_ext]
        entry['Count'] += 1
        entry['List'].append(file)
        entry['ExtensionMass'] += f_size
    else:
        if f_ext not in fileExt2fileType:
            # An unlisted extension must not abort the whole scan.
            logging.warning(
                "Unrecognised file extension %r; MediaType set to 'Unknown'",
                f_ext,
            )
        fileext_stat[f_ext] = {
            'Count': 1,
            'List': [file],
            'ExtensionMass': f_size,
            'MediaType': fileExt2fileType.get(f_ext, "Unknown"),
        }

# Flat list of every file whose extension is categorised as audio
audio_paths = [
    audio_file
    for ext_stats in fileext_stat.values()
    if ext_stats['MediaType'] == "Audio"
    for audio_file in ext_stats['List']
]


def random_audio_chunk(n: int, seed: int = 177013) -> list[Path]:
    """Returns a random selection of audio files

    The selection is drawn without replacement from the module-level
    ``audio_paths`` list, so no path appears twice. A private RNG instance
    is used, so the process-wide ``random`` state is left untouched while
    the result for a given seed stays reproducible.

    Args:
        n (int): Amount of files to return
        seed (int, optional): Seed for RNG. Defaults to 177013.

    Returns:
        list[Path]: List of randomly selected audio paths (using Path object)

    Raises:
        ValueError: If n is negative or larger than the number of audio files
    """
    rng = random.Random(seed)
    # sample() rather than choices(): the latter may repeat elements.
    return rng.sample(audio_paths, k=n)