# Source code for maui.acoustic_indices.acoustic_indices

"""
Module for Calculating Acoustic Indices from Audio Files

This module provides functions to calculate acoustic indices from audio files stored 
in a pandas DataFrame. It supports both parallel and sequential processing, 
dividing the DataFrame into smaller chunks to optimize performance.

Functions
---------
- calculate_acoustic_indices(df_init, file_path_col, acoustic_indices_methods, 
  pre_calculation_method, parallel, chunk_size=None, temp_dir='./tmp_maui_ac_files/'): 
  Calculates acoustic indices for audio files in a DataFrame, with support for 
  parallel processing.

Dependencies
------------
- numpy
- pandas
- maad
- tqdm
- tempfile
- os
- gc
- functools.partial
- multiprocessing as mp

"""

import gc
import os
import multiprocessing as mp
from functools import partial

import numpy as np
import pandas as pd

from maad import sound

def _convert_if_list_string(value):
    """
    Helper function to convert a space-separated or comma-separated string of floats
    or integers into a list of floats or integers.

    Parameters:
    value (any): The value to check and possibly convert.

    Returns:
    any: The original value or a converted list of floats or integers if the string
    represents a list of numbers.
    """
    if isinstance(value, str):
        value = value.strip("[]")  # Remove the brackets
        value = value.replace("\n", " ")  # Replace newlines with spaces
        value = value.replace(",", " ")  # Replace commas with spaces
        try:
            # Convert the cleaned string to a list of floats or integers
            return [float(x) if "." in x else int(x) for x in value.split()]
        except ValueError:
            # If conversion fails, return the original string
            return value
    return value


def _convert_string_to_list(df: pd.DataFrame) -> pd.DataFrame:
    """
    Turn stringified numeric lists stored in a DataFrame back into Python lists.

    Parameters:
    df (pd.DataFrame): The DataFrame to process. Its object-dtype columns are
    overwritten in place.

    Returns:
    pd.DataFrame: The same DataFrame object, with every value of each object-dtype
    column passed through `_convert_if_list_string`.
    """
    for column_name in df.columns:
        series = df[column_name]
        is_object_column = series.dtype == "object"
        if not is_object_column:
            # Columns of any other dtype are left untouched.
            continue
        df[column_name] = series.apply(_convert_if_list_string)
    return df


def _extract_indices_worker(
    df_chunk,
    file_path_col: str,
    acoustic_indices_methods,
    pre_calculation_method,
    temp_dir,
    **kwargs,
) -> str:
    """
    Helper function to extract acoustic indices from audio files and save them
    temporarily in a CSV file.

    This function processes a chunk of a DataFrame to calculate acoustic indices for
    audio files. It loads each audio file, applies a pre-calculation method, and then
    computes multiple acoustic indices using the provided methods. The results are stored
    in a temporary CSV file and the original data types of the DataFrame columns are retained.

    Parameters
    ----------
    df_chunk : tuple of (pd.DataFrame, int)
        A tuple containing a chunk of the DataFrame and its corresponding index.
        The DataFrame must have at least the column specified by `file_path_col`.

    file_path_col : str
        The name of the column in the DataFrame that contains the file paths to the audio files.

    acoustic_indices_methods : list of callables
        A list of methods, where each method computes a specific acoustic index.
        Each method should accept the output of `pre_calculation_method` and return a dictionary
        of index names and values.

    pre_calculation_method : callable
        A method that performs any necessary pre-calculations on the audio data.
        It should accept the loaded audio data and sampling rate and return an object
        that will be passed to each of the `acoustic_indices_methods`.

    temp_dir : str
        The directory path where the temporary CSV file will be saved.

    **kwargs : dict, optional
        Additional keyword arguments:
        - parallel (bool): If True, the function is running in parallel mode.
        - fid (str): A file identifier used when not running in parallel mode.

    Returns
    -------
    temp_file_path : str
        The file path to the temporary CSV file where the calculated indices are saved.

    original_dtypes : dict
        A dictionary mapping column names to their original data types in the DataFrame.

    Notes
    -----
    - This function assumes that the DataFrame `df_chunk` contains a column with file paths
      to the audio files. It processes each file, calculating the required indices and storing
      them in a temporary file.
    - If the audio file cannot be loaded, the function will print a message and skip the
      calculation for that file.
    - The `gc.collect()` calls are used to free memory after processing each row and after
      creating the temporary file.
    """

    indices_temp = {}

    df, fidx = df_chunk
    df = df.copy()

    for chunk_index, (_, row) in enumerate(df.iterrows()):
        s, fs = sound.load(row[file_path_col])

        if len(s) == 0:
            print(
                f"Sound loading failed or the file {row['file_path']} "\
                "is corrupted. Acoustic indices not calculated."
            )
        else:
            pre_calc_vars = pre_calculation_method(s, fs)
            for method in acoustic_indices_methods:
                indices_dict = method(pre_calc_vars)
                for key, value in indices_dict.items():
                    if key not in indices_temp:
                        indices_temp[key] = np.empty(len(df), dtype=type(value))
                        indices_temp[key][:] = np.nan  # Initialize with NaNs
                    indices_temp[key][
                        chunk_index
                    ] = value  # Use chunk_index instead of index
            del indices_dict
            gc.collect()

    if not kwargs["parallel"]:
        file_id = kwargs["fid"]
    else:
        file_id = f"{mp.current_process().pid}_{fidx}"

    temp_file_path = os.path.join(temp_dir, f"temp_{file_id}.csv")

    for key, value in indices_temp.items():
        df[key] = value
    df.to_csv(temp_file_path, index=False)
    original_dtypes = df.dtypes.to_dict()

    del indices_temp
    gc.collect()

    return temp_file_path, original_dtypes


