Source code for utils

import pandas as pd
import altair as alt
import polars as pl
import numpy as np
import polars.selectors as cs

alt.data_transformers.enable("vegafusion")

WIDE_COLUMNS = ["start", "end", "qual"]
M6A_COLOR = "#800080"
M5C_COLOR = "#8B4513"
NUC_COLOR = "#A9A9A9"
MSP_COLOR = "#9370db"


[docs]def empty_data_dict(): return { "chrom": [], "fiber_start": [], "fiber_end": [], "fiber_name": [], "strand": [], "type": [], "start": [], "end": [], "qual": [], }
[docs]def split_to_ints(df, col, sep=",", trim=True): """Split a columns with list of ints separated by "sep" into a numpy array quickly. Args: df (dataframe): dataframe that is like a bed12 file. col (str): column name within the dataframe to split up. sep (str, optional): Defaults to ",". trim (bool, optional): Remove the first and last call from the bed12 (removes bookendings). Defaults to True. Returns: column: New column that is a list of numpy array of ints. """ if trim: return df[col].map_elements( lambda x: np.fromstring(x, sep=sep, dtype=np.int32)[1:-1], return_dtype=pl.datatypes.List(pl.datatypes.Int32), ) return df[col].map_elements( lambda x: np.fromstring(x, sep=sep, dtype=np.int32), return_dtype=pl.datatypes.List(pl.datatypes.Int32), )
[docs]def read_extract_all(f: str, pandas=False, n_rows=None, long=False): """Read a table made with fibertools-rs. Specifically `ft extract --all`. Args: f (str): File path to the table. Can be compressed. Returns: pl.DataFrame: Dataframe of the table. """ cols_with_lists = [ "nuc_starts", "nuc_lengths", "ref_nuc_starts", "ref_nuc_lengths", "msp_starts", "msp_lengths", "fire", "ref_msp_starts", "ref_msp_lengths", "m6a", "ref_m6a", "m6a_qual", "5mC", "ref_5mC", "5mC_qual", ] df = pl.read_csv( f, separator="\t", n_rows=n_rows, null_values=["."], comment_prefix=None, ) # clean up comment char df.columns = list(map(lambda x: x.strip("#"), df.columns)) if df.shape[0] > 0: for col in cols_with_lists: col_index = df.columns.index(col) df.replace_column(col_index, split_to_ints(df, col, trim=False)) if long: explode_sets = { "m6a":["m6a", "ref_m6a", "m6a_qual"], "5mC": ["5mC", "ref_5mC", "5mC_qual"], "msp": ["msp_starts", "msp_lengths", "ref_msp_starts", "ref_msp_lengths", "fire"], "nuc": ["nuc_starts", "nuc_lengths", "ref_nuc_starts", "ref_nuc_lengths"], } def my_rename(col): if col == "m6a" or col == "5mC" or col == "msp_starts" or col == "nuc_starts": return "long_start" if col == "ref_m6a" or col == "ref_5mC" or col == "ref_msp_starts" or col == "ref_nuc_starts": return "long_ref_start" if col == "m6a_qual" or col == "5mC_qual" or col == "fire": return "long_qual" if col == "msp_lengths" or col == "nuc_lengths": return "zlength" if col == "ref_msp_lengths" or col == "ref_nuc_lengths": return "ref_zlength" else: return col all_explode_cols = [s for _t, sublist in explode_sets.items() for s in sublist] dfs = [] for cur_type, s in explode_sets.items(): not_in_s = [x for x in all_explode_cols if x not in s] tdf = df.drop(not_in_s).explode(s).with_columns( pl.lit(cur_type).alias("type") ).rename(my_rename) if cur_type == "m6a" or cur_type == "5mC": tdf = tdf.with_columns( zlength=1, ref_zlength=1, ) if cur_type == "nuc": tdf = tdf.with_columns( long_qual=0, ) tdf = tdf.with_columns( long_end=pl.col("long_start") + pl.col("zlength"), long_ref_end=pl.col("long_ref_start") + pl.col("ref_zlength"), ).drop( cs.contains("zlength") ) tdf=tdf.select(sorted(tdf.columns)) dfs.append(tdf) # concat all of the dataframes df = pl.concat(dfs) if pandas: df = pd.DataFrame(df.to_dicts()) return df
[docs]def footprint_code_to_vec(footprint_code, n_modules): """ Convert a footprint code to a boolean vector of length n_modules """ rtn = [] for i in range(1, n_modules + 1): val = (footprint_code & (1 << i)) > 0 rtn.append(val) return rtn
[docs]def read_footprint_table(f, long=False): """ Read a ft-footprint bed into a pandas dataframe """ wide_cols = ["footprint_codes", "fire_quals", "fiber_names"] df = pd.read_csv(f, sep="\t") # filter out columns that do not contain spanning fibers df = df[df["n_spanning_fibers"] > 0] # split the wide columns for col in wide_cols: df[col] = df[col].str.split(",") if col != "fiber_names": df[col] = df[col].apply(lambda x: [int(i) for i in x]) # infer the number of modules form the column names n_modules = df.columns.str.contains("module:").sum() # save the names of the module columns to a list module_names = df.columns[df.columns.str.contains("module:")] df["n_modules"] = n_modules if long: df = df.explode(wide_cols) df["has_spanning_msp"] = df["footprint_codes"].apply( lambda x: (x & (1 << 0)) > 0 ) df["footprinted_modules"] = df["footprint_codes"].apply( lambda x: footprint_code_to_vec(x, n_modules) ) # drop the module columns df.drop( df.columns[df.columns.str.contains("module_")], axis=1, inplace=True, ) # split footprinted_modules into separate columns df[module_names] = pd.DataFrame( df["footprinted_modules"].tolist(), index=df.index ) df.drop("footprinted_modules", axis=1, inplace=True) # rename fiber_names to fiber_name df.rename( columns={"fiber_names": "fiber_name", "fire_quals": "fire_qual"}, inplace=True, ) # get the right int columns df = df.infer_objects() df.rename( columns={"#chrom": "chrom", "start": "motif_start", "end": "motif_end"}, inplace=True, ) return df
[docs]def read_and_center_footprint_table(f): """ Read a ft-footprint bed into a pandas dataframe and reformat the data to reflect ft-center output """ df = read_footprint_table(f, long=True) # add how many modules are present in each row module_columns = [c for c in df.columns if c.startswith("module:")] df["n_footprints"] = df[module_columns].sum(axis=1) df["first_footprint"] = df[module_columns].idxmax(axis=1) # example of reading in a footprinting table not_module_columns = [col for col in df.columns if not col.startswith("module:")] dfm = df.melt( ignore_index=False, id_vars=not_module_columns, value_name="footprinted" ).reset_index() dfm[["module", "start", "end"]] = dfm["variable"].str.split(":|-", n=2, expand=True) dfm["start"] = dfm["start"].astype(int) dfm["end"] = dfm["end"].astype(int) dfm = dfm.infer_objects() # rename the columns to match the centering output style dfm["centering_position"] = dfm["motif_start"] # swap start and end if the footprint is on the reverse strand dfm.loc[dfm.strand == "-", "centering_position"] = ( dfm.loc[dfm.strand == "-", "motif_end"] - 1 ) dfm["centering_strand"] = dfm["strand"] dfm["type"] = "not-footprinted" dfm.loc[dfm.has_spanning_msp & dfm.footprinted, "type"] = "footprinted" dfm.drop( columns=[ "module", "variable", "n_modules", "index", "first_footprint", "n_footprints", "n_spanning_fibers", "n_spanning_msps", "n_overlapping_nucs", ], inplace=True, axis=1, ) return dfm
[docs]def read_center_table(f): """ Read a ft-center bed into a pandas dataframe """ df = pd.read_csv(f, sep="\t") df = df.infer_objects() return df
def _add_fiber_to_data_dict( fiber, data_dict, ): # sub function to add fiber data to a dictionary def add_standard_columns(): data_dict["chrom"].append(fiber.chrom) data_dict["fiber_start"].append(fiber.start) data_dict["fiber_end"].append(fiber.end) data_dict["strand"].append(fiber.strand) data_dict["fiber_name"].append(fiber.qname) def base_mod_end(list_of_starts): # add one to the vaules as long as they are not None return [x + 1 if x is not None else None for x in list_of_starts] # for msp add_standard_columns() data_dict["type"].append("msp") data_dict["start"].append(fiber.msp.reference_starts) data_dict["end"].append(fiber.msp.reference_ends) data_dict["qual"].append(fiber.msp.qual) # for nuc add_standard_columns() data_dict["type"].append("nuc") data_dict["start"].append(fiber.nuc.reference_starts) data_dict["end"].append(fiber.nuc.reference_ends) data_dict["qual"].append(fiber.nuc.qual) # for m6a add_standard_columns() data_dict["type"].append("m6a") data_dict["start"].append(fiber.m6a.reference_starts) data_dict["end"].append(fiber.m6a.get_reference_ends()) data_dict["qual"].append(fiber.m6a.ml) # for 5mC add_standard_columns() data_dict["type"].append("5mC") data_dict["start"].append(fiber.cpg.reference_starts) data_dict["end"].append(fiber.cpg.get_reference_ends()) data_dict["qual"].append(fiber.cpg.ml)
[docs]def region_to_centered_df( fiberbam, region, strand="+", max_flank=None, min_basemod_qual=125 ): """ Takes a fiberbam and a region and returns a pandas dataframe with reference centered positions """ data_dict = empty_data_dict() for fiber in fiberbam.center( region[0], start=region[1], end=region[2], strand=strand ): _add_fiber_to_data_dict(fiber, data_dict) df = pd.DataFrame.from_dict(data_dict).explode(WIDE_COLUMNS) df["centering_position"] = region[1] if strand == "-": df["centering_position"] = region[2] - 1 df["centering_strand"] = strand # trim the dataframe to only include fibers that overlap if max_flank is not None: df = df[(df.start < +max_flank) & (df.end > -max_flank)] is_basemod = df.type.isin(["m6a", "5mC"]) df = df[~(is_basemod & (df.qual < min_basemod_qual))] return df
[docs]def region_to_df(fiberbam, region, min_basemod_qual=125): """ Takes a fiberbam and a region and returns a pandas dataframe with fibers that overlap the region """ data_dict = empty_data_dict() for fiber in fiberbam.fetch( region[0], start=region[1], end=region[2], ): _add_fiber_to_data_dict(fiber, data_dict) df = pd.DataFrame.from_dict(data_dict).explode(WIDE_COLUMNS) is_basemod = df.type.isin(["m6a", "5mC"]) df = df[~(is_basemod & (df.qual < min_basemod_qual))] return df