Source code for htstools.io

"""Utilities for reading files from platereaders and writing columnar data."""

from typing import Dict, IO, Iterable, List, Optional, TextIO, Tuple, Union
from collections import defaultdict, namedtuple
import csv
from datetime import datetime
from functools import partial
from io import StringIO, TextIOBase, TextIOWrapper
from itertools import dropwhile
import os
from string import ascii_uppercase

from carabiner import cast, print_err
from carabiner.pd import sniff
from openpyxl import load_workbook
import pandas as pd
import numpy as np

from .utils import row_col_to_well

_FORMAT = ('\t', ',', 'xlsx')

BiotekData = namedtuple(
    'BiotekData',
    ('Meta', 'Procedure_Details', 'Layout', 'Results'),
    defaults=[None for _ in range(4)],
)

def _get_sheet_values(sheet) -> List[str]:
    s = []
    for row in sheet: 
        values = [r.value or '' for r in row]
        values = [r.isoformat() if isinstance(r, datetime) 
                  else r 
                  for r in values]
        s.append(values)
    return s


def _read_xlsx(file: TextIOWrapper) -> Dict[str, List[str]]:
    basename = os.path.basename(file.name)
    return {
        f'xlsx:{basename}:{sheet.title}': _get_sheet_values(sheet) 
        for sheet in load_workbook(file.name, data_only=True, read_only=True)
    }


def _read_delim(
    file: TextIOWrapper, 
    delimiter: str = None
) -> Dict[str, Iterable[Iterable]]:
    basename = os.path.basename(file.name)
    delimiter = sniff(file, default=delimiter)
    if delimiter is not None:
        delimiter = delimiter.in_delim
    return {f'delim:{basename}:': csv.reader(file, delimiter=delimiter)}


def _biotek_extract(x: Iterable) -> BiotekData:

    data = BiotekData(**{
        section: defaultdict(list) for section in BiotekData._fields
    })

    section, subsection = 'Meta', ''
    active_xlsx_hacks = False

    for row in x: 
        # skip empty rows
        try:
            row0 = row[0]  
        except IndexError:
            continue
        else:
            row0esc = row0.replace(' ', '_')  # to escape spaces
        
        all_none = all(len(str(item)) == 0 for item in row)
        n_skipped = 0  # count number of skipped rows since section

        # First column has section (sub)headings. If there is 
        # section heading in the first column, make it the 
        # current heading.
        if row0esc in data._fields:
            section, subsection = row0esc, 'main'
            n_skipped = 0
        
        # Otherwise, if the first column of the row has data, 
        # set this as the subsection and append the rest of
        # the row as data
        elif len(row0) > 0:

            subsection = row0.rstrip(':')
            
            if subsection not in getattr(data, section):

                subsection_n = 1 

            else:
                # handles repeated subsection names without overwriting
                subsection_n += 1  
                subsection = f'{subsection.split("_")[0]}_{subsection_n}'
                
            getattr(data, section)[subsection].append(row[1:])

        # Otherwise, append column 2 onwards as data
        elif not all_none:

            # hacky way to deal with XLSX export putting Actual Temperature 
            # in results section
            act_temp_subsection = subsection.startswith('Actual Temperature') 
            
            if section == 'Results':

                if act_temp_subsection:
                    active_xlsx_hacks = True
                
                if active_xlsx_hacks:
                    if (act_temp_subsection and 
                        (len(row[1]) == 0 or row[1] == 'Well ID')):
                        subsection = 'main'
                        row = [''] + list(dropwhile(lambda x: x == '', row))
                    elif (not act_temp_subsection
                        and row0 == '' 
                        and row[1] != '' and row[1] in ascii_uppercase):
                        subsection = row[1]
                        # row = row[1:]
                        
                    if subsection in ascii_uppercase:
                        if row[1] != '':
                            row = row[1:]
                        else:
                            row = row[1:]

                # print(active_xlsx_hacks, section, subsection, row)

            getattr(data, section)[subsection].append(row[1:])

        elif all_none:
            n_skipped += 1

    # more hacks to deal with XLSX export putting Actual Temperature 
    # in results section
    if active_xlsx_hacks:
        act_temps_in_results = tuple(key for key in data.Results 
                                    if key.startswith('Actual Temperature'))
        for act_temp in act_temps_in_results:
            data.Procedure_Details[act_temp] = data.Results[act_temp][:1]
            del data.Results[act_temp]

    return data