def calculate_acoustic_indices(
    df_init: pd.DataFrame,
    file_path_col: str,
    acoustic_indices_methods: list,
    pre_calculation_method,
    parallel: bool,
    chunk_size: int = None,
    temp_dir: str = "./tmp_maui_ac_files/",
) -> pd.DataFrame:
    """
    Calculate acoustic indices for audio files in a DataFrame.

    This method processes a DataFrame containing file paths to audio files,
    calculates acoustic indices using the specified methods, and returns a
    DataFrame with the results. The calculations can be performed in parallel
    or sequentially, depending on the `parallel` flag.

    Parameters
    ----------
    df_init : pd.DataFrame
        The initial DataFrame containing the file paths to audio files and any
        other necessary metadata.
    file_path_col : str
        The name of the column in `df_init` that contains the file paths to the
        audio files.
    acoustic_indices_methods : list of callables
        A list of methods, where each method computes a specific acoustic index.
        Each method should accept the output of `pre_calculation_method` and
        return a dictionary of index names and values.
    pre_calculation_method : callable
        A method that performs any necessary pre-calculations on the audio data.
        It should accept the loaded audio data and sampling rate, returning an
        object that will be passed to each of the `acoustic_indices_methods`.
    parallel : bool
        If True, the calculations will be performed in parallel using a pool of
        `multiprocessing.cpu_count()` processes. If False, the calculations will
        be performed sequentially.
    chunk_size : int, optional
        The number of rows to process in each chunk. If not provided, it is set
        to `min(len(df_init) // cpu_count + 1, 20)`.
    temp_dir : str, optional
        The directory path where the temporary CSV files will be saved. It is
        created if it does not exist. The default is './tmp_maui_ac_files/'.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the original data along with the calculated
        acoustic indices, with a fresh integer index. If `df_init` has no rows,
        a copy of it with a fresh index is returned and nothing is computed.

    Notes
    -----
    - The method first divides the DataFrame into smaller chunks, each of which
      is processed separately to calculate the acoustic indices. The results
      are saved as temporary CSV files.
    - If `parallel` is True, multiple processes are used to calculate the
      indices concurrently. Otherwise, the calculation is done sequentially.
    - The method combines the results from all chunks into a single DataFrame,
      restores the data types recorded by the worker, and removes the temporary
      files.
    - The `_convert_string_to_list` function is applied to the final DataFrame
      so that list-like values serialised to text in the CSV files are turned
      back into lists.

    Example
    -------
    >>> import maad
    >>> import pandas as pd
    >>> from maui import samples, utils, acoustic_indices
    >>> df = samples.get_audio_sample(dataset="leec")
    >>> df["dt"] = pd.to_datetime(df["timestamp_init"]).dt.date
    >>> def pre_calculation_method(s, fs):
    ...     Sxx_power, tn, fn, ext = maad.sound.spectrogram(s, fs)
    ...     Sxx_noNoise = maad.sound.median_equalizer(
    ...         Sxx_power, display=False, extent=ext
    ...     )
    ...     Sxx_dB_noNoise = maad.util.power2dB(Sxx_noNoise)
    ...
    ...     Sxx, tn, fn, ext = maad.sound.spectrogram(s, fs, mode='amplitude')
    ...
    ...     pre_calc_vars = {
    ...         'Sxx': Sxx, 'tn': tn, 'fn': fn, 'ext': ext,
    ...         'Sxx_dB_noNoise': Sxx_dB_noNoise,
    ...     }
    ...     return pre_calc_vars
    >>> def get_aci(pre_calc_vars):
    ...     aci_xx, aci_per_bin, aci_sum = maad.features.acoustic_complexity_index(
    ...         pre_calc_vars['Sxx']
    ...     )
    ...     indices = {
    ...         'aci_xx': aci_xx, 'aci_per_bin': aci_per_bin, 'aci_sum': aci_sum
    ...     }
    ...     return indices
    >>> def get_spectral_events(pre_calc_vars):
    ...     (EVNspFract_per_bin, EVNspMean_per_bin,
    ...      EVNspCount_per_bin, EVNsp) = maad.features.spectral_events(
    ...         pre_calc_vars['Sxx_dB_noNoise'],
    ...         dt=pre_calc_vars['tn'][1] - pre_calc_vars['tn'][0],
    ...         dB_threshold=6,
    ...         rejectDuration=0.1,
    ...         display=False,
    ...         extent=pre_calc_vars['ext'])
    ...
    ...     indices = {
    ...         'EVNspFract_per_bin': EVNspFract_per_bin,
    ...         'EVNspMean_per_bin': EVNspMean_per_bin,
    ...         'EVNspCount_per_bin': EVNspCount_per_bin,
    ...         'EVNsp': EVNsp,
    ...     }
    ...     return indices
    >>> def get_spectral_activity(pre_calc_vars):
    ...     (ACTspfract_per_bin, ACTspcount_per_bin,
    ...      ACTspmean_per_bin) = maad.features.spectral_activity(
    ...         pre_calc_vars['Sxx_dB_noNoise']
    ...     )
    ...     indices = {
    ...         'ACTspfract_per_bin': ACTspfract_per_bin,
    ...         'ACTspcount_per_bin': ACTspcount_per_bin,
    ...         'ACTspmean_per_bin': ACTspmean_per_bin,
    ...     }
    ...     return indices
    >>> acoustic_indices_methods = [get_aci, get_spectral_activity, get_spectral_events]
    >>> result_df = acoustic_indices.calculate_acoustic_indices(
    ...     df, 'file_path', acoustic_indices_methods, pre_calculation_method,
    ...     parallel=True)
    """
    if len(df_init) == 0:
        # No rows means no chunks and no temporary files; pd.concat would raise
        # on the resulting empty list, so return early.
        return df_init.reset_index(drop=True)

    os.makedirs(temp_dir, exist_ok=True)

    num_processes = mp.cpu_count()
    if chunk_size is None:
        chunk_size = min(len(df_init) // num_processes + 1, 20)

    # Each chunk travels with its ordinal so the worker can build a unique
    # temporary file name.
    df_chunks = [
        (df_init.iloc[start : start + chunk_size], chunk_id)
        for chunk_id, start in enumerate(range(0, len(df_init), chunk_size))
    ]

    print("Calculating acoustic indices...")

    if parallel:
        worker = partial(
            _extract_indices_worker,
            file_path_col=file_path_col,
            acoustic_indices_methods=acoustic_indices_methods,
            pre_calculation_method=pre_calculation_method,
            temp_dir=temp_dir,
            parallel=parallel,
        )
        # The context manager guarantees the pool is torn down even when a
        # worker raises inside pool.map.
        with mp.Pool(processes=num_processes) as pool:
            temp_files = pool.map(worker, df_chunks)
            print("Joinning threads...")
            pool.close()
            pool.join()
    else:
        temp_files = [
            _extract_indices_worker(
                chunk,
                file_path_col,
                acoustic_indices_methods,
                pre_calculation_method,
                temp_dir,
                parallel=parallel,
                fid=chunk_id,
            )
            for chunk_id, chunk in enumerate(df_chunks)
        ]

    print("Preparing final dataframe and removing temporary files...")

    # Combine results from temp files
    frames = []
    for temp_path, dtypes in temp_files:
        frame = pd.read_csv(temp_path)
        # Re-apply the dtypes recorded by the worker before the CSV round trip.
        frames.append(frame.astype(dtypes))
        os.remove(temp_path)

    combined_df = pd.concat(frames, ignore_index=True)

    print("Fixing data types...")
    combined_df = _convert_string_to_list(combined_df)

    return combined_df