Source code for htstools.tables

"""Utilities for joining and pivoting tables."""

from typing import Dict, Iterable, List, Mapping, Optional, Tuple, Union
from functools import reduce

from carabiner import print_err
import numpy as np
import pandas as pd
from pandas import DataFrame

from .utils import row_col_to_well

_DUMMY_GROUP = "__group__"

def _join(
    left: DataFrame, 
    right: DataFrame,
    how: str = 'inner',
    sheet_name: Optional[str] = None
) -> DataFrame:
    
    left_cols, right_cols = left.columns.tolist(), right.columns.tolist()
    shared_cols = tuple(set(left_cols).intersection(right_cols))
    
    if len(shared_cols) == 0:
        raise AttributeError(
            'No shared columns for join.'
            f'\n\tLeft: {", ".join(left_cols)}'
            f'\n\tRight: {", ".join(right_cols)}'
            + (f'\n\tRight sheet name: {sheet_name}' 
            if sheet_name is not None else '')
        )

    try:
        data = pd.merge(
            left, 
            right, 
            how=how,
            on=shared_cols,
        )
    # probably joining on a type mismatch, usually int <-> str
    except ValueError as e:  
        col_types = {c: (left[c].dtype, right[c].dtype) 
                     for c in shared_cols}
        mismatches = [key for key, val in col_types.items() if val[0] != val[1]]
        print_err(f'{col_types=}')
        print_err(f'{mismatches=}')
        raise e
    else:    
        return shared_cols, data
    

def _join_reduce(
    how: str = 'inner'
) -> Tuple[Tuple[str], DataFrame]:
    
    def join_reduce(left, right):
        prev_shared_cols, left = left
        shared_cols, data = _join(left, right, how=how)
        return prev_shared_cols + (shared_cols, ), data
    
    return join_reduce


