# doespy.etl.steps.transformers
from abc import ABC, abstractmethod
from typing import Any, Dict, List
import inspect
import sys
import warnings

import pandas as pd
from pydantic import BaseModel, ConfigDict, field_validator

from doespy.etl.etl_util import expand_factors
class Transformer(BaseModel, ABC):

    name: str

    model_config = ConfigDict(extra="forbid")

    @field_validator("name", mode="before")
    @classmethod
    def set_name(cls, value):
        # the transformer name is always the class name
        return cls.__name__

    @abstractmethod
    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        # NOTE: Extending classes should not use the `options: Dict` and
        #       should instead use instance variables for parameters
        pass
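
# A minimal sketch of a concrete transformer (hypothetical, not part of the
# module): subclasses declare parameters as pydantic fields and implement
# `transform`. It is kept in a comment because any Transformer subclass
# defined in this module is auto-exported via `__all__` (see end of file).
#
#   class ScaleTransformer(Transformer):
#       col: str
#       factor: float = 1.0
#
#       def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
#           df[self.col] = df[self.col] * self.factor
#           return df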
class DfTransformer(Transformer):

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        # TODO [nku] here I could define the different things for the df.x
        #      -> at the moment unused
        pass
class ConditionalTransformer(Transformer):
"""
The `ConditionalTransformer` replaces the value in the ``dest`` column with a
value from the ``value`` dict, if the value in the ``col`` column is equal to the key.
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
transformers:
- name: ConditionalTransformer:
col: Country
dest: Code
value:
Switzerland: CH
Germany: DE
Example
.. container:: twocol
.. container:: leftside
============ ====
Country Code
============ ====
Germany
Switzerland
France
============ ====
.. container:: middle
|:arrow_right:|
.. container:: rightside
============ ====
Country Code
============ ====
Germany DE
Switzerland CH
France
============ ====
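
    A minimal usage sketch (the data frame and the empty ``options`` dict are
    assumptions for illustration; countries without a matching rule keep an
    empty ``Code``):

    .. code-block:: python

        import pandas as pd

        df = pd.DataFrame({"Country": ["Germany", "Switzerland", "France"]})
        t = ConditionalTransformer(
            name="ConditionalTransformer",
            col="Country",
            dest="Code",
            value={"Switzerland": "CH", "Germany": "DE"},
        )
        df = t.transform(df, options={})  # adds Code column: DE, CH, NaN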
"""
    col: str
    """Name of the condition column in the data frame."""

    dest: str
    """Name of the destination column in the data frame."""

    value: Dict[Any, Any]
    """Dictionary of replacement rules:
    the key is the entry in the condition ``col`` and
    the value is the replacement used in the ``dest`` column."""
    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        for cur, repl in self.value.items():
            df.loc[df[self.col] == cur, self.dest] = repl
        return df
class RepAggTransformer(Transformer):
r"""The `RepAggTransformer` aggregates over the repetitions of the same experiment run.
GroupBy all columns of df except ``data_columns`` and ``rep`` column.
Afterward, apply specified aggregate functions ``agg_functions`` on the ``data_columns``.
:param ignore_columns: List of columns to ignore within group_by condition (apart from repetition column ``rep``), defaults to ``[]``
:param data_columns: The columns that contain the data to aggregate, see ``agg_function``.
:param agg_functions: List of aggregate function to apply on ``data_columns``, defaults to ``["mean", "min", "max", "std", "count"]``
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
transformers:
- name: RepAggTransformer:
ignore_columns: [$CMD$]
data_columns: [Lat]
agg_functions: [mean]
Example
.. container:: twocol
.. container:: leftside
=== ==== === ===== ===
Run ... Rep $CMD$ Lat
=== ==== === ===== ===
0 0 xyz 0.1
0 1 xyz 0.3
1 0 xyz 0.5
1 1 xyz 0.5
=== ==== === ===== ===
.. container:: middle
|:arrow_right:|
.. container:: rightside
=== ==== ========
Run ... Lat_mean
=== ==== ========
0 0.2
1 0.5
=== ==== ========
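
    A minimal usage sketch (the data frame is an assumption mirroring the
    example above):

    .. code-block:: python

        import pandas as pd

        df = pd.DataFrame({
            "Run": [0, 0, 1, 1],
            "rep": [0, 1, 0, 1],
            "$CMD$": ["xyz", "xyz", "xyz", "xyz"],
            "Lat": [0.1, 0.3, 0.5, 0.5],
        })
        t = RepAggTransformer(
            name="RepAggTransformer",
            ignore_columns=["$CMD$"],
            data_columns=["Lat"],
            agg_functions=["mean"],
        )
        df = t.transform(df, options={})  # -> columns: Run, Lat_mean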
"""
    ignore_columns: List[str] = []
    data_columns: List[str]
    agg_functions: List[str] = ["mean", "min", "max", "std", "count"]

    # TODO [nku] can we remove this transformer by unifying it with
    # GroupByAggTransformer? -> I think we could remove this here and replace
    # it with the GroupByAggTransformer, including `rep` in the group-by cols
    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        if df.empty:
            return df

        data_columns = self.data_columns
        ignore_columns = self.ignore_columns
        agg_functions = self.agg_functions

        if not set(data_columns).issubset(df.columns.values):
            raise ValueError(
                f"RepAggTransformer: data_columns={data_columns} "
                f"must be in df_columns={df.columns.values}"
            )

        # ensure that all data_columns are numbers
        df[data_columns] = df[data_columns].apply(pd.to_numeric)

        # we need to convert each column into a hashable type
        # (list and dict are converted to string)
        hashable_types = {}
        for col in df.columns:
            dtype = df[col].dtype
            if dtype == "object":
                hashable_types[col] = "str"
            else:
                hashable_types[col] = dtype
        df = df.astype(hashable_types)

        # group by all columns except `rep`, the `data_columns`,
        # and the `ignore_columns`
        group_by_cols = [
            col
            for col in df.columns.values
            if col not in data_columns and col != "rep" and col not in ignore_columns
        ]
        agg_d = {data_col: agg_functions for data_col in data_columns}
        df = df.groupby(group_by_cols).agg(agg_d).reset_index()

        # flatten the multi-level columns, e.g., ("Lat", "mean") -> "Lat_mean"
        df.columns = ["_".join(v) if v[1] else v[0] for v in df.columns.values]
        return df
class GroupByAggTransformer(Transformer):
"""
The `GroupByAggTransformer` performs a group by followed
by a set of aggregate functions applied to the ``data_columns``.
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
transformers:
- name: GroupByAggTransformer:
groupby_columns: [Run, $FACTORS$]
data_columns: [Lat]
agg_functions: [mean]
Example
.. container:: twocol
.. container:: leftside
=== ==== === ===== ===
Run ... Rep $CMD$ Lat
=== ==== === ===== ===
0 0 xyz 0.1
0 1 xyz 0.3
1 0 xyz 0.5
1 1 xyz 0.5
=== ==== === ===== ===
.. container:: middle
|:arrow_right:|
.. container:: rightside
=== ==== ========
Run ... Lat_mean
=== ==== ========
0 0.2
1 0.5
=== ==== ========
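
    A minimal usage sketch (the data frame is an assumption; plain column
    names are assumed to pass through ``expand_factors`` unchanged, so no
    ``$FACTORS$`` entry is used here):

    .. code-block:: python

        import pandas as pd

        df = pd.DataFrame({
            "Run": [0, 0, 1, 1],
            "rep": [0, 1, 0, 1],
            "Lat": [0.1, 0.3, 0.5, 0.5],
        })
        t = GroupByAggTransformer(
            name="GroupByAggTransformer",
            groupby_columns=["Run"],
            data_columns=["Lat"],
            agg_functions=["mean"],
        )
        df = t.transform(df, options={})  # -> columns: Run, Lat_mean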
"""
    data_columns: List[str]
    """The columns that contain the data to aggregate, see ``agg_functions``."""

    groupby_columns: List[str]
    """The columns on which to perform the group by.
    The list can contain the magic entry ``$FACTORS$`` that expands to all factors
    of the experiment, e.g., ``[exp_name, host_type, host_idx, $FACTORS$]``
    performs a group by of each run.
    """

    agg_functions: List[str] = ["mean", "min", "max", "std", "count"]
    """List of aggregate functions to apply on ``data_columns``."""

    custom_tail_length: int = 5
    """``custom_tail`` is a custom aggregation function that computes the mean
    over the last ``custom_tail_length`` entries of a column."""
    def custom_tail_build(self, custom_tail_length):
        # build a closure so the aggregation function captures the tail length
        def custom_tail(data):
            return data.tail(custom_tail_length).mean()

        return custom_tail
    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        if df.empty:
            return df

        data_columns = self.data_columns
        # expand the magic `$FACTORS$` entry into the factor columns
        groupby_columns = expand_factors(df, self.groupby_columns)

        # size of the 'tail' to calculate the mean over
        custom_tail_length = options.get("custom_tail_length", self.custom_tail_length)

        # replace names of custom aggregate functions with their callables
        # (without mutating `self.agg_functions`)
        custom_agg_methods_available = {
            "custom_tail": self.custom_tail_build(custom_tail_length)
        }
        agg_functions = [
            custom_agg_methods_available.get(fun, fun) for fun in self.agg_functions
        ]

        # if the data columns are not present, pass the df through unchanged
        if not set(data_columns).issubset(df.columns.values):
            return df
        if not set(groupby_columns).issubset(df.columns.values):
            raise ValueError(
                f"GroupByAggTransformer: groupby_columns={groupby_columns} "
                f"must be in df_columns={df.columns.values}"
            )

        # ensure that all data_columns are numbers
        df[data_columns] = df[data_columns].apply(pd.to_numeric)

        # we need to convert each column into a hashable type
        # (list and dict are converted to string)
        hashable_types = {}
        for col in df.columns:
            dtype = df[col].dtype
            if dtype == "object":
                hashable_types[col] = "str"
            else:
                hashable_types[col] = dtype
        df = df.astype(hashable_types)

        agg_d = {data_col: agg_functions for data_col in data_columns}
        df = df.groupby(groupby_columns, dropna=False).agg(agg_d).reset_index()

        # flatten the multi-level columns, e.g., ("Lat", "mean") -> "Lat_mean"
        df.columns = ["_".join(v) if v[1] else v[0] for v in df.columns.values]
        return df
class FilterColumnTransformer(Transformer):
"""
Simple transformer to filter rows out of a dataframe by column values.
Accepts key-value pairs in `filters` option.
This transformer is simple for now, only accepts discrete values and
does not do any type handling.
Options:
- filters: dict of filters
- allow_empty_result: bool whether to throw an error when the dataframe
becomes empty as result of the filter.
Defaults to False.
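
    A minimal usage sketch (deprecated; the data frame is an assumption, and
    the same filter can be expressed with ``df.query`` in the ETL design):

    .. code-block:: python

        import pandas as pd

        df = pd.DataFrame({"Country": ["CH", "DE", "DE"], "Lat": [1, 2, 3]})
        t = FilterColumnTransformer(
            name="FilterColumnTransformer",
            filters={"Country": "DE"},
        )
        df = t.transform(df, options={})  # keeps only rows with Country == DE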
"""
    filters: Dict[str, Any] = {}
    allow_empty_result: bool = False

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        warnings.warn(
            "FilterColumnTransformer is deprecated, instead you can directly use "
            "df.query(\"col == 'A'\") in the etl definition, "
            "i.e., transformers: [df.query: {expr: col == 'A'}]",
            DeprecationWarning,
        )
        filters: Dict[str, Any] = options.get("filters", self.filters)
        allow_empty_result: bool = options.get(
            "allow_empty_result", self.allow_empty_result
        )

        if len(filters.keys()) == 0:
            return df

        if not set(filters.keys()).issubset(df.columns.values):
            raise ValueError(
                f"FilterColumnTransformer: filters={filters.keys()} "
                f"must be in df_columns={df.columns.values}"
            )
        for key, value in filters.items():
            df = df[df[key] == value]
        if df.empty and not allow_empty_result:
            raise ValueError(
                "FilterColumnTransformer: resulting dataframe after filters is empty! "
                "This is probably not supposed to happen."
            )
        return df
__all__ = [
    name
    for name, cl in inspect.getmembers(sys.modules[__name__], inspect.isclass)
    if name != "Transformer" and issubclass(cl, Transformer)
]