Source code for htstools.normalize

"""Functions for normalizing data."""

from functools import partial
from typing import Dict, Iterable, List, Optional, Tuple, Union

from carabiner import print_err
import pandas as pd

def _unique_entries_str(x: pd.Series) -> str:
    """Return the distinct values of `x` as a sorted, comma-separated string.

    Every unique value is converted with `str` before sorting, so the order
    is lexicographic on the string form (e.g. ``'10'`` sorts before ``'2'``).

    Parameters
    ----------
    x : pandas.Series
        Column whose distinct entries should be listed.

    Returns
    -------
    str
        The unique entries joined by commas; empty if `x` has no entries.
    """
    return ','.join(sorted(map(str, x.unique())))


def _check_controls(
    data: pd.DataFrame,
    measurement_col: str,
    control_col: str,
    value: str
) -> None:
    """Verify that a control label occurs in the control column.

    Parameters
    ----------
    data : pandas.DataFrame
        Table holding the control column.
    measurement_col : str
        Not used by this function.
    control_col : str
        Name of the column listing the control labels.
    value : str
        Control label that must be present in `data[control_col]`.

    Raises
    ------
    ValueError
        If `value` does not occur in `data[control_col]`.
    TypeError
        If building the list of existing entries for the error message
        raises a TypeError.
    """
    if value in data[control_col].values:
        return None

    # Assemble the message first: listing the column's entries can itself
    # fail, and that failure is reported as a TypeError instead.
    try:
        message = (f"{value=} is not in data:\n\t" +
                   _unique_entries_str(data[control_col]))
    except TypeError:
        raise TypeError(f"{control_col=} is not a str column.")

    raise ValueError(message)


def _get_grouped_control_means(
    data: pd.DataFrame,
    measurement_col: str,
    control_col: str,
    neg: str,
    pos: Optional[str] = None,
    group: Optional[Union[str, Iterable[str]]] = None
) -> Tuple[Dict[str, str], pd.DataFrame]:
    """Attach group-wise control means of a measurement as new columns.

    For the negative control (and the positive control, if given), the mean
    of `measurement_col` over the rows whose `control_col` equals the control
    label is computed within each group and broadcast to every row of that
    group. Groups without any such control rows receive NaN.

    Parameters
    ----------
    data : pandas.DataFrame
        Input table. It is not modified; a copy is returned.
    measurement_col : str
        Name of the column containing the values to average.
    control_col : str
        Name of the column containing control labels.
    neg : str
        Label of the negative controls in `control_col`.
    pos : str, optional
        Label of the positive controls in `control_col`.
    group : str or iterable of str, optional
        Column name(s) defining the groups. If None (or empty), the whole
        table is treated as a single group.

    Returns
    -------
    tuple
        A pair ``(names, table)``. ``names`` maps ``'neg_mean'`` (and
        ``'pos_mean'`` when `pos` is given) to the name of the column that
        holds the corresponding mean, i.e. ``measurement_col + '_neg_mean'``
        and ``measurement_col + '_pos_mean'``. ``table`` is a copy of `data`
        with those columns added, rows in their input order and a fresh
        integer index.

    Raises
    ------
    KeyError
        If `measurement_col` is not a column of `data`.
    ValueError
        If `neg` or `pos` does not occur in `control_col`.
    TypeError
        Propagated from `_check_controls`.
    """
    if measurement_col not in data:
        raise KeyError(
            f"{measurement_col=} is not in data:\n\t{','.join(map(str, data.columns))}"
        )

    # Normalise `group` to a list of column names. A bare tuple handed to
    # groupby would be interpreted as one single key, so always use a list.
    if group is None:
        group_cols = []
    elif isinstance(group, str):
        group_cols = [group]
    else:
        group_cols = list(group)

    control_labels = {'neg_mean': neg}
    if pos is not None:
        control_labels['pos_mean'] = pos
    mean_control_column_names = {
        key: f"{measurement_col}_{key}" for key in control_labels
    }

    control_checker = partial(
        _check_controls,
        data=data,
        measurement_col=measurement_col,
        control_col=control_col,
    )
    for label in control_labels.values():
        control_checker(value=label)

    # Copy with a fresh RangeIndex so the caller's frame is never touched and
    # the mask, group keys and result all share one unambiguous index.
    data = data.reset_index(drop=True)
    group_keys = [data[col] for col in group_cols]

    for key, label in control_labels.items():
        # Blank out non-control rows so only controls contribute to the mean.
        # A boolean mask (instead of a query string) works for any column
        # name and any label, including ones with spaces or quotes.
        control_only = data[measurement_col].where(data[control_col] == label)
        if group_keys:
            # transform() broadcasts each group's mean back onto its rows
            # without reordering them.
            means = control_only.groupby(group_keys).transform('mean')
        else:
            means = control_only.mean()
        data = data.assign(**{mean_control_column_names[key]: means})

    return mean_control_column_names, data


