Source code for maui.visualizations.visualizations

"""
    This module offers visualization tools for acoustic analysis, providing
    functions to generate radar, histogram, violin, and spectrogram plots. These
    visualizations aid in the comparison and analysis of acoustic indices
    extracted from audio files, facilitating the understanding of soundscapes
    and audio properties. The module leverages Plotly for generating interactive
    plots, offering flexibility in exploring and presenting acoustic data.

    These tools are designed for researchers, ecologists, and sound engineers
    interested in analyzing audio data, particularly for environmental sound
    analysis, bioacoustics, and similar fields.

    Usage examples and parameters for each function are provided within their
    respective docstrings, guiding their application in various analysis scenarios.

    Dependencies:
    - pandas for data manipulation.
    - plotly for interactive plotting.
    - maad for acoustic feature extraction and analysis.

    Note:
    - Ensure that audio files are accessible and properly formatted before analysis.
    - Function parameters allow customization of plots, including aspects like
      figure size and grouping.
"""

import math
import copy
import warnings
import os
import re

from dateutil import parser
import pandas as pd
from pandas.api.types import CategoricalDtype
import numpy as np
import matplotlib as plt
import plotly.express as px
from plotly.subplots import make_subplots
from plotly.colors import qualitative
import plotly.graph_objects as go

from maad import sound, util

import maui.utils


