Tutorial¶
We assume that you already have a project repository in which you want to integrate the doe-suite. Otherwise, create a directory and initialize a new repository:
mkdir my-project
cd my-project
git init
Afterward, follow the installation instructions to integrate the doe-suite into your project as a submodule.
At the end of the installation section, you can run the suite designs from the demo_project.
To use the suite in your own project, change the environment variable DOES_PROJECT_DIR from pointing to the demo_project to your project.
In summary, the following environment variables should now be set:
# path to your project
export DOES_PROJECT_DIR=<PATH-TO-YOUR-PROJECT>
# your unique shortname, e.g., nku
export DOES_PROJECT_ID_SUFFIX=<SUFFIX>
# choose default cloud: aws, euler
export DOES_CLOUD=<DEFAULT-CLOUD>
# for AWS
export DOES_SSH_KEY_NAME=<YOUR-PRIVATE-SSH-KEY-FOR-AWS>
export DOES_AWS_USER=<SSH-USERNAME>
# for Euler
export DOES_EULER_USER=<YOUR-NETHZ>
# for Docker
export DOES_DOCKER_USER=<SSH-USERNAME> # [optional] defaults to ubuntu
export DOES_DOCKER_SSH_PUBLIC_KEY=<SSH-PUBLIC-KEY> # e.g., ~/.ssh/id_rsa.pub
export DOCKER_HOST=<DOCKER-HOST> # [optional] defaults to unix://var/run/docker.sock
Tip
Direnv allows you to manage project-specific environment variables in an .envrc file.
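For example, a minimal .envrc at the project root could look as follows (a sketch only: the values are placeholders for your setup, and you need to run direnv allow after editing):
# .envrc (illustrative placeholders)
export DOES_PROJECT_DIR=$(pwd)
export DOES_PROJECT_ID_SUFFIX=nku
export DOES_CLOUD=aws
export DOES_SSH_KEY_NAME=my-aws-key
export DOES_AWS_USER=ubuntu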
Move to the doe-suite repository and create your project-specific doe-suite-config with the help of a Cookiecutter project template.
To start, accepting the default values should be fine, except for the repository: use your project's repository instead of the doe-suite repository.
cd doe-suite
make new
Note that make new first checks whether there is already a doe-suite-config in the DOES_PROJECT_DIR.
Project Layout¶
Your project repository should have a folder doe-suite-config which contains a skeleton of the whole configuration.
Overall, the DoE-Suite adds three top-level folders to a project repository: doe-suite, doe-suite-config, and doe-suite-results.
The doe-suite folder is the doe-suite repo as a submodule.
The doe-suite-config folder contains the whole configuration of how to run experiments, plus project-specific extensions of the suite.
The doe-suite-results folder stores all the result files.
The complete folder structure for a project looks as follows:
<your-project-repository>
├── doe-suite                  # the doe-suite repo as a submodule
├── doe-suite-config
│   ├── designs                # experiment suite designs
│   │   ├── design_vars        # shared variables
│   │   │   └── <vars>.yml
│   │   ├── etl_templates      # shared results processing
│   │   │   └── <etl>.yml
│   │   ├── <suite1>.yml
│   │   └── <suite2>.yml
│   ├── does_etl_custom        # custom steps for processing results
│   │   └── <steps>.py
│   ├── group_vars             # host type config (e.g., instance type)
│   │   ├── all                # shared config
│   │   │   └── main.yml
│   │   ├── <host-type1>
│   │   │   └── main.yml
│   │   └── <host-type2>
│   │       └── main.yml
│   ├── inventory              # manual inventories for ansible (custom clouds)
│   │   ├── euler.yml          # the euler cloud is implemented as an inventory
│   │   ├── <cloud-inventory1>.yml
│   │   └── <cloud-inventory2>.yml
│   ├── roles                  # host type setup roles
│   │   ├── <setup-1>
│   │   │   └── tasks
│   │   │       └── main.yml
│   │   └── <setup-2>
│   │       └── tasks
│   │           ├── aws.yml    # cloud-specific roles
│   │           └── euler.yml
│   ├── super_etl              # multi-suite results processing
│   │   └── <analysis>.yml
│   └── pyproject.toml
├── doe-suite-results          # all experiment results
│   └── ...
└── ...                        # your project files
Suite Design¶
Todo
TODO: Add description of designs and maybe include make design
A suite design consists of one or more experiments. Each experiment defines the computational environment (i.e., how many machines of which type) and a list of run configurations (i.e., concrete parameters) that we want to execute. Within the run configurations, we distinguish between constants and factors. Constants remain the same across all runs, while for factors, each run uses a unique combination of their levels. To improve validity, we support repeating a run multiple times.
Different experiments in the suite are executed in parallel on different sets of host instances, while run configurations within an experiment are executed sequentially on the same set of host instances.
For each experiment, one instance is the controller, which is logically responsible for coordination.
The experiment suite runs experiments based on YAML files in doe-suite-config/designs. The YAML files represent the structure discussed above.
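As an illustration, a minimal suite design could look roughly as follows. This is only a sketch: the exact key names (e.g., host_types, base_experiment, $FACTOR$) follow the conventions of the demo_project designs and should be checked there.
# <suite1>.yml -- illustrative sketch, not a complete reference
example:                      # name of the experiment
  n_repetitions: 2            # repeat every run twice
  host_types:
    small:                    # host type from doe-suite-config/group_vars/small
      n: 1
      $CMD$: "echo run"       # command executed per run (typically templated with run parameters)
  base_experiment:
    seed: 1234                # constant: identical in every run
    payload_mb:
      $FACTOR$: [8, 16, 32]   # factor: one run per level

$ETL$:
  example_pipeline:
    experiments: [example]
    extractors:
      YamlExtractor: {}
      ErrorExtractor: {}
      IgnoreExtractor: {}
    transformers: []
    loaders:
      CsvSummaryLoader: {}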
(Add) Host Type¶
The host type config follows the structure of Ansible group_vars.
The variables defined in all/main.yml apply to all hosts, while the variables under <host-type1>/main.yml correspond specifically to the host type named <host-type1>.
The name of the folder, e.g., the placeholder <host-type1>, is then referenced by a suite design.
<your-project-repository>
└── doe-suite-config
    ├── ...
    └── group_vars
        ├── all
        │   └── main.yml
        ├── <host-type1>
        │   └── main.yml
        └── <host-type2>
            └── main.yml
To add a new host type, create a corresponding folder under doe-suite-config/group_vars/ and ensure that the required variables are set.
For example, the host type small of the demo_project:
---
# AWS EC2
instance_type: t2.medium
ec2_volume_size: 16
ec2_image_id: ami-08481eff064f39a84
ec2_volume_snapshot: snap-0b8d7894c93b6df7a
# ETH Euler
euler_job_minutes: 10
euler_cpu_cores: 1
euler_cpu_mem_per_core_mb: 3072
euler_gpu_number: 0
euler_gpu_min_mem_per_gpu_mb: 0
euler_gpu_model: ~
euler_env: "gcc/8.2.0 python/3.9.9"
euler_scratch_dir: "/cluster/scratch/{{ euler_user }}"
# Docker
docker_image_id: "doe-ubuntu20"
docker_image_tag: "latest"
(Add) Setup Role¶
The host type setup roles are regular Ansible roles and can be used to perform all sorts of tasks required before running experiments, for example: installing packages, downloading data, and building the system artefact.
The name of the role, e.g., the placeholder <setup-1>, is then referenced by a suite design.
The name of the file under tasks/ determines whether the tasks are the same on all clouds, i.e., main.yml, or whether there are cloud-specific tasks, e.g., aws.yml or euler.yml.
<your-project-repository>
├── ...
└── doe-suite-config
    ├── ...
    └── roles
        ├── <setup-1>
        │   └── tasks
        │       └── main.yml
        └── <setup-2>
            └── tasks
                ├── aws.yml
                └── euler.yml
(Add) ETL Step¶
The doe-suite-config folder is also a Poetry project with a package does_etl_custom that allows adding project-specific results processing.
<your-project-repository>
├── ...
└── doe-suite-config
    ├── ...
    ├── does_etl_custom
    │   └── <steps>.py
    └── pyproject.toml
The DoE-Suite already provides a few steps that implement common features.
However, the majority of projects will need custom processing, e.g., for building a plot.
Below, we discuss how to add a custom Extractor, Transformer, and Loader.
The pyproject.toml file allows installing additional Python packages for your custom ETL steps.
For example, to add the Seaborn visualization library, navigate to the doe-suite-config directory and use Poetry to add the dependency:
cd doe-suite-config
poetry add seaborn
Extractor¶
The Extractor steps are responsible for bringing all generated results into a table form (data frame), together with the configuration.
Information about the suite and the run config is extracted automatically.
For extracting the results themselves, we configure a set of Extractors.
Todo
maybe remove the details on the extractor here
During the extract phase, we loop over all produced result files and assign them to exactly one extractor through a regex on the filename. The mapping must be 1:1, otherwise the phase aborts with an error.
The reason behind this is that an experiment job should only generate expected files.
Each extractor has a default regex on the filename. For example, the YamlExtractor matches all files ending in .yml and .yaml. However, it is possible to overwrite this default list in the ETL config of the suite design.
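For example, the $ETL$ section of a suite design can replace the default list with an explicit one (mirroring the example pipeline designs shown in the extractor docstrings below):
$ETL$:
  extractors:
    YamlExtractor:            # with custom file_regex
      file_regex: [out.yml]
    ErrorExtractor: {}
    IgnoreExtractor: {}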
To add a custom Extractor, extend the Extractor base class in the does_etl_custom package and implement the required methods.
from typing import Dict, List

from doespy.etl.steps.extractors import Extractor

class MyExtractor(Extractor):

    def default_file_regex() -> List[str]:
        # your implementation
        ...

    def extract(self, path: str, options: Dict) -> List[Dict]:
        # your implementation
        ...
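As a concrete example, a (hypothetical) extractor that parses result files consisting of key=value lines could look like this:
from typing import Dict, List, Union

from doespy.etl.steps.extractors import Extractor

class KeyValueExtractor(Extractor):
    """Hypothetical example: parses lines of the form key=value into a single result dict."""

    file_regex: Union[str, List[str]] = [r".*\.kv$"]   # match result files ending in .kv

    def extract(self, path: str, options: Dict) -> List[Dict]:
        result = {}
        with open(path, "r") as f:
            for line in f:
                line = line.strip()
                if not line or "=" not in line:
                    continue   # skip blank or malformed lines
                key, value = line.split("=", 1)
                result[key] = value
        return [result]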
Todo
add link to etl section
More details can be found in the ETL section or by consulting the already provided Extractor implementations.
Source of the provided Extractor steps:
from abc import ABC, abstractmethod, abstractproperty
from typing import List, Dict, Union
from typing import ClassVar
import warnings
import ruamel.yaml
import json
import csv
from dataclasses import dataclass, field
import sys
import inspect
from pydantic import BaseModel, validator
class Extractor(BaseModel, ABC):
file_regex: Union[str, List[str]] = None
class Config:
extra = "forbid"
@classmethod
def default_file_regex(cls):
pass
@validator("file_regex", pre=True, always=True)
def set_default_regex(cls, value):
if value is None:
value = cls.default_file_regex()
if not isinstance(value, list):
value = [value]
return value
@abstractmethod
def extract(self, path: str, options: Dict) -> List[Dict]:
"""Reads the file defined by `path`, and converts it into a list of dicts.
For example, the CsvExtractor reads a csv and returns a dict for each row.
Args:
path (str): absolute path of file to extract
options (Dict): extractor options as provided in the ETL definition
Returns:
List[Dict]: results found in the file
"""
# NOTE: Extending classes should not use the `options: Dict` and instead use instance variables for parameters
pass
class YamlExtractor(Extractor):
"""
The `YamlExtractor` reads result files as YAML.
The YAML file can contain either a single object (result)
or a list of objects (results).
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
extractors:
YamlExtractor: {} # with default file_regex
YamlExtractor: # with custom file_regex
file_regex: [out.yml]
"""
file_regex: Union[str, List[str]] = [r".*\.yaml$", r".*\.yml$"]
def extract(self, path: str, options: Dict) -> List[Dict]:
# load file as yaml: if top level element is object -> return one element list
with open(path, "r") as f:
data = ruamel.yaml.safe_load(f)
if not isinstance(data, list):
data = [data]
return data
class JsonExtractor(Extractor):
"""
The `JsonExtractor` reads result files as JSON.
The JSON file can contain either a single object (result)
or a list of objects (results).
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
extractors:
JsonExtractor: {} # with default file_regex
JsonExtractor: # with custom file_regex
file_regex: [out.json]
"""
file_regex: Union[str, List[str]] = [r".*\.json$"]
def extract(self, path: str, options: Dict) -> List[Dict]:
# load file as json: if top level element is object -> return one element list
with open(path, "r") as f:
data = json.load(f)
if not isinstance(data, list):
data = [data]
return data
class CsvExtractor(Extractor):
"""
The `CsvExtractor` reads result files as CSV.
The CSV file contains a result per line and by default starts with a header row,
see ``has_header`` and ``fieldnames`` for CSV files without header.
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
extractors:
CsvExtractor: {} # with default params
CsvExtractor: # with custom params
file_regex: [out.csv]
delimiter: ;
has_header: False
fieldnames: [col1, col2, col3]
"""
file_regex: Union[str, List[str]] = [r".*\.csv$"]
"""The regex list to match result files."""
delimiter: str = ","
"""The separator between columns."""
has_header: bool = True
"""Indicates whether the first CSV row is a header or not."""
fieldnames: List[str] = None
"""The names of the CSV columns if ``has_header`` is set to `False`"""
def extract(self, path: str, options: Dict) -> List[Dict]:
# load file as csv: by default treats the first line as a header
# for each later row, we create a dict and add it to the result list to return
data = []
with open(path, "r") as f:
if self.has_header or self.fieldnames is not None:
reader = csv.DictReader(f, delimiter=self.delimiter, fieldnames=self.fieldnames)
else:
reader = csv.reader(f, delimiter=self.delimiter)
for row in reader:
data.append(row)
return data
class ErrorExtractor(Extractor):
"""
The `ErrorExtractor` provides a mechanism to detect potential errors in an experiment job.
For experiments with a large number of jobs, it is easy to overlook an error
because there are many output folders and files e.g., the stderr.log of each job.
The `ErrorExtractor` raises a warning if matching files are not empty.
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
extractors:
ErrorExtractor: {} # checking stderr.log
ErrorExtractor: # checking custom files
file_regex: [stderr.log, error.log]
"""
file_regex: Union[str, List[str]] = ["^stderr.log$"]
"""The regex list to match result files."""
def extract(self, path: str, options: Dict) -> List[Dict]:
# if the file is present and not empty, then throws a warning
with open(path, "r") as f:
content = f.read().replace("\n", " ")
if (
content.strip() and not content.strip().isspace()
): # ignore empty error files
# TODO [nku] should we not raise an error in the etl pipeline?
warnings.warn(f"found error file: {path}")
warnings.warn(f" {content}")
return []
class IgnoreExtractor(Extractor):
"""
The `IgnoreExtractor` provides a mechanism to detect potential errors in an experiment job.
For experiments with a large number of jobs, it is easy to overlook an error
    indicated by the presence of an unexpected file.
As a result, the ETL requires that every file in the results folder of the job
must be matched by exactly one Extractor.
The `IgnoreExtractor` can be used to ignore certain files on purpose, e.g., stdout.log.
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
extractors:
IgnoreExtractor: {} # ignore stdout.log
IgnoreExtractor: # custom ignore list
file_regex: [stdout.log, other.txt]
"""
file_regex: Union[str, List[str]] = ["^stdout.log$"]
"""The regex list to match result files."""
def extract(self, path: str, options: Dict) -> List[Dict]:
# ignore this file
# -> every file produced by an experiment run needs to be matched to exactly one extractor.
# the ignore extractor can be used to ignore some files
return []
__all__ = [name for name, cl in inspect.getmembers(sys.modules[__name__], inspect.isclass) if name!="Extractor" and issubclass(cl, Extractor) ]
Transformer¶
The Transformer steps are responsible for bringing the table into a suitable form. For example, since we have experiments with repetitions, it is useful to aggregate over the repetitions and calculate error metrics.
The list of transformers in the $ETL$ design is executed as a chain: the first entry receives the dataframe from the extract phase and passes the modified dataframe to the next transformer in the list.
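For example, a chain in the $ETL$ section could first aggregate over repetitions and then filter rows (a sketch mirroring the docstring examples further below; the column names are hypothetical):
$ETL$:
  transformers:
    - name: RepAggTransformer
      data_columns: [latency]
      agg_functions: [mean, std]
    - df.query: {expr: "latency_mean < 1.0"}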
To add a custom Transformer, extend the Transformer base class in the does_etl_custom package and implement the required methods.
from typing import Dict

import pandas as pd

from doespy.etl.steps.transformers import Transformer

class MyTransformer(Transformer):

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        # your implementation
        ...
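As a concrete example, a (hypothetical) transformer that converts a millisecond column into seconds could look like this:
from typing import Dict

import pandas as pd

from doespy.etl.steps.transformers import Transformer

class MsToSecTransformer(Transformer):
    """Hypothetical example: adds a column in seconds derived from a millisecond column."""

    col: str = "latency_ms"     # input column (assumed to exist in the extracted results)
    dest: str = "latency_sec"   # output column to add

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        if self.col in df.columns:
            df[self.dest] = pd.to_numeric(df[self.col]) / 1000.0
        return df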
Todo
add link to etl section
More details can be found in the ETL section or by consulting the already provided Transformer implementations.
Source of the provided Transformer steps:
from abc import ABC, abstractmethod
from typing import Dict, Any, List
import pandas as pd
import inspect
import sys
from pydantic import BaseModel, validator
import pandas as pd
from doespy.etl.etl_util import expand_factors
class Transformer(BaseModel, ABC):
name: str = None
class Config:
extra = "forbid"
@validator("name", pre=True, always=True)
def set_name(cls, value):
return cls.__name__
@abstractmethod
def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
# NOTE: Extending classes should not use the `options: Dict` and instead use instance variables for parameters
pass
class DfTransformer(Transformer):
def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
# TODO [nku] here I could define the different things for the df.x -> at the moment unused
pass
class ConditionalTransformer(Transformer):
"""
The `ConditionalTransformer` replaces the value in the ``dest`` column with a
value from the ``value`` dict, if the value in the ``col`` column is equal to the key.
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
transformers:
- name: ConditionalTransformer:
col: Country
dest: Code
value:
Switzerland: CH
Germany: DE
Example
.. container:: twocol
.. container:: leftside
============ ====
Country Code
============ ====
Germany
Switzerland
France
============ ====
.. container:: middle
|:arrow_right:|
.. container:: rightside
============ ====
Country Code
============ ====
Germany DE
Switzerland CH
France
============ ====
"""
col: str
"""Name of condition column in data frame."""
dest: str
"""Name of destination column in data frame."""
value: Dict[Any, Any]
"""Dictionary of replacement rules:
The dict key is the entry in the condition ``col`` and
the value is the replacement used in the ``dest`` column."""
def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
col = self.col
value = self.value
dest = self.dest
for cur, repl in value.items():
df.loc[df[col] == cur, dest] = repl
return df
class RepAggTransformer(Transformer):
r"""The `RepAggTransformer` aggregates over the repetitions of the same experiment run.
GroupBy all columns of df except ``data_columns`` and ``rep`` column.
Afterward, apply specified aggregate functions ``agg_functions`` on the ``data_columns``.
:param ignore_columns: List of columns to ignore within group_by condition (apart from repetition column ``rep``), defaults to ``[]``
:param data_columns: The columns that contain the data to aggregate, see ``agg_function``.
:param agg_functions: List of aggregate function to apply on ``data_columns``, defaults to ``["mean", "min", "max", "std", "count"]``
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
transformers:
- name: RepAggTransformer:
ignore_columns: [$CMD$]
data_columns: [Lat]
agg_functions: [mean]
Example
.. container:: twocol
.. container:: leftside
=== ==== === ===== ===
Run ... Rep $CMD$ Lat
=== ==== === ===== ===
0 0 xyz 0.1
0 1 xyz 0.3
1 0 xyz 0.5
1 1 xyz 0.5
=== ==== === ===== ===
.. container:: middle
|:arrow_right:|
.. container:: rightside
=== ==== ========
Run ... Lat_mean
=== ==== ========
0 0.2
1 0.5
=== ==== ========
"""
ignore_columns: List[str] = []
data_columns: List[str]
agg_functions: List[str] = ["mean", "min", "max", "std", "count"]
# TODO [nku] can we remove this transformer by unifying it with GroupByAggTransformer? -> I think we could remove this here and replace it only with the GroupByAggTransformer and include rep in Groupby cols
def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
if df.empty:
return df
data_columns = self.data_columns
ignore_columns = self.ignore_columns
agg_functions = self.agg_functions
if not set(data_columns).issubset(df.columns.values):
raise ValueError(
"RepAggTransformer: ",
f"data_columns={data_columns} must be in df_columns={df.columns.values}"
)
# ensure that all data_columns are numbers
df[data_columns] = df[data_columns].apply(pd.to_numeric)
# we need to convert each column into a hashable type
# (list and dict are converted to string)
hashable_types = {}
for col in df.columns:
dtype = df[col].dtype
if dtype == "object":
hashable_types[col] = "str"
else:
hashable_types[col] = dtype
df = df.astype(hashable_types)
# group_by all except `rep` and `data_columns`
group_by_cols = [
col
for col in df.columns.values
if col not in data_columns and col != "rep" and col not in ignore_columns
]
agg_d = {data_col: agg_functions for data_col in data_columns}
df = df.groupby(group_by_cols).agg(agg_d).reset_index()
# flatten columns
df.columns = ["_".join(v) if v[1] else v[0] for v in df.columns.values]
return df
class GroupByAggTransformer(Transformer):
"""
The `GroupByAggTransformer` performs a group by followed
by a set of aggregate functions applied to the ``data_columns``.
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
transformers:
- name: GroupByAggTransformer:
groupby_columns: [Run, $FACTORS$]
data_columns: [Lat]
agg_functions: [mean]
Example
.. container:: twocol
.. container:: leftside
=== ==== === ===== ===
Run ... Rep $CMD$ Lat
=== ==== === ===== ===
0 0 xyz 0.1
0 1 xyz 0.3
1 0 xyz 0.5
1 1 xyz 0.5
=== ==== === ===== ===
.. container:: middle
|:arrow_right:|
.. container:: rightside
=== ==== ========
Run ... Lat_mean
=== ==== ========
0 0.2
1 0.5
=== ==== ========
"""
data_columns: List[str]
""" The columns that contain the data to aggregate, see ``agg_function``."""
groupby_columns: List[str]
"""The columns to perform the group by.
The list can contain the magic entry `$FACTORS$` that expands to all factors of the experiment.
e.g., [exp_name, host_type, host_idx, $FACTORS$] would perform a group by of each run.
"""
agg_functions: List[str] = ["mean", "min", "max", "std", "count"]
"""List of aggregate function to apply on ``data_columns``"""
custom_tail_length: int = 5
""""custom_tail" is a custom aggregation function that calculates the mean over the last `custom_tail_length` entries of a column."""
def custom_tail_build(self, custom_tail_length):
def custom_tail(data):
return data.tail(custom_tail_length).mean()
return custom_tail
def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
if df.empty:
return df
data_columns = self.data_columns
# here, we get factor_columns
groupby_columns = expand_factors(df, self.groupby_columns)
agg_functions = self.agg_functions
# To configure size of the 'tail' to calculate the mean over
custom_tail_length = options.get("custom_tail_length", 5)
# custom agg functions
custom_agg_methods_available = {
"custom_tail": self.custom_tail_build(custom_tail_length)
}
for fun in agg_functions.copy():
for method, call in custom_agg_methods_available.items():
if fun == method:
agg_functions.remove(fun) # is this allowed while in the loop?
agg_functions.append(call)
if not set(data_columns).issubset(df.columns.values):
return df
# raise ValueError(f"RepAggTransformer: data_columns={data_columns}
# must be in df_columns={df.columns.values}")
if not set(groupby_columns).issubset(df.columns.values):
raise ValueError(
f"GroupByAggTransformer: groupby_columns={groupby_columns} "
f"must be in df_columns={df.columns.values}"
)
# ensure that all data_columns are numbers
df[data_columns] = df[data_columns].apply(pd.to_numeric)
# we need to convert each column into a hashable type
# (list and dict are converted to string)
hashable_types = {}
for col in df.columns:
dtype = df[col].dtype
if dtype == "object":
hashable_types[col] = "str"
else:
hashable_types[col] = dtype
df = df.astype(hashable_types)
# group_by all except `rep` and `data_columns`
group_by_cols = groupby_columns
agg_d = {data_col: agg_functions for data_col in data_columns}
df = df.groupby(group_by_cols, dropna=False).agg(agg_d).reset_index()
# flatten columns
df.columns = ["_".join(v) if v[1] else v[0] for v in df.columns.values]
return df
class FilterColumnTransformer(Transformer):
"""
Simple transformer to filter rows out of a dataframe by column values.
Accepts key-value pairs in `filters` option.
This transformer is simple for now, only accepts discrete values and
does not do any type handling.
Options:
- filters: dict of filters
- allow_empty_result: bool whether to throw an error when the dataframe
becomes empty as result of the filter.
Defaults to False.
"""
filters: Dict[str, Any] = {}
allow_empty_result: bool = False
def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
import warnings
warnings.warn(
"""FilterColumnTransformer is deprecated, instead you can directly use
df.query(col == 'A') in the etl definition
i.e., transformers: [df.query: {expr: col == 'A'}]""",
DeprecationWarning
)
filters: dict[str, Any] = options.get("filters", {})
allow_empty_result: bool = options.get("allow_empty_result", False)
if len(filters.keys()) == 0:
return df
if not set(filters.keys()).issubset(df.columns.values):
raise ValueError(
f"FilterColumnTransformer: filters={filters.keys()}",
f"must be in df_columns={df.columns.values}"
)
for key, value in filters.items():
df = df[df[key] == value]
if df.empty and not allow_empty_result:
raise ValueError(
"FilterColumnTransformer: resulting dataframe after filters is empty!",
"This is probably not supposed to happen"
)
return df
__all__ = [name for name, cl in inspect.getmembers(sys.modules[__name__], inspect.isclass) if name!="Transformer" and issubclass(cl, Transformer) ]
Loader¶
The Loader steps are responsible for taking the data frame from the transform phase and producing different representations of the results, for example, storing results in a database, producing different plots of the data, or storing a summary as a CSV file.
All the loaders specified in the ETL config of the suite design receive the resulting table (data frame) from the transform phase.
To add a custom Loader, extend the Loader base class in the does_etl_custom package and implement the required methods.
from typing import Dict

import pandas as pd

from doespy.etl.steps.loaders import Loader

class MyLoader(Loader):

    def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:
        # your implementation
        ...
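As a concrete example, a (hypothetical) loader that writes the data frame as a JSON file into the ETL output directory could look like this:
import os
from typing import Dict

import pandas as pd

from doespy.etl.steps.loaders import Loader

class JsonSummaryLoader(Loader):
    """Hypothetical example: stores the result data frame as a JSON file."""

    skip_empty: bool = False   # mirrors the behavior of the provided summary loaders

    def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:
        if df.empty:
            if self.skip_empty:
                return
            raise ValueError("JsonSummaryLoader: DataFrame is empty, not creating an output file.")
        output_dir = self.get_output_dir(etl_info)   # helper provided by the Loader base class
        df.to_json(os.path.join(output_dir, f"{etl_info['pipeline']}.json"), orient="records")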
Todo
add link to etl section
More details can be found in the ETL section or by consulting the already provided Loader implementations.
Source of the provided Loader steps:
from abc import ABC, abstractmethod
from typing import List, Dict
import pandas as pd
import os
import matplotlib.pyplot as plt
import inspect
import sys
from typing import Optional
from pydantic import BaseModel
class Loader(BaseModel, ABC):
class Config:
extra = "forbid"
def get_output_dir(self, etl_info):
""":meta private:"""
etl_output_dir = etl_info["etl_output_dir"]
if etl_output_dir is not None and not os.path.exists(etl_output_dir):
os.makedirs(etl_output_dir)
return etl_output_dir
@abstractmethod
def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:
""":meta private:"""
# NOTE: Extending classes should not use the `options: Dict` and instead use instance variables for parameters
pass
class PlotLoader(Loader):
def save_data(self, df: pd.DataFrame, filename: str, output_dir: str, output_filetypes: List[str] = ["html"]):
""":meta private:"""
os.makedirs(output_dir, exist_ok=True)
for ext in output_filetypes:
if ext == "html":
html_table = df.to_html()
path = os.path.join(output_dir, f"{filename}.html")
with open(path, 'w') as file:
file.write(html_table)
else:
raise ValueError(f"PlotLoader: Unknown file type {ext}")
def save_plot(
self,
fig: plt.Figure,
filename: str,
output_dir: str,
use_tight_layout: bool = True,
output_filetypes: List[str] = ["pdf", "png"],
):
""":meta private:"""
os.makedirs(output_dir, exist_ok=True)
for ext in output_filetypes:
full_filename = f"{output_dir}/{filename}.{ext}"
bbox_inches = "tight" if (use_tight_layout and ext == "pdf") else None
pad_inches = 0 if (use_tight_layout and ext == "pdf") else 0.1
dpi = 300
fig.savefig(
full_filename,
format=ext,
bbox_inches=bbox_inches,
pad_inches=pad_inches,
dpi=dpi,
)
def default_fig(self):
""":meta private:"""
scale_factor = 2.4
figsize = [
scale_factor * 1.618,
scale_factor * 1,
] # [width, height] based on golden ratio
fig = plt.figure(figsize=figsize, dpi=100)
plt.figure(fig.number)
return fig
class CsvSummaryLoader(Loader):
"""
The `CsvSummaryLoader` creates a CSV file of the data frame from the `Transformer` stage.
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
loaders:
CsvSummaryLoader: {} # with default output dir
CsvSummaryLoader: # with skip empty df
skip_empty: True
"""
skip_empty: bool = False
"""Ignore empty df, if set to ``False``, raises an error if the data frame is empty."""
def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:
if self.skip_empty and df.empty:
return
elif df.empty:
raise ValueError("CsvSummaryLoader: DataFrame is empty so not creating an output file.")
else:
output_dir = self.get_output_dir(etl_info)
df.to_csv(os.path.join(output_dir, f"{etl_info['pipeline']}.csv"))
class PickleSummaryLoader(Loader):
"""The `PickleSummaryLoader` creates a Pickle file of the data frame from the `Transformer` stage.
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
loaders:
PickleSummaryLoader: {} # with default output dir
PickleSummaryLoader: # with skip empty dir
skip_empty: True
"""
skip_empty: bool = False
"""Ignore empty df, if set to ``False``, raises an error if the data frame is empty."""
def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:
if self.skip_empty and df.empty:
return
elif df.empty:
raise ValueError("PickleSummaryLoader: DataFrame is empty so not creating an output file.")
else:
output_dir = self.get_output_dir(etl_info)
df.to_pickle(os.path.join(output_dir, f"{etl_info['pipeline']}.pkl"))
class LatexTableLoader(Loader):
"""The `LatexTableLoader` creates a tex file of the data frame from the `Transformer` stage formatted as a Latex table.
.. code-block:: yaml
:caption: Example ETL Pipeline Design
$ETL$:
loaders:
LatexTableLoader: {} # with default output dir
LatexTableLoader: # with skip empty df
skip_empty: True
"""
skip_empty: bool = False
"""Ignore empty df, if set to ``False``, raises an error if the data frame is empty."""
def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:
if self.skip_empty and df.empty:
return
elif df.empty:
raise ValueError("LatexTableLoader: DataFrame is empty so not creating an output file.")
output_dir = self.get_output_dir(etl_info)
with open(
os.path.join(output_dir, f"{etl_info['pipeline']}.tex"), "w"
) as file:
df.to_latex(buf=file)
__all__ = [name for name, cl in inspect.getmembers(sys.modules[__name__], inspect.isclass) if name!="Loader" and issubclass(cl, Loader) ]
Execution¶
We are done with configuring experiments and now want to execute them.
For this, we need to move to the doe-suite folder, because you interact with the DoE-Suite through a Makefile located there.
cd doe-suite
You can call make or make help to see an overview of the functionality:
make help
Example output:
$ make help
Running Experiments
make run suite=<SUITE> id=new - run the experiments in the suite
make run suite=<SUITE> id=<ID> - continue with the experiments in the suite with <ID> (often id=last)
make run suite=<SUITE> id=<ID> cloud=<CLOUD> - run suite on non-default cloud ([aws], euler)
make run suite=<SUITE> id=<ID> expfilter=<REGEX> - run only subset of experiments in suite where name matches the <REGEX> (suite must be valid)
make run-keep suite=<SUITE> id=new - does not terminate instances at the end, otherwise works the same as run target
Clean
make clean - terminate running cloud instances belonging to the project and local cleanup
make clean-result - delete all incomplete results in doe-suite-results
Running ETL Locally
make etl suite=<SUITE> id=<ID> - run the etl pipeline of the suite (locally) to process results (often id=last)
make etl-design suite=<SUITE> id=<ID> - same as `make etl ...` but uses the pipeline from the suite design instead of results
make etl-all - run etl pipelines of all results
make etl-super config=<CONFIG> out=<PATH> - run the super etl to combine results of multiple suites (for <CONFIG> e.g., demo_plots)
make etl-super ... pipelines="<P1> <P2>" - run only a subset of pipelines in the super etl
Clean ETL
make etl-clean suite=<SUITE> id=<ID> - delete etl results from specific suite (can be regenerated with make etl ...)
make etl-clean-all - delete etl results from all suites (can be regenerated with make etl-all)
Gather Information
make info - list available suite designs
make status suite=<SUITE> id=<ID> - show the status of a specific suite run (often id=last)
Design of Experiment Suites
make design suite=<SUITE> - list all the run commands defined by the suite
make design-validate suite=<SUITE> - validate suite design and show with default values
Setting up a Suite
make new - initialize doe-suite-config from a template
Running Tests
make test - running all suites (seq) and comparing results to expected (on aws)
make euler-test cloud=euler - running all single instance suites on euler and compare results to expected
make etl-test-all - re-run all etl pipelines and compare results to current state (useful after update of etl step)
Warning
The DoE-Suite can easily start many instances in a remote cloud. If there is an error in the execution, or the suite finishes before all jobs are complete, then these remote resources are not terminated and can generate high costs. Always check that resources are terminated. We also provide the following command to ensure that the previously started instances are terminated:
make clean
To start a new suite on the default cloud, you use:
make run suite=<YOUR-SUITE-DESIGN> id=new
When we start a new experiment suite, it receives a unique ID (epoch timestamp). Each experiment of the suite must have a unique name in the experiment design specification.
The playbook periodically checks whether an experiment run is finished and then downloads the results. The variable job_n_tries controls the maximum number of times to check whether the job finished. In between checks, the playbook waits for job_check_wait_time seconds (see doe-suite-config/group_vars/all/main.yml). Once job_n_tries is exceeded, the playbook aborts.
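These two variables live in doe-suite-config/group_vars/all/main.yml and can be tuned per project; the values below are illustrative placeholders, not the defaults:
# doe-suite-config/group_vars/all/main.yml (illustrative values)
job_n_tries: 100          # maximum number of checks before the playbook aborts
job_check_wait_time: 30   # seconds to wait between checks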
Experiments that involve multiple instances (e.g., client-server experiment) require the experiment-suite playbook to start the next job after the previous finished. The consequence is that when the playbook aborts because job_n_tries is exceeded, an already running job will continue to run on AWS, but the next job won’t start unless the experiment-suite.yml playbook runs.
For experiments that run on a single instance, all jobs are scheduled on the instance from the beginning. As a consequence, after a job completes, the next job automatically starts even when the experiment-suite.yml playbook does not run. In this case, the playbook is only required to fetch results.
To continue checking a previously started experiment, we can specify the ID of the experiment when starting the playbook:
# can replace `id=last` with actual id, e.g., `id=1655831553`
make run suite=<YOUR-SUITE-DESIGN> id=last
After fetching new results, ETL pipelines are executed locally on your machine. It’s also possible to re-run the ETL pipelines on the result files without re-running experiments.
# can replace `id=last` with actual id, e.g., `id=1655831553`
make etl suite=<YOUR-SUITE-DESIGN> id=last