Source code for doespy.etl.steps.transformers

import inspect
import sys
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

import pandas as pd
from pydantic import BaseModel, validator

from doespy.etl.etl_util import expand_factors


class Transformer(BaseModel, ABC):

    name: Optional[str] = None

    class Config:
        extra = "forbid"

    @validator("name", pre=True, always=True)
    def set_name(cls, value):
        # the name always defaults to the class name of the transformer
        return cls.__name__

    @abstractmethod
    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:

        # NOTE: Extending classes should not read their parameters from the
        #       `options: Dict`; declare them as instance variables
        #       (pydantic fields) instead.

        pass
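
# Example (illustrative sketch, not part of the library): a hypothetical
# subclass that declares its parameters as pydantic fields, as the NOTE
# above recommends, instead of reading them from `options`:
#
#     class ScaleColumnTransformer(Transformer):
#
#         col: str
#         factor: float = 1.0
#
#         def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
#             df[self.col] = df[self.col] * self.factor
#             return df
#
#     df = pd.DataFrame({"Lat": [1, 3]})
#     df = ScaleColumnTransformer(col="Lat", factor=2.0).transform(df, {})
#     # -> df["Lat"] == [2.0, 6.0]; the `name` field defaults to
#     #    "ScaleColumnTransformer" via the validator above.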


class DfTransformer(Transformer):

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:

        # TODO [nku] here I could define the different things for the df.x -> at the moment unused
        pass

class ConditionalTransformer(Transformer):
    """The `ConditionalTransformer` replaces the value in the ``dest`` column
    with a value from the ``value`` dict, if the value in the ``col`` column
    is equal to the key.

    .. code-block:: yaml
        :caption: Example ETL Pipeline Design

        $ETL$:
            transformers:
                - name: ConditionalTransformer
                  col: Country
                  dest: Code
                  value:
                    Switzerland: CH
                    Germany: DE

    Example

    .. container:: twocol

        .. container:: leftside

            ============ ====
            Country      Code
            ============ ====
            Germany
            Switzerland
            France
            ============ ====

        .. container:: middle

            |:arrow_right:|

        .. container:: rightside

            ============ ====
            Country      Code
            ============ ====
            Germany      DE
            Switzerland  CH
            France
            ============ ====

    """

    col: str
    """Name of the condition column in the data frame."""

    dest: str
    """Name of the destination column in the data frame."""

    value: Dict[Any, Any]
    """Dictionary of replacement rules: the dict key is the entry in the
    condition ``col``, and the value is the replacement used in the ``dest``
    column."""

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        for cur, repl in self.value.items():
            df.loc[df[self.col] == cur, self.dest] = repl
        return df
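
# Usage sketch (illustrative only; mirrors the docstring example above):
#
#     df = pd.DataFrame({"Country": ["Germany", "Switzerland", "France"]})
#     t = ConditionalTransformer(
#         col="Country", dest="Code", value={"Switzerland": "CH", "Germany": "DE"}
#     )
#     df = t.transform(df, options={})
#     # -> df["Code"] == ["DE", "CH", NaN]: rows whose `Country` matches a key
#     #    in `value` receive the mapped value; all other rows are left unset.
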
class RepAggTransformer(Transformer):
    r"""The `RepAggTransformer` aggregates over the repetitions of the same
    experiment run: it groups by all columns of ``df`` except the
    ``data_columns`` and the ``rep`` column, and then applies the specified
    aggregate functions ``agg_functions`` to the ``data_columns``.

    :param ignore_columns: List of columns to ignore in the group-by
        condition (in addition to the repetition column ``rep``),
        defaults to ``[]``
    :param data_columns: The columns that contain the data to aggregate,
        see ``agg_functions``.
    :param agg_functions: List of aggregate functions to apply to the
        ``data_columns``, defaults to
        ``["mean", "min", "max", "std", "count"]``

    .. code-block:: yaml
        :caption: Example ETL Pipeline Design

        $ETL$:
            transformers:
                - name: RepAggTransformer
                  ignore_columns: [$CMD$]
                  data_columns: [Lat]
                  agg_functions: [mean]

    Example

    .. container:: twocol

        .. container:: leftside

            === ==== === ===== ===
            Run ...  Rep $CMD$ Lat
            === ==== === ===== ===
            0        0   xyz   0.1
            0        1   xyz   0.3
            1        0   xyz   0.5
            1        1   xyz   0.5
            === ==== === ===== ===

        .. container:: middle

            |:arrow_right:|

        .. container:: rightside

            === ==== ========
            Run ...  Lat_mean
            === ==== ========
            0        0.2
            1        0.5
            === ==== ========

    """

    ignore_columns: List[str] = []

    data_columns: List[str]

    agg_functions: List[str] = ["mean", "min", "max", "std", "count"]

    # TODO [nku] can we remove this transformer by unifying it with
    # GroupByAggTransformer? -> I think we could remove this here and replace
    # it with the GroupByAggTransformer by including `rep` in the group-by
    # columns.

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        if df.empty:
            return df

        data_columns = self.data_columns
        ignore_columns = self.ignore_columns
        agg_functions = self.agg_functions

        if not set(data_columns).issubset(df.columns.values):
            raise ValueError(
                f"RepAggTransformer: data_columns={data_columns} "
                f"must be in df_columns={df.columns.values}"
            )

        # ensure that all data_columns are numeric
        df[data_columns] = df[data_columns].apply(pd.to_numeric)

        # convert each column into a hashable type
        # (lists and dicts are converted to strings)
        hashable_types = {}
        for col in df.columns:
            dtype = df[col].dtype
            if dtype == "object":
                hashable_types[col] = "str"
            else:
                hashable_types[col] = dtype
        df = df.astype(hashable_types)

        # group by all columns except `rep`, the `data_columns`,
        # and the `ignore_columns`
        group_by_cols = [
            col
            for col in df.columns.values
            if col not in data_columns
            and col != "rep"
            and col not in ignore_columns
        ]
        agg_d = {data_col: agg_functions for data_col in data_columns}
        df = df.groupby(group_by_cols).agg(agg_d).reset_index()

        # flatten the multi-level columns (e.g., ("Lat", "mean") -> "Lat_mean")
        df.columns = ["_".join(v) if v[1] else v[0] for v in df.columns.values]

        return df
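
# Usage sketch (illustrative only; mirrors the docstring example above):
#
#     df = pd.DataFrame({
#         "Run": [0, 0, 1, 1],
#         "rep": [0, 1, 0, 1],
#         "$CMD$": ["xyz", "xyz", "xyz", "xyz"],
#         "Lat": [0.1, 0.3, 0.5, 0.5],
#     })
#     t = RepAggTransformer(
#         ignore_columns=["$CMD$"], data_columns=["Lat"], agg_functions=["mean"]
#     )
#     df = t.transform(df, options={})
#     # -> grouped by `Run` (everything except `rep`, `Lat`, and `$CMD$`),
#     #    yielding Lat_mean == 0.2 for Run 0 and 0.5 for Run 1.
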
class GroupByAggTransformer(Transformer):
    """The `GroupByAggTransformer` performs a group by, followed by a set of
    aggregate functions applied to the ``data_columns``.

    .. code-block:: yaml
        :caption: Example ETL Pipeline Design

        $ETL$:
            transformers:
                - name: GroupByAggTransformer
                  groupby_columns: [Run, $FACTORS$]
                  data_columns: [Lat]
                  agg_functions: [mean]

    Example

    .. container:: twocol

        .. container:: leftside

            === ==== === ===== ===
            Run ...  Rep $CMD$ Lat
            === ==== === ===== ===
            0        0   xyz   0.1
            0        1   xyz   0.3
            1        0   xyz   0.5
            1        1   xyz   0.5
            === ==== === ===== ===

        .. container:: middle

            |:arrow_right:|

        .. container:: rightside

            === ==== ========
            Run ...  Lat_mean
            === ==== ========
            0        0.2
            1        0.5
            === ==== ========

    """

    data_columns: List[str]
    """The columns that contain the data to aggregate, see ``agg_functions``."""

    groupby_columns: List[str]
    """The columns to group by. The list can contain the magic entry
    ``$FACTORS$`` that expands to all factors of the experiment,
    e.g., ``[exp_name, host_type, host_idx, $FACTORS$]`` would perform
    a group by of each run."""

    agg_functions: List[str] = ["mean", "min", "max", "std", "count"]
    """List of aggregate functions to apply to the ``data_columns``."""

    custom_tail_length: int = 5
    """``custom_tail`` is a custom aggregation function that calculates the
    mean over the last ``custom_tail_length`` entries of a column."""

    def custom_tail_build(self, custom_tail_length):
        def custom_tail(data):
            return data.tail(custom_tail_length).mean()

        return custom_tail

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        if df.empty:
            return df

        data_columns = self.data_columns
        # expand the magic `$FACTORS$` entry into the factor columns
        groupby_columns = expand_factors(df, self.groupby_columns)
        # copy so that replacing entries below does not mutate the field
        agg_functions = list(self.agg_functions)

        # size of the tail to average over for the `custom_tail` aggregation
        custom_tail_length = options.get(
            "custom_tail_length", self.custom_tail_length
        )

        # custom aggregation functions: replace the function name in
        # `agg_functions` with the corresponding callable
        custom_agg_methods_available = {
            "custom_tail": self.custom_tail_build(custom_tail_length)
        }
        for fun in agg_functions.copy():  # iterate over a copy while mutating
            for method, call in custom_agg_methods_available.items():
                if fun == method:
                    agg_functions.remove(fun)
                    agg_functions.append(call)

        if not set(data_columns).issubset(df.columns.values):
            # skip the aggregation if the data columns are missing
            return df

        if not set(groupby_columns).issubset(df.columns.values):
            raise ValueError(
                f"GroupByAggTransformer: groupby_columns={groupby_columns} "
                f"must be in df_columns={df.columns.values}"
            )

        # ensure that all data_columns are numeric
        df[data_columns] = df[data_columns].apply(pd.to_numeric)

        # convert each column into a hashable type
        # (lists and dicts are converted to strings)
        hashable_types = {}
        for col in df.columns:
            dtype = df[col].dtype
            if dtype == "object":
                hashable_types[col] = "str"
            else:
                hashable_types[col] = dtype
        df = df.astype(hashable_types)

        agg_d = {data_col: agg_functions for data_col in data_columns}
        df = df.groupby(groupby_columns, dropna=False).agg(agg_d).reset_index()

        # flatten the multi-level columns (e.g., ("Lat", "mean") -> "Lat_mean")
        df.columns = ["_".join(v) if v[1] else v[0] for v in df.columns.values]

        return df
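
# Usage sketch (illustrative only): `$FACTORS$` in `groupby_columns` is
# expanded by `expand_factors`; here a plain column name is used instead.
#
#     df = pd.DataFrame({
#         "Run": [0, 0, 1, 1],
#         "rep": [0, 1, 0, 1],
#         "Lat": [0.1, 0.3, 0.5, 0.5],
#     })
#     t = GroupByAggTransformer(
#         groupby_columns=["Run"], data_columns=["Lat"],
#         agg_functions=["mean", "custom_tail"],
#     )
#     df = t.transform(df, options={})
#     # -> columns Run, Lat_mean, Lat_custom_tail; with the default
#     #    custom_tail_length=5, `custom_tail` averages the (at most 5)
#     #    trailing entries of each group, which here equals the mean.
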
class FilterColumnTransformer(Transformer):
    """Simple transformer to filter rows out of a data frame by column values.

    Accepts key-value pairs in the `filters` option.

    This transformer is simple for now: it only accepts discrete values and
    does not do any type handling.

    Options:

    - filters: dict of filters
    - allow_empty_result: bool, whether to throw an error when the data frame
      becomes empty as a result of the filters. Defaults to False.
    """

    filters: Dict[str, Any] = {}
    allow_empty_result: bool = False

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        import warnings

        warnings.warn(
            "FilterColumnTransformer is deprecated; instead you can directly "
            "use df.query in the etl definition, "
            "i.e., transformers: [df.query: {expr: col == 'A'}]",
            DeprecationWarning,
        )
        filters: Dict[str, Any] = options.get("filters", {})
        allow_empty_result: bool = options.get("allow_empty_result", False)

        if not filters:
            return df

        if not set(filters.keys()).issubset(df.columns.values):
            raise ValueError(
                f"FilterColumnTransformer: filters={filters.keys()} "
                f"must be in df_columns={df.columns.values}"
            )

        for key, value in filters.items():
            df = df[df[key] == value]

        if df.empty and not allow_empty_result:
            raise ValueError(
                "FilterColumnTransformer: resulting data frame after filters "
                "is empty! This is probably not supposed to happen."
            )
        return df


# export all transformer subclasses defined in this module
__all__ = [
    name
    for name, cl in inspect.getmembers(sys.modules[__name__], inspect.isclass)
    if name != "Transformer" and issubclass(cl, Transformer)
]
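
# Usage sketch (illustrative only): the deprecation warning in
# `FilterColumnTransformer` points to `df.query` as the replacement, e.g.
#
#     $ETL$:
#         transformers:
#             - df.query: {expr: "Country == 'Germany'"}
#
# which corresponds to the plain pandas call:
#
#     df = df.query("Country == 'Germany'")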