def _normalize_pon(data: pd.DataFrame,
                   measurement_col: str,
                   neg_mean: str,
                   pos_mean: Union[str, None] = None,
                   flip: bool = False) -> pd.DataFrame:
    """Add a proportion-of-negative (PON) column to `data`.

    The new column is named ``measurement_col + '_norm.pon'`` and holds
    ``m / mu_n``, or ``1 - m / mu_n`` when `flip` is true, where ``m`` is
    `measurement_col` and ``mu_n`` is the `neg_mean` column.

    Parameters
    ----------
    data : pandas.DataFrame
        Table to extend; the column is written onto this object.
    measurement_col : str
        Name of the column with the raw values.
    neg_mean : str
        Name of the column with the negative-control means.
    pos_mean : str, optional
        Not used by this method.
    flip : bool, optional
        If True, store one minus the ratio.

    Returns
    -------
    pandas.DataFrame
        The same `data` object, with the normalized column added.
    """
    ratio = data[measurement_col] / data[neg_mean]
    data[measurement_col + '_norm.pon'] = (1. - ratio) if flip else ratio
    return data


def _normalize_npg(data: pd.DataFrame,
                   measurement_col: str,
                   neg_mean: str,
                   pos_mean: str,
                   flip: bool = False) -> pd.DataFrame:
    """Add a normalized-proportion-of-growth (NPG) column to `data`.

    The new column is named ``measurement_col + '_norm.npg'`` and holds
    ``(m - mu_p) / (mu_n - mu_p)``, or ``(m - mu_n) / (mu_p - mu_n)`` when
    `flip` is true, where ``m`` is `measurement_col`, ``mu_n`` the `neg_mean`
    column and ``mu_p`` the `pos_mean` column.

    Parameters
    ----------
    data : pandas.DataFrame
        Table to extend; the column is written onto this object.
    measurement_col : str
        Name of the column with the raw values.
    neg_mean : str
        Name of the column with the negative-control means.
    pos_mean : str
        Name of the column with the positive-control means.
    flip : bool, optional
        If True, swap the roles of the two control means.

    Returns
    -------
    pandas.DataFrame
        The same `data` object, with the normalized column added.
    """
    # `zero_ref` is the control that maps to 0, `full_ref` the one mapping to 1.
    if flip:
        zero_ref, full_ref = neg_mean, pos_mean
    else:
        zero_ref, full_ref = pos_mean, neg_mean

    offset = data[measurement_col] - data[zero_ref]
    span = data[full_ref] - data[zero_ref]
    data[measurement_col + '_norm.npg'] = offset / span
    return data


# Lookup table from lower-case method name to the function implementing it.
_NORMALIZATION_METHODS = dict(
    npg=_normalize_npg,
    pon=_normalize_pon,
)