[docs] def indices_radar_plot( df, indices: list, agg_type: str, group_by: list = None, max_cols: int = 3, fig_size: dict = None, show_plot: bool = True, ): """ Create a radar plot to compare indices in a DataFrame. This function generates a radar plot to compare multiple indices from a DataFrame. It allows aggregating data based on specified aggregation types and grouping by one or two columns from the DataFrame. Parameters ---------- df : pandas.DataFrame The input DataFrame containing the data. indices : list A list of column names in the DataFrame representing the indices to be compared. agg_type : str The type of aggregation to be applied ('mean', 'median', 'stddev', 'var', 'max', 'min'). group_by : list, optional A list of one or two column names for grouping data (default is None). max_cols : int, optional Maximum number of columns for subplots (default is 3). fig_size : dict, optional A dictionary specifying the height and width of the plot (default is None). show_plot : bool, optional Whether to display the plot (default is True). Returns ------- plotly.graph_objs._figure.Figure A Plotly Figure object representing the radar plot. Raises ------ AssertionError If the arguments are not correctly specified. Exception If the input data or arguments are invalid. Examples -------- >>> from maui import samples, acoustic_indices, visualizations >>> df = samples.get_leec_audio_sample() >>> indices_list = ['median_amplitude_envelope', 'temporal_entropy'] >>> df = acoustic_indices.calculate_acoustic_indices(df, indices_list, parallel=False) >>> fig = visualizations.indices_radar_plot(df, indices=['m', 'ht'], agg_type='mean', group_by=['environment'], max_cols=3) # Generates a radar plot comparing 'Index1' and 'Index2' aggregated by 'Category'. Notes ----- - The 'agg_type' argument must be one of ['mean', 'median', 'stddev', 'var', 'max', 'min']. - The 'group_by' argument can contain one or two columns for grouping data. - 'fig_size' should be a dictionary with 'height' and 'width' keys. """ # 0. Initial configuration # 0.1. Verify if agg_type is available agg_options = ["mean", "median", "stddev", "var", "max", "min"] assert agg_type in agg_options, f"'{agg_type}' is not in {agg_options}" # 0.2. Verify if group_by column is available if group_by is not None: for col in group_by: assert col in df.columns, f"'{col}' is not in {df.column}" # 0.2.1. Verify if there is a maximum of two categories to group by if len(group_by) > 2: raise AttributeError("Sorry, the maximum categories to group by is 2") # 0.3. Verify if the select indices have been already calculated if indices is None or len(indices) == 0: raise IndexError("Sorry, the indices list must be non empty.") for index in indices: assert index in df.columns, ( f"'{index}' is not in {df.columns}. " "Verify if it is correctly spelled and if it have been calculated already." ) # 0.4. Verify if fig_size is correctly defined (has two keys, height and width) if fig_size is not None: if "height" not in fig_size.keys() or "width" not in fig_size.keys(): raise AttributeError("fig_size must contain width and height keys.") # 0.5 Create a copy of the dataframe df_unpivot = copy.deepcopy(df) # -------------------------------------------- # 1. Normalize columns to better suit the plot for index in indices: df_unpivot[index] = (df_unpivot[index] - df_unpivot[index].min()) / ( df_unpivot[index].max() - df_unpivot[index].min() ) # -------------------------------------------- # 2. Unpivot dataframe df_unpivot = pd.melt(df_unpivot, id_vars=group_by, value_vars=indices) # -------------------------------------------- # 3. Aggregate data gb_cols = ["variable"] if group_by is not None: for col in group_by: gb_cols.append(col) if agg_type == "mean": df_unpivot = df_unpivot.groupby(gb_cols).mean().reset_index() if agg_type == "median": df_unpivot = df_unpivot.groupby(gb_cols).median().reset_index() if agg_type == "stddev": df_unpivot = df_unpivot.groupby(gb_cols).std().reset_index() if agg_type == "var": df_unpivot = df_unpivot.groupby(gb_cols).var().reset_index() if agg_type == "max": df_unpivot = df_unpivot.groupby(gb_cols).max().reset_index() if agg_type == "min": df_unpivot = df_unpivot.groupby(gb_cols).min().reset_index() df_unpivot = df_unpivot.rename(columns={"variable": "index"}) # -------------------------------------------- # 4. Plot data n_cols = 1 n_rows = 1 if group_by is not None and len(group_by) > 1: n_cols = min(len(list(df[group_by[0]].unique())), max_cols) n_rows = math.ceil(len(list(df[group_by[0]].unique())) / max_cols) fig = make_subplots( rows=n_rows, cols=n_cols, specs=[[{"type": "polar"}] * n_cols] * n_rows, subplot_titles=[" "] * n_cols * n_rows, ) col = 1 row = 1 showlegend = True colors = px.colors.qualitative.Plotly # se não for agrupar if group_by is None: df_tmp = copy.copy(df_unpivot) r = list(df_tmp["value"]) r.append(r[0]) theta = list(df_tmp["index"]) theta.append(theta[0]) fig.add_trace(go.Scatterpolar(r=r, theta=theta, mode="lines"), row=1, col=1) else: if len(group_by) == 1: lables_list = list(df[group_by[0]].unique()) else: lables_list = list(df[group_by[1]].unique()) if len(lables_list) > len(colors): warnings.warn( "There are more categories than available color, " "some categories may use the same color" ) for ind, category in enumerate(list(df[group_by[0]].unique())): df_tmp = copy.copy(df_unpivot[df_unpivot[group_by[0]] == category]) if len(group_by) == 1: r = list(df_tmp["value"]) r.append(r[0]) theta = list(df_tmp["index"]) theta.append(theta[0]) fig.add_trace( go.Scatterpolar( name=category, r=r, theta=theta, mode="lines", legendgroup=category, showlegend=showlegend, ), row=row, col=col, ) if category in lables_list: lables_list.remove(category) if len(lables_list) == 0: showlegend = False else: for j, filter_col in enumerate(list(df[group_by[1]].unique())): df_tmp_final = copy.copy(df_tmp[df_tmp[group_by[1]] == filter_col]) r = list(df_tmp_final["value"]) if len(r) > 0: r.append(r[0]) theta = list(df_tmp_final["index"]) theta.append(theta[0]) if filter_col not in lables_list: showlegend = False else: lables_list.remove(filter_col) showlegend = True fig.add_trace( go.Scatterpolar( name=filter_col, r=r, theta=theta, mode="lines", legendgroup=filter_col, showlegend=showlegend, line_color=colors[j % len(colors)], ), row=row, col=col, ) fig.update_polars(radialaxis_showticklabels=False) fig.layout.annotations[ind]["text"] = category fig.layout.annotations[ind]["yshift"] = 25 if col >= max_cols: col = 1 row += 1 else: col += 1 fig.update_layout(title="Radar Plot - Comparisson between indices", title_x=0.5) fig.layout.autosize = True if fig_size is not None: fig.update_layout(height=fig_size["height"], width=fig_size["width"]) fig.update_layout(polar={"radialaxis": {"showticklabels": False}}) if show_plot: fig.show() return fig
# -----------------------------------------------------------------------
[docs] def indices_histogram_plot( df, indices: list, group_by: str = None, max_cols: int = 3, fig_size: dict = None, show_plot: bool = True, ): """ Create histogram plots to visualize the distribution of indices in a DataFrame. This function generates histogram plots to visualize the distribution of one or more indices from a DataFrame. It provides the option to group data by a single category column. Parameters ---------- df : pandas.DataFrame The input DataFrame containing the data. indices : list A list of column names in the DataFrame representing the indices to be plotted. group_by : str, optional A column name for grouping data (default is None). max_cols : int, optional Maximum number of columns for subplots (default is 3). fig_size : dict, optional A dictionary specifying the height and width of the plot (default is None). show_plot : bool, optional Whether to display the plot (default is True). Returns ------- plotly.graph_objs._figure.Figure A Plotly Figure object representing the histogram plot. Raises ------ AssertionError If the arguments are not correctly specified. Exception If the input data or arguments are invalid. Examples -------- >>> from maui import samples, acoustic_indices, visualizations >>> df = samples.get_leec_audio_sample() >>> indices_list = ['median_amplitude_envelope', 'temporal_entropy'] >>> df = acoustic_indices.calculate_acoustic_indices(df, indices_list, parallel=False) >>> fig = visualizations.indices_histogram_plot(df, indices=['m', 'ht'], group_by=None, max_cols=3) Notes ----- - The 'group_by' argument is optional, but if provided, only one index can be plotted. - 'fig_size' should be a dictionary with 'height' and 'width' keys. """ # 0. Initial configuration # 0.1. Verify if group_by column is available if group_by is not None: assert group_by in list( df.columns ), f"'{group_by}' is not in {list(df.columns)}" if len(indices) > 1: raise Exception( "Sorry, to group by some category, only one index is supported." ) # 0.2. Verify if the select indices have been already calculated if indices is None or len(indices) == 0: raise Exception("Sorry, the indices list must be non empty.") for index in indices: assert index in df.columns, ( f"'{index}' is not in {df.columns}. " "Verify if it is correctly spelled and if it have been calculated already." ) # 0.3. Verify if fig_size is correctly defined (has two keys, height and width) if fig_size is not None: if "height" not in fig_size.keys() or "width" not in fig_size.keys(): raise Exception("fig_size must contain width and height keys.") # -------------------------------------------- # 1. Plot data if group_by is None: n_cols = min(len(indices), max_cols) n_rows = math.ceil(len(indices) / max_cols) else: n_cols = min(len(df[group_by].unique()), max_cols) n_rows = math.ceil(len(df[group_by].unique()) / max_cols) fig = make_subplots( rows=n_rows, cols=n_cols, specs=[[{"type": "histogram"}] * n_cols] * n_rows, subplot_titles=[" "] * n_cols * n_rows, ) col = 1 row = 1 showlegend = True # add one trace for each index if group_by is None: for i, index in enumerate(indices): fig.add_trace( go.Histogram(name=index, x=df[index], showlegend=showlegend), row=row, col=col, ) fig.layout.annotations[i]["text"] = index fig.layout.annotations[i]["yshift"] = 25 if col >= max_cols: col = 1 row += 1 else: col += 1 else: for i, category in enumerate(df[group_by].unique()): df_index = df[df[group_by] == category] fig.add_trace( go.Histogram(name=index, x=df_index[indices[0]], showlegend=showlegend), row=row, col=col, ) fig.layout.annotations[i]["text"] = category fig.layout.annotations[i]["yshift"] = 25 if col >= max_cols: col = 1 row += 1 else: col += 1 fig.update_layout( title="Histogram Plot - Distribution of selected indices", title_x=0.5 ) fig.layout.autosize = True if fig_size is not None: fig.update_layout(height=fig_size["height"], width=fig_size["width"]) fig.update_layout(polar={"radialaxis": {"showticklabels": False}}) if show_plot: fig.show() return fig
# -----------------------------------------------------------------------
[docs] def indices_violin_plot( df, indices: list, group_by: str = None, fig_size: dict = None, show_plot: bool = True, ): """ Create violin plots to visualize the distribution of indices in a DataFrame. This function generates violin plots to visualize the distribution of one or more indices from a DataFrame. It provides the option to group data by a single category column. Parameters ---------- df : pandas.DataFrame The input DataFrame containing the data. indices : list A list of column names in the DataFrame representing the indices to be plotted. group_by : str, optional A column name for grouping data (default is None). fig_size : dict, optional A dictionary specifying the height and width of the plot (default is None). show_plot : bool, optional Whether to display the plot (default is True). Returns ------- plotly.graph_objs._figure.Figure A Plotly Figure object representing the violin plot. Raises ------ AssertionError If the arguments are not correctly specified. Exception If the input data or arguments are invalid. Examples -------- >>> from maui import samples, acoustic_indices, visualizations >>> df = samples.get_leec_audio_sample() >>> indices_list = ['median_amplitude_envelope', 'temporal_entropy'] >>> df = acoustic_indices.calculate_acoustic_indices(df, indices_list, parallel=False) >>> fig = visualizations.indices_violin_plot(df, indices=['m', 'ht'], group_by=None) Notes ----- - The 'group_by' argument is optional and allows grouping data by a single category column. - 'fig_size' should be a dictionary with 'height' and 'width' keys. """ # 0. Initial configuration # 0.1. Verify if group_by column is available if group_by is not None: assert group_by in df.columns, f"'{group_by}' is not in {df.columns}" # 0.2. Verify if the select indices have been already calculated if indices is None or len(indices) == 0: raise AttributeError("Sorry, the indices list must be non empty.") for index in indices: assert index in df.columns, ( f"'{index}' is not in {df.columns}. " "Verify if it is correctly spelled and if it have been calculated already." ) # 0.4. Verify if fig_size is correctly defined (has two keys, height and width) if fig_size is not None: if "height" not in fig_size.keys() or "width" not in fig_size.keys(): raise AttributeError("fig_size must contain width and height keys.") # -------------------------------------------- # 2. Plot data n_cols = 1 n_rows = len(indices) fig = make_subplots( rows=n_rows, cols=n_cols, specs=[[{"type": "histogram"}] * n_cols] * n_rows, subplot_titles=[" "] * n_cols * n_rows, ) showlegend = True violin_width = 0 if fig_size is not None and group_by is not None: violin_width = ( fig_size["width"] / (len(list(df[group_by].unique()))) ) / fig_size["width"] elif fig_size is not None and group_by is None: violin_width = 0.3 colors = px.colors.qualitative.Plotly if group_by is not None: lables_list = list(df[group_by].unique()) categories = list(df[group_by].unique()) if len(lables_list) > len(colors): warnings.warn( "There are more categories than available color, " "some categories may use the same color" ) # add one trace for each index for i, index in enumerate(indices): if group_by is not None: for j, lab in enumerate(categories): fig.add_trace( go.Violin( x=df[group_by][df[group_by] == lab], y=df[index][df[group_by] == lab], name=lab, box_visible=True, meanline_visible=True, points="all", scalemode="width", width=violin_width, legendgroup=lab, showlegend=showlegend, marker_color=colors[j % len(colors)], line_color=colors[j % len(colors)], ), row=i + 1, col=1, ) if lab in lables_list: lables_list.remove(lab) if len(lables_list) == 0: showlegend = False else: fig.add_trace( go.Violin( y=df[index], name="", box_visible=True, meanline_visible=True, points="all", scalemode="width", width=violin_width, showlegend=False, ), row=i + 1, col=1, ) fig.layout.annotations[i]["text"] = index fig.layout.annotations[i]["yshift"] = 25 fig.update_layout( title="""Violin Plot - Distribution of selected indices""", title_x=0.5 ) fig.layout.autosize = True if fig_size is not None: fig.update_layout(height=fig_size["height"], width=fig_size["width"]) if show_plot: fig.show() return fig
# -----------------------------------------------------------------------
[docs] def spectrogram_plot( file_path: str, mode: str = None, window: str = "hann", nperseg: int = 1024, noverlap: int = None, verbose: bool = False, fig_size: dict = None, show_plot: bool = True, ): """ Create a spectrogram plot from an audio file. This function loads an audio file, computes its spectrogram, and generates a heatmap plot to visualize the frequency content over time. Parameters ---------- file_path : str The path to the audio file. mode : str, optional The spectrogram mode ('psd', 'mean', 'complex'). Default is None. window : str, optional The window function to be used for the spectrogram calculation. Default is 'hann'. nperseg : int, optional The number of data points used in each block for the FFT. Default is 1024. noverlap : int, optional The number of points of overlap between blocks. Default is None. verbose : bool, optional Whether to display verbose information during computation. Default is False. fig_size : dict, optional A dictionary specifying the height and width of the plot. Default is None. show_plot : bool, optional Whether to display the plot. Default is True. Returns ------- plotly.graph_objs._figure.Figure A Plotly Figure object representing the spectrogram plot. Raises ------ AssertionError If the arguments are not correctly specified. Exception If there are errors in the file loading or spectrogram computation. Examples -------- >>> from maui import samples, visualizations >>> df = samples.get_leec_audio_sample() >>> file_path = df.at[df.index[1],'file_path'] >>> mode = 'psd' >>> fig = visualizations.spectrogram_plot(file_path, mode=mode) Notes ----- - The 'mode' parameter specifies the type of spectrogram to be computed: Power Spectral Density ('psd'), Amplitude Spectrogram ('amplitude'), or Complex Spectrogram ('complex'). - 'fig_size' should be a dictionary with 'height' and 'width' keys. """ # 0. Validations # 0.1. available modes mode_options = ["psd", "mean", "complex"] assert mode in mode_options, f"'{mode}' is not in {mode_options}" # 0.2. Verify if fig_size is correctly defined (has two keys, height and width) if fig_size is not None: if "height" not in fig_size.keys() or "width" not in fig_size.keys(): raise AttributeError("fig_size must contain width and height keys.") # -------------------------------------------- # 1. Load file s, fs = sound.load(file_path) # 2. Calculate spectrogram sxx, tn, fn, _ = sound.spectrogram( s, fs, nperseg=nperseg, noverlap=noverlap, verbose=verbose, mode=mode, window=window, ) if mode == "psd": sxx_disp = util.power2dB(sxx) if mode == "amplitude": sxx_disp = util.amplitude2dB(sxx) if mode == "complex": sxx_disp = util.amplitude2dB(sxx) fig_size = {"height": 500, "width": 1200} fig = go.Figure( data=go.Heatmap(z=sxx_disp, x=tn, y=fn, colorscale="gray", hoverinfo=None) ) fig.update_layout( title=f"""Spectrogram generated from the file {os.path.basename(file_path)}""", title_x=0.5, ) if fig_size is not None: fig.update_layout(height=fig_size["height"], width=fig_size["width"]) if show_plot: fig.show() return fig
# ----------------------------------------------------------------------- def _parse_time_format(time_str: str): """ Parse a time string into a 24-hour format (HH:MM). Parameters ---------- time_str : str The time string to parse. Can be in various formats (e.g., '9am', '09:00'). Returns ------- str The parsed time in HH:MM format. Raises ------ ValueError If the input time string is invalid or cannot be parsed. """ try: # Use dateutil's parser to interpret different time formats parsed_time = parser.parse(time_str) # Format the time in HH:MM (24-hour format) return parsed_time.strftime("%H:%M") except (ValueError, TypeError) as exc: # Raise an exception if the format can't be parsed raise ValueError(f"Invalid time format: {time_str}") from exc def _truncate_time_to_bin(time_str: str, time_bin_size: int) -> str: """ Truncate the given time string to the nearest time bin based on `time_bin_size`. Parameters ---------- time_str : str The time string in HH:MM format. time_bin_size : int The size of the time bin in minutes (e.g., 5, 10, 15). Returns ------- str The truncated time string in HH:MM format. """ # Parse the time string to a datetime object parsed_time = pd.to_datetime(time_str, format="%H:%M") # Extract hour and minute minute = parsed_time.minute # Truncate minute to the nearest bin truncated_minute = (minute // time_bin_size) * time_bin_size # Replace the minute in the parsed time truncated_time = parsed_time.replace(minute=truncated_minute) # Return the formatted truncated time in HH:MM format return truncated_time.strftime("%H:%M") def _aggregate_dataframe( df: pd.DataFrame, gb_cols: list, grouped_col: str, agg_type: str ) -> pd.DataFrame: """ Aggregate a DataFrame by specified columns and aggregation type. Parameters ---------- df : pd.DataFrame The DataFrame to aggregate. gb_cols : list List of columns to group by. grouped_col : str The column to aggregate. agg_type : str The type of aggregation to perform. Options: "mean", "median", "stddev", "var", "max", "min". Returns ------- pd.DataFrame The aggregated DataFrame. Raises ------ AttributeError If the `agg_type` is not one of the supported options. """ if agg_type == "mean": df_agg = df.groupby(gb_cols).mean(grouped_col).reset_index() elif agg_type == "median": df_agg = df.groupby(gb_cols).median(grouped_col).reset_index() elif agg_type == "stddev": df_agg = df.groupby(gb_cols).std(grouped_col).reset_index() elif agg_type == "var": df_agg = df.groupby(gb_cols).var(grouped_col).reset_index() elif agg_type == "max": df_agg = df.groupby(gb_cols).max(grouped_col).reset_index() elif agg_type == "min": df_agg = df.groupby(gb_cols).min(grouped_col).reset_index() else: agg_options = ["mean", "median", "stddev", "var", "max", "min"] raise AttributeError(f"'{agg_type}' is not in {agg_options}.") df_agg = df_agg.rename(columns={grouped_col: "metric"}) return df_agg
[docs] def diel_plot( df: pd.DataFrame, date_col: str, time_col: str, duration_col: str, time_bin_size: int, color_map_col: str, agg_type: str = None, show_plot: bool = True, **kwargs, ): """ Create a diel plot (heatmap) based on time and date columns. Parameters ---------- df : pd.DataFrame Input DataFrame containing date, time, and color mapping columns. date_col : str Column name for the date in the DataFrame. time_col : str Column name for the time in the DataFrame. duration_col : str Column name for the duration of each event in the DataFrame. time_bin_size : int The size of the time bin in minutes. Must be between 1 and 60. color_map_col : str Column used to color the plot. Can be numeric or categorical. agg_type : str, optional Aggregation type for numeric `color_map_col`. Default is None. show_plot : bool, optional Whether to show the plot. Default is True. **kwargs : dict Additional arguments for plot customization, such as `height` and `width`. Returns ------- plotly.graph_objects.Figure The generated diel plot as a Plotly figure. Raises ------ AttributeError If the `time_bin_size` is not between 1 and 60, or if `color_map_col` is not of numeric or string type. Warnings -------- UserWarning If any rows have durations greater than the `time_bin_size`, or if the date column contains invalid dates. Examples -------- >>> from maui import samples, utils, visualizations >>> df = samples.get_audio_sample(dataset="leec") >>> def convert_to_seconds(duration_str): >>> try: >>> minutes, seconds = map(int, duration_str.split(':')) >>> return minutes * 60 + seconds >>> except ValueError: >>> # Handle the case where the input is not in "mm:ss" format >>> raise ValueError(f"Invalid duration format: {duration_str}") >>> >>> # Apply the function to the 'duration' column >>> df = pd.read_csv('xc_data.csv') >>> df['length'] = df['length'].apply(convert_to_seconds) >>> >>> df = df[~df['time'].str.contains(r'?', na=False)] >>> df = df[df['time'] != 'am'] >>> df = df[df['time'] != 'pm'] >>> df = df[df['time'] != 'xx:xx'] >>> df = df[df['time'] != '?:?'] >>> fig = visualizations.diel_plot(df, date_col='date', >>> time_col='time', duration_col='length', >>> time_bin_size=1, color_map_col='group', >>> show_plot= True) """ # 0. Prepare date and time columns # 0.1. Parse time column df[time_col] = df[time_col].apply(_parse_time_format) # 0.2. Verify overlaps if time_bin_size < 1 or time_bin_size > 60: raise AttributeError( "time_bin_size must be an integer between 1 and 60, representing minutes" ) df_time_check = df[df[duration_col] > time_bin_size * 60] if len(df_time_check) > 0: warnings.warn( f"Warning: {len(df_time_check)} rows have a duration greater than the " f"time_bin_size of {time_bin_size} minutes. The time will be " "truncated according to time_bin_size. You should consider " "segmenting audio files so each one does not exceed " "time_bin_size duration." ) # 0.3. Truncate time column according to time_bin_size df[time_col] = df[time_col].apply(lambda t: _truncate_time_to_bin(t, time_bin_size)) # 0.4. Force date format df[date_col] = pd.to_datetime(df[date_col], format="%Y-%m-%d", errors="coerce") invalid_dates = df[date_col].isna().sum() if invalid_dates > 0: warnings.warn( f"Warning: {invalid_dates} rows have invalid dates. This rows " "will be ignored in the visualization." ) df = df.dropna(subset=[date_col]) # 1. Aggregate dataframe if pd.api.types.is_string_dtype(df[color_map_col]): # count by color_map_col df_plot = df.groupby([date_col, time_col]).size().reset_index(name="metric") color_title = "Number of samples" elif pd.api.types.is_numeric_dtype(df[color_map_col]): # aggregate color_map_col if agg_type is None: raise AttributeError("agg_type should not be None") df_plot = _aggregate_dataframe( df, [date_col, time_col], color_map_col, agg_type ) color_title = f"{agg_type} of {color_map_col}" else: raise AttributeError(f"'{color_map_col}' should be string or numeric type.") # 2. Plot image fig = go.Figure( data=go.Heatmap( z=df_plot["metric"], x=df_plot[time_col], y=df_plot[date_col], colorscale="Viridis", colorbar={"title": color_title}, ) ) # Set axis labels fig.update_layout( title="Diel Plot", title_x=0.5, xaxis_title="Time of day", yaxis_title="Date" ) if "height" in kwargs and "width" in kwargs: fig.update_layout(height=kwargs["height"], width=kwargs["width"]) if show_plot: fig.show() return fig
# ----------------------------------------------------------------------- def _display_false_color_spectrogram( df: pd.DataFrame, fc_spectrogram: np.array, indices: list, fig_size: dict, tick_interval: int, ): """ Display a false color spectrogram using Plotly. This function visualizes a false color spectrogram generated from acoustic indices. The spectrogram is displayed using Plotly with customized hover text and axis formatting. Parameters ---------- df : pd.DataFrame DataFrame containing the timestamps for the spectrogram. fc_spectrogram : np.array A 3D numpy array representing the false color spectrogram, where the third dimension corresponds to the color channels (R, G, B). fig_size : dict Dictionary specifying the figure size with 'width' and 'height' keys. If None, default values {'width': 2000, 'height': 1000} are used. tick_interval : int Interval for selecting ticks on the x-axis. If None, the default value is 40. Raises ------ AttributeError If `fig_size` does not contain both 'width' and 'height' keys. Notes ----- - The spectrogram is displayed with customized hover text showing the timestamp for each pixel. - The function uses Plotly's `go.Figure` and `go.Image` for rendering the image. - The layout is updated to ensure the spectrogram is displayed correctly with proper scaling and formatting. """ fig_size = {"width": 2000, "height": 1000} if fig_size is None else fig_size tick_interval = 40 if tick_interval is None else tick_interval if "height" not in fig_size.keys() or "width" not in fig_size.keys(): raise AttributeError("fig_size must contain width and height keys.") # 3.1 Create the figure fig = go.Figure() # 3.2. Add the image trace with hover text hover_text = df["timestamp"].dt.strftime("%Y-%m-%d %H:%M:%S").tolist() # Create hover text for each pixel in the image customdata = np.array([hover_text] * fc_spectrogram.shape[0]) fig.add_trace( go.Image( z=fc_spectrogram, customdata=customdata, hovertemplate="Timestamp: %{customdata}<extra></extra>", ) ) width = None height = None if fig_size is not None: width = fig_size["width"] height = fig_size["height"] # Create the x-axis values based on the timestamp x_axis_values = df["timestamp"].dt.strftime("%Y-%m-%d %H:%M:%S").tolist() # Select a subset of x ticks based on the tick_interval tick_indices = list(range(0, len(x_axis_values), tick_interval)) tick_values = [x_axis_values[i] for i in tick_indices] # 3.3. Update layout for better visualization fig.update_layout( title=f"""{re.sub(r'_per_bin', '', indices[0])} (R), """ f"""{re.sub(r'_per_bin', '', indices[1])} (G) and {indices[2]} """ f"""(B) False Color Spectrogram""", xaxis={ "showgrid": False, "zeroline": False, "tickvals": tick_indices, "ticktext": tick_values, "tickangle": 90, }, yaxis={ "showgrid": False, "zeroline": False, "scaleanchor": "x", "autorange": True, "range": [0, fc_spectrogram.shape[0]], }, margin={"l": 0, "r": 0, "t": 30, "b": 0}, width=width, height=height, ) # Display the image fig.show()
[docs] def false_color_spectrogram_plot( df, datetime_col: str, indices: list, display: bool = True, unit: str = "scale_60", **kwargs, ) -> np.array: """ Generate and optionally display a false color spectrogram from acoustic indices. This function creates a false color spectrogram by normalizing and combining selected acoustic indices from a DataFrame. The spectrogram can be displayed using Plotly and is returned as a 3D numpy array. Parameters ---------- df : pd.DataFrame DataFrame containing the acoustic indices and timestamp data. datetime_col : str Name of the column in `df` that contains datetime values. indices : list List of column names corresponding to the acoustic indices to be used for the R, G, and B channels of the spectrogram. display : bool, optional If True, the spectrogram is displayed using Plotly. Default is True. unit : str, optional The time unit to truncate the timestamps from 0.2 seconds to 60 seconds. Must be one of ['scale_02', 'scale_04', 'scale_06', 'scale_2', 'scale_4', 'scale_6', 'scale_12', 'scale_24']. Default is 'scale_60'. **kwargs : dict, optional Additional arguments for customizing the display: - fig_size (dict): Dictionary specifying the figure size with 'width' and 'height' keys. - tick_interval (int): Interval for selecting ticks on the x-axis. Returns ------- np.array A 3D numpy array representing the false color spectrogram, where the third dimension corresponds to the color channels (R, G, B). Raises ------ IndexError If `indices` is None or empty. AssertionError If any of the specified `indices` are not found in the DataFrame columns. Exception If `unit` is not one of the available units. Notes ----- - The function first checks that the selected indices are available in the DataFrame and that the specified time unit is valid. - The DataFrame is sorted by timestamp, and timestamps are truncated according to the specified unit. - Acoustic indices are normalized to the range [0, 255] and combined to form the false color spectrogram. - If `display` is True, the spectrogram is displayed using Plotly, with customizable figure size and tick interval. Examples -------- >>> from maui import samples, utils, visualizations >>> df = samples.get_audio_sample(dataset="leec") >>> df["dt"] = pd.to_datetime(df["timestamp_init"]).dt.date >>> def pre_calculation_method(s, fs): >>> Sxx_power, tn, fn, ext = maad.sound.spectrogram (s, fs) >>> Sxx_noNoise= maad.sound.median_equalizer(Sxx_power, display=False, extent=ext) >>> Sxx_dB_noNoise = maad.util.power2dB(Sxx_noNoise) >>> >>> Sxx, tn, fn, ext = maad.sound.spectrogram(s, fs, mode='amplitude') >>> >>> pre_calc_vars = {'Sxx': Sxx, 'tn':tn , 'fn':fn , 'ext':ext, >>> 'Sxx_dB_noNoise':Sxx_dB_noNoise } >>> return pre_calc_vars >>> >>> def get_aci(pre_calc_vars): >>> aci_xx, aci_per_bin, aci_sum = ( >>> maad.features.acoustic_complexity_index(pre_calc_vars['Sxx'])) >>> indices = {'aci_xx': aci_xx, 'aci_per_bin':aci_per_bin , 'aci_sum':aci_sum} >>> return indices >>> >>> def get_spectral_events(pre_calc_vars): >>> EVNspFract_per_bin, EVNspMean_per_bin, EVNspCount_per_bin, EVNsp = ( >>> maad.features.spectral_events( >>> pre_calc_vars['Sxx_dB_noNoise'], >>> dt=pre_calc_vars['tn'][1] - pre_calc_vars['tn'][0], >>> dB_threshold=6, >>> rejectDuration=0.1, >>> display=False, >>> extent=pre_calc_vars['ext']) >>> ) >>> >>> indices = {'EVNspFract_per_bin': EVNspFract_per_bin, >>> 'EVNspMean_per_bin':EVNspMean_per_bin, >>> 'EVNspCount_per_bin':EVNspCount_per_bin, 'EVNsp':EVNsp} >>> return indices >>> def get_spectral_activity(pre_calc_vars): >>> ACTspfract_per_bin, ACTspcount_per_bin, ACTspmean_per_bin = ( maad.features.spectral_activity(pre_calc_vars['Sxx_dB_noNoise'])) >>> indices = {'ACTspfract_per_bin': ACTspfract_per_bin, >>> 'ACTspcount_per_bin':ACTspcount_per_bin, >>> 'ACTspmean_per_bin':ACTspmean_per_bin} >>> return indices >>> acoustic_indices_methods = [get_aci, get_spectral_activity, get_spectral_events] >>> >>> df_temp = df.iloc[0:1] >>> segmented_df = utils.false_color_spectrogram_prepare_dataset( >>> df_temp, >>> datetime_col = 'timestamp_init', >>> duration_col = 'duration', >>> file_path_col = 'file_path', >>> indices = ['acoustic_complexity_index', 'spectral_activity', 'spectral_events'], >>> output_dir = './segmented_indices', >>> store_audio_segments = True, >>> unit = 'scale_02', >>> acoustic_indices_methods = acoustic_indices_methods, >>> pre_calculation_method = pre_calculation_method, >>> temp_dir = os.path.abspath('./temp_ac_files/'), >>> parallel = True >>> ) >>> >>> fcs = visualizations.false_color_spectrogram_plot( >>> segmented_df, >>> datetime_col = 'start_time', >>> indices = ['aci_per_bin', 'ACTspfract_per_bin', 'EVNspCount_per_bin'], >>> display = True, >>> unit = 'scale_02' >>> ) """ # 0. Initial configuration # 0.1. Verify if the select indices have been already calculated if indices is None or len(indices) == 0: raise IndexError("Sorry, the indices list must be non empty.") for index in indices: assert index in df.columns, ( f"'{index}' is not in {df.columns}. " "Verify if it is correctly spelled and if it have been calculated already." ) # 0.2. Verify if the unity is accepted available_units = [ "scale_02", "scale_04", "scale_06", "scale_2", "scale_4", "scale_6", "scale_12", "scale_24", "scale_60", ] if unit not in available_units: raise Exception( f"""The unity {unit} is not available. """ f"""The list of available unities is: {available_units}""" ) # 1. Order original dataset by timestamp and create helper columns df = df.sort_values(by=datetime_col) trunc_unit = "min" if unit != "scale_60": trunc_unit = "s" df["timestamp"] = df[datetime_col].dt.floor(trunc_unit) # 2. Normalize index and create false color spectrogram fc_spectrogram = [] for index in indices: ind = df[index].tolist() ind = np.asarray( ind ).T # transpose the array to place frequencies are in y axis ind_normalized = (255 * (ind - ind.min()) / (ind.max() - ind.min())).astype( np.uint8 ) fc_spectrogram.append(ind_normalized) fc_spectrogram = np.asarray(fc_spectrogram) fc_spectrogram = np.transpose(fc_spectrogram, (1, 2, 0)) # 3. Display false color spectrogram if display: _display_false_color_spectrogram( df, fc_spectrogram, indices, fig_size=kwargs["fig_size"] if "fig_size" in kwargs else None, tick_interval=( kwargs["tick_interval"] if "tick_interval" in kwargs else None ), ) return fc_spectrogram
# ----------------------------------------------------------------------------------------
[docs] def polar_bar_plot( df: pd.DataFrame, date_time_col: str, categories_col: str, percent: bool = False, show_plot: bool = True, **kwargs, ) -> go.Figure: """ Generate a polar bar plot to visualize category occurrences over the year. It will group data by day of the year, keep this in mind if you have more than one year of data. Parameters ---------- df : pandas.DataFrame The input dataframe containing the data to plot. date_time_col : str The name of the column in `df` containing date or datetime values. categories_col : str The name of the column in `df` representing the categorical variable. percent : bool, optional, default: False If True, the plot will display the data as percentages of total occurrences for each day. If False, raw counts will be used. show_plot : bool, optional, default: True If True, the plot will be displayed. If False, the plot will be returned without displaying it. **kwargs : dict, optional Additional keyword arguments passed to the plot layout, such as height and width for figure dimensions. Returns ------- plotly.graph_objects.Figure The generated polar bar plot figure. Raises ------ AssertionError If `date_time_col` or `categories_col` is not in `df`. AttributeError If `categories_col` contains continuous data instead of discrete categories. Warns ----- UserWarning If `date_time_col` contains invalid date values, a warning is issued, and those rows are ignored in the plot. Notes ----- - The `date_time_col` is converted to the day of the year (1 to 366, to account for leap years). - If `percent=True`, the data is normalized by day to represent the proportion of occurrences. Examples -------- >>> df = pd.DataFrame({ >>> 'date': pd.date_range(start='2023-01-01', periods=366, freq='D'), >>> 'category': ['A', 'B', 'C'] * 122 >>> }) >>> fig = polar_bar_plot(df, 'date', 'category', percent=True) >>> fig.show() """ # 0. Verify columns for col in [date_time_col, categories_col]: assert col in df.columns, ( f"'{col}' is not in {df.columns}. " "Verify if it is correctly spelled and if it has been calculated already." ) # 0.2. Verify if categories_col is categoric # if not pd.api.types.is_string_dtype(df[categories_col]): if not df[categories_col].apply(lambda x: isinstance(x, (int, str, bool))).all(): raise AttributeError( "The values of categories_col in the dataframe should be descrite, not continuous" ) # 1. Convert datetime to day of the year # 1.1. Convert date_time_col to datetime if not already df = df.copy() df[date_time_col] = pd.to_datetime( df[date_time_col], format="%Y-%m-%d", errors="coerce" ) invalid_dates = df[date_time_col].isna().sum() if invalid_dates > 0: warnings.warn( f"Warning: {invalid_dates} rows have invalid dates. This rows " "will be ignored in the visualization." ) df = df.dropna(subset=[date_time_col]) # 1.2. Create a 'day_of_year' column df["day_of_year"] = df[date_time_col].dt.dayofyear # 2. Get all days of the year (1 to 365 or 366) for a full year all_days = pd.DataFrame( {"day_of_year": np.arange(1, 367)} ) # 1-366 to include leap years # 3. Aggregate dataframe by day of year and categories df_agg = ( df.groupby(["day_of_year", categories_col]).size().reset_index(name="count") ) # 4. Merge with all days of the year to include missing days (fill with zero where no data) df_full = pd.merge(all_days, df_agg, on="day_of_year", how="left").fillna(0) df_full = df_full.sort_values(by="day_of_year") # 5. Create a new column for the day of the month df_full["date"] = pd.to_datetime(df_full["day_of_year"], format="%j") df_full["day_of_month"] = df_full["date"].dt.day # 6. Calculate percentages if needed if percent: total_counts = df_full.groupby("day_of_year")["count"].transform("sum") df_full["percent"] = ( df_full["count"] / total_counts * 100 ) # Convert to percentage r_value = "percent" # Use the 'percent' column for the plot else: r_value = "count" # Use the 'count' column for the plot cmap = maui.utils.get_blu_grn_palette() cmap = [plt.colors.rgb2hex(cmap(i)) for i in range(cmap.N)] # 7. Create polar plot fig = px.bar_polar( df_full, r=r_value, theta="day_of_year", color=categories_col, color_discrete_sequence=cmap, hover_data={"day_of_year": True, "day_of_month": True, "count": True}, ) # 8. Set custom hover text # 8.1. Primary data hovertemplate = ( "<b>Day of month</b>: %{customdata[1]}<br><b>Count</b>: %{customdata[2]}" ) # 8.2. Add percentage to hovertemplate if percent=True if percent: hovertemplate += "<br><b>Percentage</b>: %{r:.2f}%" # 8.3. update hover template fig.update_traces(hovertemplate=hovertemplate) # 9. Final plot customization fig.update_layout( title=f"Polar Bar Plot - {categories_col} over the year", title_x=0.5, polar={ "radialaxis": { "visible": True, "range": [ 0, (df_full["percent"] if percent else df_full["count"]).max() + 1, ], }, "angularaxis": { "tickvals": [1, 32, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335], "ticktext": [ "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec", ], "rotation": 90, # Align Jan at the top }, }, ) # 9.2. Change fig size if custom fig size provided if "height" in kwargs and "width" in kwargs: fig.update_layout(height=kwargs["height"], width=kwargs["width"]) # 10. Show plot if show_plot: fig.show() return fig
[docs] def parallel_coordinates_plot( df: pd.DataFrame, indices: list, color_col: str, show_plot: bool = True ) -> go.Figure: """ Plots a parallel coordinates chart for comparing acoustic indices for some categorical variable. Parameters ---------- df : pandas.DataFrame Input DataFrame containing the data to plot. indices : list List of column names (strings) from `df` that represents the acoustic indices to be used as parallel axes. All must be numerical. color_col : str, optional Name of the column to use for coloring the lines. Can be categorical or numerical. Required. show_plot : bool, default True If True, the plot is displayed interactively. If False, the Plotly figure object is returned without displaying. Returns ------- fig : plotly.graph_objects.Figure The Plotly Figure object containing the parallel coordinates plot. Raises ------ IndexError If `indices` is empty or has fewer than two elements. AssertionError If any value in `indices` or `color_col` is not a column in `df`. Notes ----- - For categorical colors, colors and legend associations are assigned automatically. - The legend for categories appears below the plot and is adjusted to avoid overlapping plot elements. - If the coloring column is numerical, a standard colorbar is shown. - Only numerical columns can be used for parallel axes. """ if indices is None or len(indices) == 0: raise IndexError("The list of indices must be non-empty.") if len(indices) < 2: raise IndexError("At least two indices must be selected.") for index in indices: assert index in df.columns, f"'{index}' is not in {df.columns}." assert color_col in df.columns, f"'{color_col}' is not in {df.columns}." # Handle categorical coloring if ( isinstance(df[color_col].dtype, CategoricalDtype) or df[color_col].dtype == object ): df["_color_code"] = df[color_col].astype("category").cat.codes categories = list(df[color_col].astype("category").cat.categories) palette = qualitative.Plotly colors = [palette[i % len(palette)] for i in range(len(categories))] color_map = dict(zip(categories, colors)) n_cats = len(categories) if n_cats == 1: colorscale = [[0.0, colors[0]], [1.0, colors[0]]] else: colorscale = [[i / (n_cats - 1), color] for i, color in enumerate(colors)] color_column = df["_color_code"] showscale = False else: color_column = df[color_col] colorscale = "Viridis" showscale = True categories = [] color_map = {} dimensions = [ { "range": [df[column].min(), df[column].max()], "label": column, "values": df[column], } for column in indices ] fig = go.Figure( go.Parcoords( line={ "color": color_column, "colorscale": colorscale, "cmin": min(color_column), "cmax": max(color_column), "showscale": showscale, }, dimensions=dimensions, ) ) # Improved Legend: colored Unicode squares, non-overlapping, far below the plot if categories: annotations = [] max_per_row = 4 # You can adjust, e.g., 5 or 7 for your layout total = len(categories) num_rows = (total + max_per_row - 1) // max_per_row x_padding = 0.09 x_between = (1 - 2 * x_padding) / min(max_per_row, total) y_start = -0.36 # First row far below plot y_delta = 0.15 # Space between rows base_fig_height = 400 # Minimum figure height legend_row_height = 40 # Adjust for font/icon size fig_height = base_fig_height + (num_rows - 1) * legend_row_height base_margin_b = 220 # Works well for a 1-row legend extra_margin_per_row = 34 # Add for each additional legend row dynamic_bottom_margin = base_margin_b + extra_margin_per_row * max( 0, num_rows - 1 ) for i, cat in enumerate(categories): row = i // max_per_row col = i % max_per_row x = x_padding + (col + 0.5) * x_between y = y_start - row * y_delta wrapped_label = ( f"<span style='font-size:22px; color:{color_map[cat]}; vertical-align:middle'>■</span> " f"<span style='font-size:15px; display:inline-block; max-width:120px; " f"white-space:normal; word-break:break-all; vertical-align:middle'>{cat}</span>" ) annotations.append( { "x": x, "y": y, "xanchor": "center", "yanchor": "top", "xref": "paper", "yref": "paper", "showarrow": False, "align": "center", "text": wrapped_label, "font": {"size": 15}, } ) fig.update_layout( height=fig_height, margin={"b": dynamic_bottom_margin, "r": 40, "t": 40, "l": 40}, annotations=annotations, ) else: fig.update_layout(margin={"r": 40, "t": 40, "l": 40, "b": 40}) if show_plot: fig.show() return fig