def _biotek_common(
    df: pd.DataFrame,
    data: BiotekData,
    filename: str
) -> pd.DataFrame:
    
    df = df.assign(
        plate_id=data.Meta['Plate Number'][0][0],
        data_filename=filename.split(':')[1],
        data_sheet=filename.split(':')[2],
    )

    for heading in ('Meta', 'Procedure_Details'):
        for subsection, values in getattr(data, heading).items():
            h = 'meta_' + subsection.casefold().strip().replace(' ', '_')
            df[h] = ';'.join(map(lambda x: str(x[0]).strip(), values))

    return df

     
def _parse_read_name(
    read_name: str, 
    abs_count: int = 0, 
    fluor_count: int = 0
) -> Tuple[str, str, int, int]:
    
    wavelengths = read_name.split(':')[-1].split(',')

    if len(wavelengths) == 1:
        abs_count += 1
        wv = f'{wavelengths[0]}nm'
        read_type = f'abs_ch{abs_count}'
    elif len(wavelengths) == 2:
        fluor_count += 1
        wv = f'ex:{wavelengths[0]}nm;em:{wavelengths[1]}nm'
        read_type = f'fluor_ch{fluor_count}'

    return read_type, wv, abs_count, fluor_count


def _biotek_plate(
    data: BiotekData,
    filename: str,
    measurement_prefix: str = 'measured_',
    overflow_value: str = "OVRFLW"
) -> Tuple[pd.DataFrame, Dict[str, set]]:
    
    df, read_types = defaultdict(list), defaultdict(set)

    row_ids = [subsection for subsection in data.Results if subsection != 'main']
    columns = [col for col in data.Results['main'][0] if col != '']
    n_cols = len(columns)

    for subsection, values in data.Results.items():
        if subsection in row_ids:
            these_rows = ([subsection] * n_cols)

            df['row_id'] += these_rows
            df['column_id'] += columns
            df['well_id'] += row_col_to_well(these_rows, columns).tolist()

            abs_count, fluor_count = 0, 0
            for value in values:
                (read_type, wv, abs_count, fluor_count) = _parse_read_name(
                    value[-1],  # very last col gives wavelengths
                    abs_count,
                    fluor_count,
                )
                read_types[read_type].add(wv)
                df[measurement_prefix + read_type] += value[:-1]
                df[read_type + '_wavelength'] += ([wv] * n_cols)

    col_lengths = {col: len(val) for col, val in df.items()}
    if not len(set(col_lengths.values())) == 1:
        raise AttributeError(
            f"ERROR: Not all columns are the same length: {col_lengths}"
        )
    
    df = pd.DataFrame(df)
    for col in df:
        if col.startswith(measurement_prefix):
            # replace Biotek "OVRFLW" with NaN
            df[col] = np.where(df[col] == overflow_value, np.nan, df[col]).astype(float)
    df = _biotek_common(df, data, filename)

    return df, read_types