[docs] def join( left: DataFrame, right: Union[DataFrame, Dict[str, DataFrame]], how: str = 'inner') -> DataFrame: """Perform a database-stype join (merge) between two dataframes. This is simply a wrapper around `pandas.merge()` to catch errors and return the shared columns for joining. Parameters ---------- left : pandas.DataFrame Left dataframe. right : pandas.DataFrame or dict Right dataframe. If a dict, this should map a str to a pandas.DataFrame, as returned by pandas.read_excel() when reading multipel sheets. In this case, the sheets will be joined in order. how : str, optional Style of join: "inner", "outer", "left". "right". Default: "inner". Returns ------- Tuple[str, pandas.DataFrame] Shared column headers and joined dataframe. Raises ------ AttributeError When there are no shared columns. ValueError When attempting to join on columns of different types. This often happens when integers are stored as int in one dataframe and str in the other. NotImplementedError If anything other than a pd.DataFrame or a dictionary mapping to pd.DataFrame is provided to the `right` parameter. Examples -------- >>> import pandas as pd >>> a = pd.DataFrame(dict(column=['A', 'B', 'A', 'B'], abs=[.1, .2, .23, .11])) >>> a # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE column abs 0 A 0.10 1 B 0.20 2 A 0.23 3 B 0.11 >>> b = pd.DataFrame(dict(column=['B', 'A'], drug=['TMP', 'RIF'])) >>> b # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE column drug 0 B TMP 1 A RIF >>> shared_cols, data = join(a, b) >>> shared_cols ('column',) >>> data # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE +SKIP column abs drug 0 A 0.10 RIF 1 A 0.23 RIF 2 B 0.20 TMP 3 B 0.11 TMP """ if isinstance(right, DataFrame): return _join(left, right, how) elif isinstance(right, dict): dict_values = list(right.values()) if isinstance(dict_values[0], DataFrame): return reduce( _join_reduce(how=how), dict_values, (tuple(), left), ) else: raise NotImplementedError(f"Right table of type {type(right)} not supported.")
def _pivot_plate_df( df: DataFrame, value_name: str ) -> DataFrame: df = ( df .reset_index(names='row_id') .melt( id_vars='row_id', var_name='column_id', value_name=value_name ) .assign( well_id=lambda x: row_col_to_well(x['row_id'], x['column_id']), plate_id='', ) ) return df def _pivot_plate_excel( df: Dict[str, DataFrame], value_name: str ) -> DataFrame: dfs = ((_pivot_plate_df(sheet_data, value_name) .assign(plate_id=sheet_name)) for sheet_name, sheet_data in df.items()) return pd.concat(dfs, axis=0)
[docs] def pivot_plate( df: Union[DataFrame, Mapping[str, DataFrame]], value_name: str = 'value' ) -> DataFrame: """Pivot from a row x column plate format to a columnar format. Handy to convert a visual plate layout to a columnar format for data analysis. Parameters ---------- df : pandas.DataFrame, Dict[str, pandas.DataFrame] Either a dataframe containing rows labels as index and column labels as headings, or a dictionary of names mapping to such dataframes (as returned by `pandas.read_excel()`). value_name : str, optional The column heading to give the values within the plate. Default: "value". Returns ------- pandas.DataFrame Columnar dataframe containign data from df. Raises ------ ValueError If df is not a dataframe or a dictionary. Examples -------- >>> import pandas as pd >>> import numpy as np >>> a = pd.DataFrame(index=list("ABCDEFGH"), ... columns=range(1, 13), ... data=np.arange(1, 97).reshape(8, 12)) >>> a # doctest: +NORMALIZE_WHITESPACE 1 2 3 4 5 6 7 8 9 10 11 12 A 1 2 3 4 5 6 7 8 9 10 11 12 B 13 14 15 16 17 18 19 20 21 22 23 24 C 25 26 27 28 29 30 31 32 33 34 35 36 D 37 38 39 40 41 42 43 44 45 46 47 48 E 49 50 51 52 53 54 55 56 57 58 59 60 F 61 62 63 64 65 66 67 68 69 70 71 72 G 73 74 75 76 77 78 79 80 81 82 83 84 H 85 86 87 88 89 90 91 92 93 94 95 96 >>> pivot_plate(a, value_name="well_number") # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE row_id column_id well_number well_id plate_id 0 A 1 1 A01 1 B 1 13 B01 2 C 1 25 C01 3 D 1 37 D01 4 E 1 49 E01 .. ... ... ... ... ... 91 D 12 48 D12 92 E 12 60 E12 93 F 12 72 F12 94 G 12 84 G12 95 H 12 96 H12 <BLANKLINE> [96 rows x 5 columns] >>> pivot_plate({'sheet_1': a}, value_name="well_number") # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE row_id column_id well_number well_id plate_id 0 A 1 1 A01 sheet_1 1 B 1 13 B01 sheet_1 2 C 1 25 C01 sheet_1 3 D 1 37 D01 sheet_1 4 E 1 49 E01 sheet_1 .. ... ... ... ... ... 91 D 12 48 D12 sheet_1 92 E 12 60 E12 sheet_1 93 F 12 72 F12 sheet_1 94 G 12 84 G12 sheet_1 95 H 12 96 H12 sheet_1 <BLANKLINE> [96 rows x 5 columns] """ if isinstance(df, Mapping): return _pivot_plate_excel(df, value_name) elif isinstance(df, DataFrame): return _pivot_plate_df(df, value_name) else: raise ValueError(f"df is a {type(df)}, which is not supported")
def _replicator(x: DataFrame) -> DataFrame: idx = np.arange(x.shape[0]) np.random.shuffle(idx) return x.assign(replicate=idx + 1)
[docs] def replicate_table( data: DataFrame, group: Optional[Union[str, Iterable[str]]] = None, wide: Optional[str] = None ) -> DataFrame: """Annotate a dataframe with replicates within a group. Adds a column called "replicate" which contains integer labels randomly assigned within groups indicating repeated measurements of the same experiemntal condition. Parameters ---------- data : pandas.DataFrame Input dataframe. group : str or list Columns which indicate the grouping within which statistics should be calculated. These groups indicate repeated measurements of the same experiemntal condition. wide : str, optional If provided, returns a "wide" dataframe with replciate labels as column headings and the column name porived as values for the table. Returns ------- pd.DataFrame Dataframe with a new column "replicate" with labels randomly assigned within the group. If a column name is provided to wide, then the table as the replicate labels as columns and the values from that column as values. Raises ------ KeyError If wide is provided and not a column in the data. Examples -------- >>> import pandas as pd >>> a = pd.DataFrame(dict(group=['g1', 'g1', 'g2', 'g2'], ... control=['n', 'n', 'p', 'p'], ... m_abs_ch1=[.1, .2, .9, .8], ... abs_ch1_wavelength=['600nm'] * 4)) >>> a # doctest: +NORMALIZE_WHITESPACE group control m_abs_ch1 abs_ch1_wavelength 0 g1 n 0.1 600nm 1 g1 n 0.2 600nm 2 g2 p 0.9 600nm 3 g2 p 0.8 600nm >>> replicate_table(a, group='group') # doctest:+NORMALIZE_WHITESPACE, +SKIP group control m_abs_ch1 abs_ch1_wavelength replicate 0 g1 n 0.1 600nm 1 1 g1 n 0.2 600nm 2 2 g2 p 0.9 600nm 2 3 g2 p 0.8 600nm 1 >>> replicate_table(a, group='group', wide='m_abs_ch1') # doctest: +NORMALIZE_WHITESPACE, +SKIP replicate rep_1 rep_2 group g1 0.2 0.1 g2 0.8 0.9 """ if group is None: group = _DUMMY_GROUP data[group] = group data = ( data .groupby(group, group_keys=False) .apply(_replicator) ) if wide is not None: if wide not in data: raise KeyError(f"Wide column '{wide}' not in data.") data = pd.pivot_table( data.assign( replicate=lambda x: 'rep_' + x['replicate'].astype(str)), index=group, columns='replicate', values=wide, ) if group == _DUMMY_GROUP: data = data.drop(columns=[group]) return data