Files
DLSiteFSearch/mtafe_lab/audiopreprocessing.py

95 lines
3.7 KiB
Python

import librosa
import pickle
import os
import numpy as np
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def triggerlog():
    """Emit one CRITICAL-level record through this module's logger.

    The message text is fixed; nothing is returned.
    NOTE(review): presumably a smoke test for the logging setup - confirm with callers.
    """
    logger.log(logging.CRITICAL, "Testing: info")
def resample_load(input_path : Path, target_sr : int = 16000, mono_audio : bool = False) -> np.ndarray: # AI
    """Read an audio file and bring it to the sample rate `target_sr`.

    Args:
        input_path (Path): pathlib.Path object pointing at the audio file.
        target_sr (int, optional): Sample rate the returned audio should have. Defaults to 16000.
        mono_audio (bool, optional): Passed to `librosa.load` as `mono`. Defaults to False.

    Returns:
        np.ndarray: The loaded samples. They are passed through `librosa.resample`
        only when the file's own sample rate differs from `target_sr`.
    """
    logger.info(f"[resample_load] Loading audio {input_path}")
    # sr=None: load with the file's original sample rate so the comparison below is meaningful
    samples, native_sr = librosa.load(input_path, sr=None, mono=mono_audio)

    # Already at the requested rate: hand the samples back untouched
    if native_sr == target_sr:
        return samples

    logger.info(f"[resample_load] Resampling to {target_sr}")
    return librosa.resample(samples, orig_sr=native_sr, target_sr=target_sr)
def chunk_audio(audio : np.ndarray, sr: int, chunk_length: float = 10.0, overlap: float = 2.0) -> tuple[list[np.ndarray], list[float], int]: # AI
    """
    Chunks audio into overlapping, fixed-length segments. Only pass in mono audio here.

    Consecutive chunks start `chunk_length - overlap` seconds apart. Only full-length
    chunks are produced, so samples located after the end of the last full chunk are
    not part of any returned chunk. When the audio is too short to yield a single full
    chunk, the whole input is returned as the only chunk, at position 0.0.

    Args:
        audio: Loaded audio ndarray (one channel only)
        sr: Sample rate for the given audio, must be positive
        chunk_length: Length of each chunk in seconds (at least one sample long)
        overlap: Overlap between chunks in seconds; must be smaller than
            `chunk_length` by at least one sample

    Returns:
        List of audio chunks, list of chunk start positions (in seconds),
        and the given sample rate

    Raises:
        ValueError: If `sr` is not positive, if `chunk_length` is shorter than one
            sample, or if `overlap` leaves a hop of less than one sample between chunks.
    """
    if sr <= 0:
        raise ValueError(f"sr must be positive, got {sr}")

    # Calculate chunk size and hop length in samples
    chunk_size = int(chunk_length * sr)
    hop_length = int((chunk_length - overlap) * sr)
    if chunk_size < 1:
        raise ValueError(
            f"chunk_length ({chunk_length}s) is shorter than one sample at {sr}Hz"
        )
    # A zero hop would make range() fail with an opaque message, and a negative hop
    # would walk backwards and yield bogus slices with negative positions.
    if hop_length < 1:
        raise ValueError(
            f"overlap ({overlap}s) must be smaller than chunk_length ({chunk_length}s) "
            f"by at least one sample at {sr}Hz"
        )

    logger.info(f"[chunk_audio] Chunking audio ({len(audio) / sr}s)")

    # Start offsets (in samples) of every chunk that fits entirely inside the audio
    starts = range(0, len(audio) - chunk_size + 1, hop_length)
    chunks = [audio[start:start + chunk_size] for start in starts]
    positions = [start / sr for start in starts]

    if not chunks:  # The full audio length is less than chunk_length
        chunks = [audio]
        positions = [0.0]
        logger.info("[chunk_audio] Audio less than chunk_length. Returning original audio as chunk\r")
    else:
        logger.info(f"[chunk_audio] Audio is split into {len(chunks)} chunks")
    return chunks, positions, sr
def load_preprocessed_audio(
        path: Path,
        desired_sr: int,
        mono: bool = False,
        chunk_length: float = 15.0,
        overlap: float = 2.0) -> list[tuple[np.ndarray, float, int]]:
    """Load an audio file, resample it and split each channel into overlapping chunks.

    Args:
        path: pathlib.Path object to the audio file.
        desired_sr: Sample rate the audio is resampled to before chunking.
        mono: Load the audio in mono mode. Defaults to False.
        chunk_length: Length of each chunk in seconds. Defaults to 15.0.
        overlap: Overlap between consecutive chunks in seconds. Defaults to 2.0.

    Returns:
        A list of `(chunk, position, channel)` tuples, where `chunk` is the audio
        ndarray, `position` is the chunk position reported by `chunk_audio`, and
        `channel` is -1 for mono audio or the channel index for multichannel audio.
        For multichannel audio all chunks of channel 0 come first, then channel 1, etc.
    """
    # Load and resample audio
    audio = resample_load(path, desired_sr, mono)  # Stereo 2D matrix, Mono 1D array

    if mono or (audio.ndim == 1):
        # Mono (or the file itself only has one channel): tag with -1 as mono indicator
        channels = [(-1, audio)]
    else:
        # Stereo/multichannel: iterate the first axis, one entry per channel
        channels = enumerate(audio)

    result = []
    for channel_id, channel_audio in channels:
        chunks, positions, _ = chunk_audio(channel_audio, desired_sr, chunk_length, overlap)
        assert len(chunks) == len(positions)
        # (ndarray_chunk1, pos1, channel_id): first audio chunk, position1, channel tag
        result.extend(
            (chunk, position, channel_id) for chunk, position in zip(chunks, positions)
        )

    # Use the module logger (not the root logger) so this record follows the same
    # configuration as every other message emitted by this module.
    logger.info(f"[load_preprocessed_audio] Loaded audio {path} ({desired_sr}Hz, Chunk {chunk_length}s with overlap {overlap}s) MONO:{mono}")
    return result