def _biotek_row(
    data: BiotekData,
    filename: str,
    measurement_prefix: str = 'measured_'
) -> Tuple[pd.DataFrame, Dict[str, set]]:
    
    read_types = defaultdict(set)

    # Row-wise XLSX data has a subheading Actual Temperature before 
    # the actual data table, hence this awkward naming
    # data_str = '\n'.join(','.join(str(item) for item in row) 
    #                      for row in data.Results['Actual Temperature'][1:])
    # print(data.Results)
    data_str = '\n'.join(
        ','.join(str(item) for item in row) for row in data.Results['main']
    )
    # print(data_str)
    df = (
        pd.read_csv(StringIO(data_str))
        .rename(columns={
        'Well': 'well_id',
        'Well ID': 'well_name'
        })
        .assign(
            row_id=lambda x: x['well_id'].str.slice(stop=1),
            column_id=lambda x: x['well_id'].str.slice(start=1).astype(int)
        )
    )

    abs_count, fluor_count = 0, 0
    cols_to_drop = [col for col in df if col.startswith('Unnamed')]
    
    for header in df:

        if not header.startswith('Unnamed') and ':' in header:  # this is a data column

            (read_type, wv, 
             abs_count, fluor_count) = _parse_read_name(
                header,
                abs_count,
                fluor_count,
            )

            read_types[read_type].add(wv)
            df[measurement_prefix + read_type] = df[header].copy()
            df[read_type + '_wavelength'] = wv
            cols_to_drop.append(header)

    df = df.drop(columns=cols_to_drop)
    df = _biotek_common(df, data, filename)

    return df, read_types


def _from_platereader(
    file: Union[IO, str],
    shape: str,
    vendor: str,
    delimiter: Optional[str] = None,
    measurement_prefix: str = 'measured_'
) -> dict:

    delimiter = sniff(file, default=delimiter)
    filename = cast(file, to=str)
    file = cast(file, to=TextIOWrapper)
    if delimiter.in_delim == 'xlsx':
        data_handle = _read_xlsx(file)
    elif delimiter.in_delim in _FORMAT:
        data_handle = _read_delim(file, delimiter=delimiter)
    else:
        raise NotImplementedError(f'File format "{format}" not supported.')
    
    if vendor == 'Biotek':
        extracted_data = {
            n: _biotek_extract(d) for n, d in data_handle.items()
        }
        if shape == 'row':
            return {
                n: _biotek_row(
                    d, 
                    filename=n, 
                    measurement_prefix=measurement_prefix,
                ) for n, d in extracted_data.items()
            }
        elif shape == 'plate':
            return {
                n: _biotek_plate(
                    d, 
                    filename=n, 
                    measurement_prefix=measurement_prefix,
                ) for n, d in extracted_data.items()
            }
        
    raise NotImplementedError(
        f'Cannot read platereader file type: {filename=}\n\t{vendor=}, {shape=}'
    )


[docs] def from_platereader(file: Union[IO, str, Iterable[Union[IO, str]]], shape: str, vendor: str, delimiter: Optional[str] = None, measurement_prefix: str = 'measured_') -> pd.DataFrame: """Convert raw platereader files to columnar format. Initially only supports Biotek platereaders. Loads files exported from platereader software according to the parameters, and does the necessary parsing to extract metadata and measured values into a Pandas DataFrame, with one row per well and each column corresponding to a variable such as a measurement or metadata. Measurement columns can be identified by the `measurement_prefix` value, and wavelengths are annotated in columns starting with "fluor" or "abs" and ending with "_wavelength". Parameters ---------- file : str, file-like, or list File to parse. Must be CSV, TSV, or XLSX format. shape : str "plate" or "row", idicating whether the data are in a plate format or row-wise table format. vendor : str Platereader manufacturer. Currently only "Biotek" is implemented. delimiter : str Override inference of file format with this delimiter. Must be either ",", "\\t", or "xlsx" (to enforce XLSX parsing). measurement_prefix : str The prefix to add to columns containing raw measured variables. Returns ------- pandas.DataFrame Parsed data. Raises ------ NotImplementedError Where the indicated platereader export format is not yet supported. """ file = cast(file, to=list) parser = partial( _from_platereader, shape=shape, vendor=vendor, delimiter=delimiter, measurement_prefix=measurement_prefix, ) parsed_files = tuple(parser(f) for f in file) return pd.concat(( df for parsed_file in parsed_files for _, (df, _) in parsed_file.items() ), axis=0)