def normalize(data: pd.DataFrame,
              measurement_col: str,
              control_col: str,
              neg: str,
              method: Union[str, None] = None,
              pos: Union[str, None] = None,
              group: Union[str, List[str], None] = None,
              flip: bool = False) -> pd.DataFrame:

    r"""Normalize a column based on controls, optionally within groups.

    Positive controls should represent the 0% signal, and negative controls
    should represent the 100% signal. If you set `flip = True`, then this is
    reversed.

    Calculations are performed within groups, such as batches or plates,
    indicated by the `group` column.

    This function takes the group-wise mean negative controls $\mu_n$ and,
    optionally, positive controls $\mu_p$. Then within each group calculates
    the normalized signal.

    Two methods are offered:

    - Normalized proportion of growth (NPG)

        Within each group calculates the normalized signal, $s$, of each
        measured datapoint, $m$:

        $$s = \frac{m - \mu_p}{\mu_n - \mu_p}$$

        If you set `flip = True`, then this equation is used instead:

        $$s = \frac{m - \mu_n}{\mu_p - \mu_n}$$

        Requires both positive and negative controls.

    - Proportion of negative (PON)

        Within each group calculates the normalized signal, $s$, of each
        measured datapoint, $m$:

        $$s = \frac{m}{\mu_n}$$

        If you set `flip = True`, then this equation is used instead:

        $$s = 1 - \frac{m}{\mu_n}$$

        Requires only negative controls.

    Parameters
    ----------
    data : pandas.DataFrame
        Input dataframe.
    measurement_col : str
        Name of column containing raw data.
    control_col : str
        Name of column containing control indicators.
    neg : str
        Name of negative controls.
    method : str, optional
        One of PON or NPG (case-insensitive). Default PON.
    pos : str, optional
        Name of positive controls.
    group : str or list, optional
        Name of column containing the grouping variable, such as
        plates or batches. If not set, then the entire data is
        taken as one big group.
    flip : bool, optional
        Set positive controls as 100% signal, and negative controls
        as 0% signal.

    Returns
    -------
    pandas.DataFrame
        Input data with additional columns, containing mean
        positive and negative control values (headers ending with
        "_neg_mean" and "_pos_mean") and normalized data
        values (header ending with "_norm.pon" or "_norm.npg").

    Raises
    ------
    AttributeError
        If method is not supported, or if method is NPG and pos is not given.
    KeyError
        If measurement_col is not in data.
    ValueError
        If neg or pos is not in data.
    TypeError
        If control_col is not a str column.

    Examples
    --------
    >>> import pandas as pd
    >>> a = pd.DataFrame(dict(compound=['p', 'p', 'c1', 'c2', 'n', 'n'],
    ...                       m_abs_ch1=[.1, .2, .5, .4, .9, .8],
    ...                       abs_ch1_wavelength=['600nm'] * 6))
    >>> a  # doctest: +NORMALIZE_WHITESPACE
      compound  m_abs_ch1 abs_ch1_wavelength
    0        p        0.1              600nm
    1        p        0.2              600nm
    2       c1        0.5              600nm
    3       c2        0.4              600nm
    4        n        0.9              600nm
    5        n        0.8              600nm
    >>> normalize(a, control_col='compound', pos='p', neg='n', measurement_col='m_abs_ch1')  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE, +SKIP
      compound  m_abs_ch1 abs_ch1_wavelength  m_abs_ch1_neg_mean  m_abs_ch1_pos_mean  m_abs_ch1_norm.pon
    0        p        0.1              600nm                0.85                0.15            0.117647
    1        p        0.2              600nm                0.85                0.15            0.235294
    2       c1        0.5              600nm                0.85                0.15            0.588235
    3       c2        0.4              600nm                0.85                0.15            0.470588
    4        n        0.9              600nm                0.85                0.15            1.058824
    5        n        0.8              600nm                0.85                0.15            0.941176
    >>> normalize(a, control_col='compound', pos='p', neg='n', measurement_col='m_abs_ch1', flip=True)  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE, +SKIP
      compound  m_abs_ch1 abs_ch1_wavelength  m_abs_ch1_neg_mean  m_abs_ch1_pos_mean  m_abs_ch1_norm.pon
    0        p        0.1              600nm                0.85                0.15            0.882353
    1        p        0.2              600nm                0.85                0.15            0.764706
    2       c1        0.5              600nm                0.85                0.15            0.411765
    3       c2        0.4              600nm                0.85                0.15            0.529412
    4        n        0.9              600nm                0.85                0.15           -0.058824
    5        n        0.8              600nm                0.85                0.15            0.058824
    >>> normalize(a, control_col='compound', pos='p', neg='n', measurement_col='m_abs_ch1', method='npg')  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE, +SKIP
      compound  m_abs_ch1 abs_ch1_wavelength  m_abs_ch1_neg_mean  m_abs_ch1_pos_mean  m_abs_ch1_norm.npg
    0        p        0.1              600nm                0.85                0.15           -0.071429
    1        p        0.2              600nm                0.85                0.15            0.071429
    2       c1        0.5              600nm                0.85                0.15            0.500000
    3       c2        0.4              600nm                0.85                0.15            0.357143
    4        n        0.9              600nm                0.85                0.15            1.071429
    5        n        0.8              600nm                0.85                0.15            0.928571

    """

    # Method names are matched case-insensitively; PON is the default.
    method = (method or 'pon').casefold()

    try:
        normalization_function = _NORMALIZATION_METHODS[method]
    except KeyError as e:
        raise AttributeError(f"Normalization method {method} is not supported.") from e

    if method == 'npg' and pos is None:
        raise AttributeError(f"Normalization method {method} requires positive controls.")

    control_mean_names, data = _get_grouped_control_means(
        data=data,
        measurement_col=measurement_col,
        control_col=control_col,
        neg=neg,
        pos=pos,
        group=group,
    )

    # `control_mean_names` supplies the `neg_mean` (and, when positive
    # controls were given, `pos_mean`) keyword arguments.
    return normalization_function(
        data=data,
        measurement_col=measurement_col,
        flip=flip,
        **control_mean_names,
    )