Tutorial

We assume that you already have a project repository in which you want to integrate the doe-suite. Otherwise, create a directory and initialize a new repository:

mkdir my-project
cd my-project
git init

Afterward, follow the installation instructions to integrate the doe-suite in your project as a submodule.

After completing the installation section, you can run the suite designs from the demo_project. To use the suite in your own project, change the environment variable DOES_PROJECT_DIR so that it points to your project instead of the demo_project. In summary, these environment variables should now be set:

# path to your project
export DOES_PROJECT_DIR=<PATH-TO-YOUR-PROJECT>

# your unique shortname, e.g., nku
export DOES_PROJECT_ID_SUFFIX=<SUFFIX>

# choose default cloud: aws, euler
export DOES_CLOUD=<DEFAULT-CLOUD>

# for AWS
export DOES_SSH_KEY_NAME=<YOUR-PRIVATE-SSH-KEY-FOR-AWS>

export DOES_AWS_USER=<SSH-USERNAME>

# for Euler
export DOES_EULER_USER=<YOUR-NETHZ>

# for Docker
export DOES_DOCKER_USER=<SSH-USERNAME>  # [optional] defaults to ubuntu
export DOES_DOCKER_SSH_PUBLIC_KEY=<SSH-PUBLIC-KEY>  # e.g., ~/.ssh/id_rsa.pub
export DOCKER_HOST=<DOCKER-HOST> # [optional]  defaults to unix://var/run/docker.sock
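
Before running the suite, it can help to check that the variables for the selected cloud are set. The following is a minimal sketch and not part of the doe-suite: the helper `missing_does_env` is hypothetical, and the grouping of variables per cloud (including `docker` as a possible value of `DOES_CLOUD`) is an assumption inferred from the comments in the list above.

```python
import os
from typing import Dict, List, Mapping

# Variables needed regardless of the cloud (taken from the list above).
COMMON_VARS = ["DOES_PROJECT_DIR", "DOES_PROJECT_ID_SUFFIX", "DOES_CLOUD"]

# Assumption: which variables belong to which cloud, inferred from the comments above.
CLOUD_VARS: Dict[str, List[str]] = {
    "aws": ["DOES_SSH_KEY_NAME", "DOES_AWS_USER"],
    "euler": ["DOES_EULER_USER"],
    "docker": ["DOES_DOCKER_SSH_PUBLIC_KEY"],  # the other Docker variables are optional
}


def missing_does_env(env: Mapping[str, str]) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    required = list(COMMON_VARS)
    required += CLOUD_VARS.get(env.get("DOES_CLOUD", ""), [])
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_does_env(os.environ)
    print("missing:", ", ".join(missing) if missing else "none")
```

Passing the environment as an argument instead of reading `os.environ` inside the function keeps the check easy to try out with a plain dict.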

Tip

Direnv allows project-specific env vars in an .envrc file.

Move to the doe-suite repository and create your project-specific doe-suite-config with the help of a Cookiecutter project template. To start, accepting the default values should be fine, with one exception: for the repo, use the repository of your project instead of the doe-suite repo.

cd doe-suite
make new

Note that make new first checks whether there is already a doe-suite-config in the DOES_PROJECT_DIR.


Project Layout

Your project repository should have a folder doe-suite-config which contains a skeleton of the whole configuration.

Overall, the DoE-Suite adds three top-level folders to a project repository: doe-suite, doe-suite-config, and doe-suite-results. The doe-suite folder is the doe-suite repo as a submodule. The doe-suite-config folder contains the whole configuration of how to run experiments, plus project-specific extensions of the suite. The doe-suite-results folder stores all result files.

The complete folder structure for a project looks as follows:

<your-project-repository>
├── doe-suite                  # the doe-suite repo as a submodule
├── doe-suite-config
│   ├── designs                # experiment suite designs
│   ├── does_etl_custom        # custom steps for processing results
│   ├── group_vars             # host type config (e.g., instance type)
│   ├── inventory              # manual inventories for ansible
│   ├── roles                  # host type setup roles
│   ├── super_etl              # multi suite results processing
│   └── pyproject.toml
├── doe-suite-results          # all experiment results
│   └── ...
└── ...                        # your project files

Suite Design

Todo

TODO: Add description of designs and maybe include make design

The figure above shows that a suite design consists of one or more experiments. Each experiment defines the computational environment (i.e., how many machines of which type) and a list of run configurations (i.e., concrete parameters) that we want to execute. Within the run configurations, we distinguish between constants and factors. Constants remain the same across all runs, while each run uses a unique combination of the factor levels. To improve validity, we support repeating a run multiple times.
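
The relation between constants, factors, and repetitions can be sketched in a few lines of Python. This is an illustration only: the function `expand_runs` and the parameter names are hypothetical, and it assumes that the runs form the full cross product of all factor levels, which is only one way of combining levels and not necessarily how a concrete suite design is written.

```python
from itertools import product
from typing import Any, Dict, List


def expand_runs(
    constants: Dict[str, Any],
    factors: Dict[str, List[Any]],
    n_repetitions: int = 1,
) -> List[Dict[str, Any]]:
    """Build one run config per factor-level combination and repetition."""
    names = list(factors)
    runs = []
    for run_idx, levels in enumerate(product(*(factors[n] for n in names))):
        for rep in range(n_repetitions):
            config = dict(constants)           # constants are identical in every run
            config.update(zip(names, levels))  # each factor takes one of its levels
            runs.append({"run": run_idx, "rep": rep, "config": config})
    return runs


runs = expand_runs(
    constants={"seed": 1234},
    factors={"payload_size": [1, 10], "mode": ["a", "b"]},
    n_repetitions=2,
)
print(len(runs))  # 2 * 2 level combinations, each repeated twice
```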

Different experiments in the suite are executed on a different set of host instances in parallel, while run configurations within an experiment are executed sequentially on the same set of host instances.

For each experiment, one instance is the controller which is logically responsible for coordination.

The experiment suite runs experiments based on YAML files in doe-suite-config/designs. The YAML files represent the structure discussed above.

(Add) Host Type

The host type config follows the structure of Ansible group_vars.

The variables defined in all/main.yml apply to all hosts, while the variables under <host-type1>/main.yml apply only to the host type named <host-type1>.

The name of the folder, e.g., placeholder <host-type1>, is then referenced by a suite design.

<your-project-repository>
└── doe-suite-config
    ├── ...
    └── group_vars
        ├── all
        │   └── main.yml
        ├── <host-type1>
        │   └── main.yml
        └── <host-type2>
            └── main.yml

To add a new host type, create a corresponding folder under doe-suite-config/group_vars/ and ensure that the required variables are set.

For example, the host type small of the demo_project:

doe-suite-config/group_vars/small/main.yml
---

# AWS EC2
instance_type: t2.medium
ec2_volume_size: 16
ec2_image_id: ami-08481eff064f39a84
ec2_volume_snapshot: snap-0b8d7894c93b6df7a

# ETH Euler
euler_job_minutes: 10
euler_cpu_cores: 1
euler_cpu_mem_per_core_mb: 3072
euler_gpu_number: 0
euler_gpu_min_mem_per_gpu_mb: 0
euler_gpu_model: ~
euler_env: "gcc/8.2.0 python/3.9.9"
euler_scratch_dir: "/cluster/scratch/{{ euler_user }}"

# Docker
docker_image_id: "doe-ubuntu20"
docker_image_tag: "latest"
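
Because a suite design references a host type by its folder name, a quick way to see which names are available is to list the folders under group_vars. The helper below is a hypothetical sketch based only on the layout shown above; it is not part of the doe-suite.

```python
from pathlib import Path
from typing import List


def list_host_types(config_dir: str) -> List[str]:
    """Return the host type names found under <config_dir>/group_vars."""
    group_vars = Path(config_dir) / "group_vars"
    return sorted(
        entry.name
        for entry in group_vars.iterdir()
        # `all` holds variables shared by every host, so it is not a host type
        if entry.is_dir() and entry.name != "all" and (entry / "main.yml").is_file()
    )
```

For example, `list_host_types("doe-suite-config")` would return the names that a suite design can reference, assuming each host type folder contains a main.yml as in the layout above.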

(Add) Setup Role

The host type setup roles are regular Ansible Roles and can be used to perform all sorts of tasks required before running experiments, for example, installing packages, downloading data, and building the system artifact.

The name of the role, e.g., placeholder <setup-1>, is then referenced by a suite design.

The file name under tasks/ determines whether the tasks are the same on all clouds (main.yml) or cloud-specific (e.g., aws.yml or euler.yml).

<your-project-repository>
├── ...
└── doe-suite-config
    ├── ...
    └── roles
        ├── <setup-1>
        │   └── tasks
        │       └── main.yml
        └── <setup-2>
            └── tasks
                ├── aws.yml
                └── euler.yml

(Add) ETL Step

The doe-suite-config folder is also a Poetry project with a package does_etl_custom that allows adding project-specific results processing.

<your-project-repository>
├── ...
└── doe-suite-config
    ├── ...
    ├── does_etl_custom
    │   └── <steps>.py
    └── pyproject.toml

The DoE-Suite already provides a few steps that implement common features. However, the majority of projects will need custom processing, e.g., for building a plot. Below, we discuss how to add a custom Extractor, Transformer, and Loader.

The pyproject.toml file allows installing additional Python packages. For example, to add the Seaborn visualization library, navigate to the doe-suite-config directory and use Poetry to add the dependency:

cd doe-suite-config
poetry add seaborn

Extractor

The Extractor steps are responsible for bringing all generated results into table form (a data frame) together with the configuration. Information about the suite and the run config is extracted automatically. To extract the results themselves, we configure a set of Extractors.

Todo

maybe remove the details on the extractor here

During the extract phase, we loop over all produced result files and assign each of them to exactly one extractor through a regex on the filename. The mapping must be 1:1, otherwise the phase aborts with an error. The reason behind this is that an experiment job should only generate expected files. Each extractor has a default regex on the filename. For example, the YamlExtractor matches all files ending in .yml or .yaml. However, it is possible to override this default list in the ETL config of the suite design.
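
The assignment rule can be sketched as follows. This is a simplified illustration and not the suite's implementation: the function `assign_files` is hypothetical, and the use of `re.search` for matching is an assumption. The patterns are the default regex lists of the built-in extractors.

```python
import re
from typing import Dict, List


def assign_files(files: List[str], extractors: Dict[str, List[str]]) -> Dict[str, str]:
    """Map every file name to the single extractor whose regex list matches it."""
    assignment = {}
    for name in files:
        matches = [
            extractor
            for extractor, patterns in extractors.items()
            if any(re.search(pattern, name) for pattern in patterns)
        ]
        if len(matches) != 1:
            # zero matches: unexpected file; several matches: ambiguous config
            raise ValueError(f"file={name} must match exactly one extractor, got {matches}")
        assignment[name] = matches[0]
    return assignment


extractors = {
    "YamlExtractor": [r".*\.yaml$", r".*\.yml$"],
    "ErrorExtractor": [r"^stderr.log$"],
    "IgnoreExtractor": [r"^stdout.log$"],
}
print(assign_files(["out.yml", "stderr.log", "stdout.log"], extractors))
```

With this rule, a file that no extractor expects (or that two extractors claim) stops the phase early instead of being silently dropped.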

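To add a custom Extractor, extend the Extractor base class in the does_etl_custom package and implement the required methods.

from typing import Dict, List

from doespy.etl.steps.extractors import Extractor

class MyExtractor(Extractor):

    @classmethod
    def default_file_regex(cls) -> List[str]:
        # regex list that selects the result files handled by this extractor
        raise NotImplementedError  # your implementation

    def extract(self, path: str, options: Dict) -> List[Dict]:
        # read the file at `path` and return one dict per result
        raise NotImplementedError  # your implementation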

Todo

add link to etl section

More details can be found by consulting the already provided Extractors.
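
As an illustration of what the body of extract could look like, the sketch below parses a result file with one `key=value` pair per line. The file format and the function name `extract_key_value` are made up for this example; the function is written as a plain helper so that it can be tried without the doe-suite installed.

```python
from typing import Dict, List


def extract_key_value(path: str) -> List[Dict]:
    """Parse lines of the form `key=value` into a single result dict."""
    result = {}
    with open(path, "r") as f:
        for line in f:
            line = line.strip()
            if not line or "=" not in line:
                continue  # skip blank or malformed lines
            key, value = line.split("=", 1)
            result[key.strip()] = value.strip()
    # like the built-in extractors, always return a list of dicts
    return [result]
```

Inside a custom Extractor, the extract method could then delegate to this helper with `return extract_key_value(path)`.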

doespy/doespy/etl/steps/extractors.py
from abc import ABC, abstractmethod, abstractproperty
from typing import List, Dict, Union
from typing import ClassVar

import warnings
import ruamel.yaml
import json
import csv
from dataclasses import dataclass, field

import sys
import inspect

from pydantic import BaseModel, validator


class Extractor(BaseModel, ABC):

    file_regex: Union[str, List[str]] = None

    class Config:
        extra = "forbid"

    @classmethod
    def default_file_regex(cls):
        pass

    @validator("file_regex", pre=True, always=True)
    def set_default_regex(cls, value):

        if value is None:
            value = cls.default_file_regex()

        if not isinstance(value, list):
            value = [value]

        return value

    @abstractmethod
    def extract(self, path: str, options: Dict) -> List[Dict]:
        """Reads the file defined by `path`, and converts it into a list of dicts.
            For example, the CsvExtractor reads a csv and returns a dict for each row.
        Args:
            path (str): absolute path of file to extract
            options (Dict): extractor options as provided in the ETL definition
        Returns:
            List[Dict]: results found in the file
        """

        # NOTE: Extending classes should not use the `options: Dict` and instead use instance variables for parameters


        pass


class YamlExtractor(Extractor):

    """
    The `YamlExtractor` reads result files as YAML.

    The YAML file can contain either a single object (result)
    or a list of objects (results).

    .. code-block:: yaml
        :caption: Example ETL Pipeline Design

        $ETL$:
            extractors:
                YamlExtractor: {}         # with default file_regex
                YamlExtractor:            # with custom file_regex
                    file_regex: [out.yml]

    """

    file_regex: Union[str, List[str]] = [r".*\.yaml$", r".*\.yml$"]

    def extract(self, path: str, options: Dict) -> List[Dict]:
        # load file as yaml: if top level element is object -> return one element list
        with open(path, "r") as f:
            data = ruamel.yaml.safe_load(f)
        if not isinstance(data, list):
            data = [data]
        return data


class JsonExtractor(Extractor):
    """
    The `JsonExtractor` reads result files as JSON.
    The JSON file can contain either a single object (result)
    or a list of objects (results).

    .. code-block:: yaml
        :caption: Example ETL Pipeline Design

        $ETL$:
            extractors:
                JsonExtractor: {}         # with default file_regex
                JsonExtractor:            # with custom file_regex
                    file_regex: [out.json]

    """

    file_regex: Union[str, List[str]] = [r".*\.json$"]


    def extract(self, path: str, options: Dict) -> List[Dict]:
        # load file as json: if top level element is object -> return one element list
        with open(path, "r") as f:
            data = json.load(f)

        if not isinstance(data, list):
            data = [data]

        return data


class CsvExtractor(Extractor):
    """
    The `CsvExtractor` reads result files as CSV.
    The CSV file contains a result per line and by default starts with a header row,
    see ``has_header`` and ``fieldnames`` for CSV files without header.

    .. code-block:: yaml
       :caption: Example ETL Pipeline Design

        $ETL$:
            extractors:
                CsvExtractor: {}         # with default params
                CsvExtractor:            # with custom params
                    file_regex: [out.csv]
                    delimiter: ;
                    has_header: False
                    fieldnames: [col1, col2, col3]
    """

    file_regex: Union[str, List[str]] = [r".*\.csv$"]
    """The regex list to match result files."""

    delimiter: str = ","
    """The separator between columns."""

    has_header: bool = True
    """Indicates whether the first CSV row is a header or not."""

    fieldnames: List[str] = None
    """The names of the CSV columns if ``has_header`` is set to `False`"""


    def extract(self, path: str, options: Dict) -> List[Dict]:
        # load file as csv: by default treats the first line as a header
        #   for each later row, we create a dict and add it to the result list to return

        data = []

        with open(path, "r") as f:

            if self.has_header or self.fieldnames is not None:
                reader = csv.DictReader(f, delimiter=self.delimiter, fieldnames=self.fieldnames)
            else:
                reader = csv.reader(f, delimiter=self.delimiter)
            for row in reader:
                data.append(row)

        return data


class ErrorExtractor(Extractor):
    """
    The `ErrorExtractor` provides a mechanism to detect potential errors in an experiment job.
    For experiments with a large number of jobs, it is easy to overlook an error
    because there are many output folders and files e.g., the stderr.log of each job.

    The `ErrorExtractor` raises a warning if matching files are not empty.

    .. code-block:: yaml
       :caption: Example ETL Pipeline Design

        $ETL$:
            extractors:
                ErrorExtractor: {}         # checking stderr.log
                ErrorExtractor:            # checking custom files
                    file_regex: [stderr.log, error.log]
    """


    file_regex: Union[str, List[str]] = ["^stderr.log$"]
    """The regex list to match result files."""


    def extract(self, path: str, options: Dict) -> List[Dict]:
        # if the file is present and not empty, then throws a warning
        with open(path, "r") as f:
            content = f.read().replace("\n", " ")

        if (
            content.strip() and not content.strip().isspace()
        ):  # ignore empty error files
            # TODO [nku] should we not raise an error in the etl pipeline?
            warnings.warn(f"found error file: {path}")
            warnings.warn(f"   {content}")
        return []


class IgnoreExtractor(Extractor):
    """
    The `IgnoreExtractor` provides a mechanism to detect potential errors in an experiment job.
    For experiments with a large number of jobs, it is easy to overlook an error
    indicated by the presence of an unexpected file.

    As a result, the ETL requires that every file in the results folder of the job
    must be matched by exactly one Extractor.

    The `IgnoreExtractor` can be used to ignore certain files on purpose, e.g., stdout.log.

    .. code-block:: yaml
       :caption: Example ETL Pipeline Design

        $ETL$:
            extractors:
                IgnoreExtractor: {}        # ignore stdout.log
                IgnoreExtractor:           # custom ignore list
                    file_regex: [stdout.log, other.txt]
    """

    file_regex: Union[str, List[str]] = ["^stdout.log$"]
    """The regex list to match result files."""


    def extract(self, path: str, options: Dict) -> List[Dict]:
        # ignore this file
        #  -> every file produced by an experiment run needs to be matched to exactly one extractor.
        #       the ignore extractor can be used to ignore some files
        return []


__all__ = [name for name, cl in inspect.getmembers(sys.modules[__name__], inspect.isclass) if name!="Extractor" and issubclass(cl, Extractor) ]

Transformer

The Transformer steps are responsible for bringing the table into a suitable form. For example, for an experiment with repetitions, it is useful to aggregate over the repetitions and calculate error metrics.

The list of transformers in the $ETL$ design is executed as a chain: the first entry receives the dataframe from the extract phase and passes the modified dataframe to the next transformer in the list.
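
The chaining can be sketched without any dependencies. In the sketch below, a list of dicts stands in for the pandas data frame, and the two steps (`drop_failed`, `mean_over_reps`) as well as the column names are hypothetical; only the idea of passing the output of one step to the next is taken from the description above.

```python
from statistics import mean
from typing import Callable, Dict, List

Rows = List[Dict]  # stand-in for a pandas data frame


def drop_failed(rows: Rows) -> Rows:
    """Keep only the rows of successful runs."""
    return [r for r in rows if r["ok"]]


def mean_over_reps(rows: Rows) -> Rows:
    """Average `lat` over the repetitions of each run."""
    by_run: Dict[int, List[float]] = {}
    for r in rows:
        by_run.setdefault(r["run"], []).append(r["lat"])
    return [{"run": run, "lat_mean": mean(vals)} for run, vals in sorted(by_run.items())]


def run_chain(rows: Rows, transformers: List[Callable[[Rows], Rows]]) -> Rows:
    """Feed the output of each transformer into the next one, in list order."""
    for transform in transformers:
        rows = transform(rows)
    return rows


rows = [
    {"run": 0, "rep": 0, "lat": 1.0, "ok": True},
    {"run": 0, "rep": 1, "lat": 3.0, "ok": True},
    {"run": 1, "rep": 0, "lat": 5.0, "ok": True},
    {"run": 1, "rep": 1, "lat": 9.0, "ok": False},
]
print(run_chain(rows, [drop_failed, mean_over_reps]))
```

The order of the list matters: swapping the two steps here would fail, because the aggregated rows no longer carry the `ok` column.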

To add a custom Transformer, extend the Transformer base class in the does_etl_custom package and implement the required methods.

from typing import Dict

import pandas as pd

from doespy.etl.steps.transformers import Transformer

class MyTransformer(Transformer):

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        # modify the data frame and return it for the next step
        raise NotImplementedError  # your implementation

Todo

add link to etl section

More details can be found by consulting the already provided Transformers.

doespy/doespy/etl/steps/transformers.py
from abc import ABC, abstractmethod
from typing import Dict, Any, List

import pandas as pd
import inspect
import sys

from pydantic import BaseModel, validator

from doespy.etl.etl_util import expand_factors


class Transformer(BaseModel, ABC):

    name: str = None

    class Config:
        extra = "forbid"

    @validator("name", pre=True, always=True)
    def set_name(cls, value):
        return cls.__name__

    @abstractmethod
    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:

        # NOTE: Extending classes should not use the `options: Dict` and instead use instance variables for parameters

        pass


class DfTransformer(Transformer):

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:

        # TODO [nku] here I could define the different things for the df.x -> at the moment unused
        pass

class ConditionalTransformer(Transformer):
    """
    The `ConditionalTransformer` replaces the value in the ``dest`` column with a
    value from the ``value`` dict, if the value in the ``col`` column is equal to the key.

    .. code-block:: yaml
       :caption: Example ETL Pipeline Design

        $ETL$:
            transformers:
              - name: ConditionalTransformer
                col: Country
                dest: Code
                value:
                    Switzerland: CH
                    Germany: DE

    Example

    .. container:: twocol

        .. container:: leftside

            ============  ====
            Country       Code
            ============  ====
            Germany
            Switzerland
            France
            ============  ====

        .. container:: middle

            |:arrow_right:|

        .. container:: rightside

            ============  ====
            Country       Code
            ============  ====
            Germany       DE
            Switzerland   CH
            France
            ============  ====

    """

    col: str
    """Name of condition column in data frame."""

    dest: str
    """Name of destination column in data frame."""

    value: Dict[Any, Any]
    """Dictionary of replacement rules:
        The dict key is the entry in the condition ``col`` and
        the value is the replacement used in the ``dest`` column."""


    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:

        col = self.col
        value = self.value
        dest = self.dest

        for cur, repl in value.items():
            df.loc[df[col] == cur, dest] = repl

        return df


class RepAggTransformer(Transformer):

    r"""The `RepAggTransformer` aggregates over the repetitions of the same experiment run.
    GroupBy all columns of df except ``data_columns`` and ``rep`` column.
    Afterward, apply specified aggregate functions ``agg_functions`` on the ``data_columns``.

    :param ignore_columns: List of columns to ignore within group_by condition (apart from repetition column ``rep``), defaults to ``[]``

    :param data_columns: The columns that contain the data to aggregate, see ``agg_function``.

    :param agg_functions: List of aggregate function to apply on ``data_columns``, defaults to ``["mean", "min", "max", "std", "count"]``

    .. code-block:: yaml
       :caption: Example ETL Pipeline Design

        $ETL$:
            transformers:
              - name: RepAggTransformer
                ignore_columns: [$CMD$]
                data_columns: [Lat]
                agg_functions: [mean]

    Example

    .. container:: twocol

        .. container:: leftside

            ===  ==== === ===== ===
            Run  ...  Rep $CMD$ Lat
            ===  ==== === ===== ===
            0         0   xyz   0.1
            0         1   xyz   0.3
            1         0   xyz   0.5
            1         1   xyz   0.5
            ===  ==== === ===== ===

        .. container:: middle

            |:arrow_right:|

        .. container:: rightside

            ===  ==== ========
            Run  ...  Lat_mean
            ===  ==== ========
            0         0.2
            1         0.5
            ===  ==== ========

    """

    ignore_columns: List[str] = []

    data_columns: List[str]

    agg_functions: List[str] = ["mean", "min", "max", "std", "count"]

    # TODO [nku] can we remove this transformer by unifying it with GroupByAggTransformer? -> I think we could remove this here and replace it only with the GroupByAggTransformer and include rep in Groupby cols

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        if df.empty:
            return df

        data_columns = self.data_columns

        ignore_columns = self.ignore_columns

        agg_functions = self.agg_functions

        if not set(data_columns).issubset(df.columns.values):
            raise ValueError(
                "RepAggTransformer: "
                f"data_columns={data_columns} must be in df_columns={df.columns.values}"
            )

        # ensure that all data_columns are numbers
        df[data_columns] = df[data_columns].apply(pd.to_numeric)

        # we need to convert each column into a hashable type
        # (list and dict are converted to string)
        hashable_types = {}
        for col in df.columns:
            dtype = df[col].dtype
            if dtype == "object":
                hashable_types[col] = "str"
            else:
                hashable_types[col] = dtype
        df = df.astype(hashable_types)

        # group_by all except `rep` and `data_columns`
        group_by_cols = [
            col
            for col in df.columns.values
            if col not in data_columns and col != "rep" and col not in ignore_columns
        ]
        agg_d = {data_col: agg_functions for data_col in data_columns}
        df = df.groupby(group_by_cols).agg(agg_d).reset_index()

        # flatten columns
        df.columns = ["_".join(v) if v[1] else v[0] for v in df.columns.values]

        return df


class GroupByAggTransformer(Transformer):
    """
    The `GroupByAggTransformer` performs a group by followed
    by a set of aggregate functions applied to the ``data_columns``.

    .. code-block:: yaml
       :caption: Example ETL Pipeline Design

        $ETL$:
            transformers:
              - name: GroupByAggTransformer
                groupby_columns: [Run, $FACTORS$]
                data_columns: [Lat]
                agg_functions: [mean]

    Example

    .. container:: twocol

        .. container:: leftside

            ===  ==== === ===== ===
            Run  ...  Rep $CMD$ Lat
            ===  ==== === ===== ===
            0         0   xyz   0.1
            0         1   xyz   0.3
            1         0   xyz   0.5
            1         1   xyz   0.5
            ===  ==== === ===== ===

        .. container:: middle

            |:arrow_right:|

        .. container:: rightside

            ===  ==== ========
            Run  ...  Lat_mean
            ===  ==== ========
            0         0.2
            1         0.5
            ===  ==== ========

    """

    data_columns: List[str]
    """ The columns that contain the data to aggregate, see ``agg_function``."""

    groupby_columns: List[str]
    """The columns to perform the group by.
        The list can contain the magic entry `$FACTORS$` that expands to all factors of the experiment.
        e.g., [exp_name, host_type, host_idx, $FACTORS$] would perform a group by of each run.
    """

    agg_functions: List[str] = ["mean", "min", "max", "std", "count"]
    """List of aggregate function to apply on ``data_columns``"""

    custom_tail_length: int =  5
    """"custom_tail" is a custom aggregation function that calculates the mean over the last `custom_tail_length` entries of a column."""

    def custom_tail_build(self, custom_tail_length):

        def custom_tail(data):
            return data.tail(custom_tail_length).mean()

        return custom_tail

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:
        if df.empty:
            return df

        data_columns = self.data_columns
        # here, we get factor_columns
        groupby_columns = expand_factors(df, self.groupby_columns)
        agg_functions = self.agg_functions

        # To configure size of the 'tail' to calculate the mean over
        custom_tail_length = options.get("custom_tail_length", 5)

        # custom agg functions
        custom_agg_methods_available = {
            "custom_tail": self.custom_tail_build(custom_tail_length)
        }
        for fun in agg_functions.copy():
            for method, call in custom_agg_methods_available.items():
                if fun == method:
                    agg_functions.remove(fun)  # is this allowed while in the loop?
                    agg_functions.append(call)

        if not set(data_columns).issubset(df.columns.values):
            return df
            # raise ValueError(f"RepAggTransformer: data_columns={data_columns}
            # must be in df_columns={df.columns.values}")
        if not set(groupby_columns).issubset(df.columns.values):
            raise ValueError(
                f"GroupByAggTransformer: groupby_columns={groupby_columns} "
                f"must be in df_columns={df.columns.values}"
            )

        # ensure that all data_columns are numbers
        df[data_columns] = df[data_columns].apply(pd.to_numeric)

        # we need to convert each column into a hashable type
        # (list and dict are converted to string)
        hashable_types = {}
        for col in df.columns:
            dtype = df[col].dtype
            if dtype == "object":
                hashable_types[col] = "str"
            else:
                hashable_types[col] = dtype
        df = df.astype(hashable_types)

        # group_by all except `rep` and `data_columns`
        group_by_cols = groupby_columns
        agg_d = {data_col: agg_functions for data_col in data_columns}
        df = df.groupby(group_by_cols, dropna=False).agg(agg_d).reset_index()

        # flatten columns
        df.columns = ["_".join(v) if v[1] else v[0] for v in df.columns.values]

        return df


class FilterColumnTransformer(Transformer):

    """
    Simple transformer to filter rows out of a dataframe by column values.

    Accepts key-value pairs in `filters` option.

    This transformer is simple for now, only accepts discrete values and
    does not do any type handling.
    Options:
        - filters: dict of filters
        - allow_empty_result: bool whether to throw an error when the dataframe
            becomes empty as result of the filter.
            Defaults to False.
    """

    filters: Dict[str, Any] = {}
    allow_empty_result: bool = False

    def transform(self, df: pd.DataFrame, options: Dict) -> pd.DataFrame:

        import warnings

        warnings.warn(
            """FilterColumnTransformer is deprecated, instead you can directly use
            df.query(col == 'A') in the etl definition
            i.e., transformers: [df.query: {expr: col == 'A'}]""",
            DeprecationWarning
        )

        filters: dict[str, Any] = options.get("filters", {})
        allow_empty_result: bool = options.get("allow_empty_result", False)

        if len(filters.keys()) == 0:
            return df
        if not set(filters.keys()).issubset(df.columns.values):
            raise ValueError(
                f"FilterColumnTransformer: filters={filters.keys()} "
                f"must be in df_columns={df.columns.values}"
            )

        for key, value in filters.items():
            df = df[df[key] == value]

        if df.empty and not allow_empty_result:
            raise ValueError(
                "FilterColumnTransformer: resulting dataframe after filters is empty! "
                "This is probably not supposed to happen"
            )
        return df



__all__ = [name for name, cl in inspect.getmembers(sys.modules[__name__], inspect.isclass) if name!="Transformer" and issubclass(cl, Transformer) ]

Loader

The Loader steps are responsible for taking the dataframe from the transform phase and producing different representations of the results, for example, storing results in a database, producing different plots of the data, or storing a summary as a CSV file.

All loaders specified in the ETL config of the suite design receive the resulting table (data frame) from the transform phase.

To add a custom Loader, extend the Loader base class in the does_etl_custom package and implement the required methods.

from typing import Dict

import pandas as pd

from doespy.etl.steps.loaders import Loader

class MyLoader(Loader):

    def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:
        # store or plot the data frame
        raise NotImplementedError  # your implementation

Todo

add link to etl section

More details can be found by consulting the already provided Loaders.
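
To give an idea of what a load step does, the following sketch writes a summary file named after the pipeline into the ETL output directory. It is a dependency-free illustration: a list of dicts stands in for the data frame, and the function `write_csv_summary` is hypothetical. The keys `etl_output_dir` and `pipeline` of `etl_info` are the ones used by the built-in loaders.

```python
import csv
import os
from typing import Dict, List


def write_csv_summary(rows: List[Dict], etl_info: Dict) -> str:
    """Write `rows` to <etl_output_dir>/<pipeline>.csv and return the file path."""
    if not rows:
        raise ValueError("no rows to write")
    output_dir = etl_info["etl_output_dir"]
    os.makedirs(output_dir, exist_ok=True)  # create the output folder on first use
    path = os.path.join(output_dir, f"{etl_info['pipeline']}.csv")
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return path
```

In a real Loader, the same logic would live in the load method and operate on the `pd.DataFrame` it receives.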

doespy/doespy/etl/steps/loaders.py
from abc import ABC, abstractmethod
from typing import List, Dict

import pandas as pd
import os
import matplotlib.pyplot as plt
import inspect
import sys
from typing import Optional

from pydantic import BaseModel

class Loader(BaseModel, ABC):
    class Config:
        extra = "forbid"

    def get_output_dir(self, etl_info):
        """:meta private:"""

        etl_output_dir = etl_info["etl_output_dir"]

        if etl_output_dir is not None and not os.path.exists(etl_output_dir):
            os.makedirs(etl_output_dir)

        return etl_output_dir

    @abstractmethod
    def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:
        """:meta private:"""
        # NOTE: Extending classes should not use the `options: Dict` and instead use instance variables for parameters

        pass


class PlotLoader(Loader):

    def save_data(self, df: pd.DataFrame, filename: str, output_dir: str, output_filetypes: List[str] = ["html"]):
        """:meta private:"""
        os.makedirs(output_dir, exist_ok=True)

        for ext in output_filetypes:
            if ext == "html":
                html_table = df.to_html()
                path = os.path.join(output_dir, f"{filename}.html")

                with open(path, 'w') as file:
                    file.write(html_table)
            else:
                raise ValueError(f"PlotLoader: Unknown file type {ext}")


    def save_plot(
        self,
        fig: plt.Figure,
        filename: str,
        output_dir: str,
        use_tight_layout: bool = True,
        output_filetypes: List[str] = ["pdf", "png"],
    ):
        """:meta private:"""
        os.makedirs(output_dir, exist_ok=True)


        for ext in output_filetypes:
            full_filename = f"{output_dir}/{filename}.{ext}"

            bbox_inches = "tight" if (use_tight_layout and ext == "pdf") else None
            pad_inches = 0 if (use_tight_layout and ext == "pdf") else 0.1
            dpi = 300
            fig.savefig(
                full_filename,
                format=ext,
                bbox_inches=bbox_inches,
                pad_inches=pad_inches,
                dpi=dpi,
            )

    def default_fig(self):
        """:meta private:"""
        scale_factor = 2.4
        figsize = [
            scale_factor * 1.618,
            scale_factor * 1,
        ]  # [width, height] based on golden ratio
        fig = plt.figure(figsize=figsize, dpi=100)
        plt.figure(fig.number)
        return fig


class CsvSummaryLoader(Loader):
    """
    The `CsvSummaryLoader` creates a CSV file of the data frame from the `Transformer` stage.

    .. code-block:: yaml
       :caption: Example ETL Pipeline Design

        $ETL$:
            loaders:
                CsvSummaryLoader: {}     # with default output dir
                CsvSummaryLoader:        # with skip empty df
                    skip_empty: True
    """

    skip_empty: bool = False
    """If ``True``, silently skip an empty data frame; if ``False`` (default), raise an error when the data frame is empty."""


    def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:

        if self.skip_empty and df.empty:
            return
        elif df.empty:
            raise ValueError("CsvSummaryLoader: DataFrame is empty so not creating an output file.")
        else:
            output_dir = self.get_output_dir(etl_info)
            df.to_csv(os.path.join(output_dir, f"{etl_info['pipeline']}.csv"))


class PickleSummaryLoader(Loader):
    """The `PickleSummaryLoader` creates a Pickle file of the data frame from the `Transformer` stage.

    .. code-block:: yaml
       :caption: Example ETL Pipeline Design

        $ETL$:
            loaders:
                PickleSummaryLoader: {}     # with default output dir
                PickleSummaryLoader:        # with skip empty df
                    skip_empty: True
    """

    skip_empty: bool = False
    """If ``True``, silently skip an empty data frame; if ``False`` (default), raise an error when the data frame is empty."""


    def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:

        if self.skip_empty and df.empty:
            return
        elif df.empty:
            raise ValueError("PickleSummaryLoader: DataFrame is empty so not creating an output file.")
        else:
            output_dir = self.get_output_dir(etl_info)
            df.to_pickle(os.path.join(output_dir, f"{etl_info['pipeline']}.pkl"))


class LatexTableLoader(Loader):
    """The `LatexTableLoader` creates a ``.tex`` file of the data frame from the `Transformer` stage, formatted as a LaTeX table.

    .. code-block:: yaml
       :caption: Example ETL Pipeline Design

        $ETL$:
            loaders:
                LatexTableLoader: {}     # with default output dir
                LatexTableLoader:        # with skip empty df
                    skip_empty: True
    """

    skip_empty: bool = False
    """If ``True``, silently skip an empty data frame; if ``False`` (default), raise an error when the data frame is empty."""

    def load(self, df: pd.DataFrame, options: Dict, etl_info: Dict) -> None:

        if self.skip_empty and df.empty:
            return
        elif df.empty:
            raise ValueError("LatexTableLoader: DataFrame is empty so not creating an output file.")

        output_dir = self.get_output_dir(etl_info)
        with open(
            os.path.join(output_dir, f"{etl_info['pipeline']}.tex"), "w"
        ) as file:
            df.to_latex(buf=file)

__all__ = [name for name, cl in inspect.getmembers(sys.modules[__name__], inspect.isclass) if name!="Loader" and issubclass(cl, Loader) ]

Execution

We are done configuring experiments and now want to execute them. To do so, move to the doe-suite folder, because you interact with the DoE-Suite through the Makefile located there.

cd doe-suite

You can call make or make help to see an overview of the functionality:

make help
$ make help
Running Experiments
  make run suite=<SUITE> id=new                       - run the experiments in the suite
  make run suite=<SUITE> id=<ID>                      - continue with the experiments in the suite with <ID> (often id=last)
  make run suite=<SUITE> id=<ID> cloud=<CLOUD>        - run suite on non-default cloud ([aws], euler)
  make run suite=<SUITE> id=<ID> expfilter=<REGEX>    - run only subset of experiments in suite where name matches the <REGEX> (suite must be valid)
  make run-keep suite=<SUITE> id=new                  - does not terminate instances at the end, otherwise works the same as run target
Clean
  make clean                                          - terminate running cloud instances belonging to the project and local cleanup
  make clean-result                                   - delete all incomplete results in doe-suite-results
Running ETL Locally
  make etl suite=<SUITE> id=<ID>                      - run the etl pipeline of the suite (locally) to process results (often id=last)
  make etl-design suite=<SUITE> id=<ID>               - same as `make etl ...` but uses the pipeline from the suite design instead of results
  make etl-all                                        - run etl pipelines of all results
  make etl-super config=<CONFIG> out=<PATH>           - run the super etl to combine results of multiple suites  (for <CONFIG> e.g., demo_plots)
  make etl-super ... pipelines="<P1> <P2>"            - run only a subset of pipelines in the super etl
Clean ETL
  make etl-clean suite=<SUITE> id=<ID>                - delete etl results from specific suite (can be regenerated with make etl ...)
  make etl-clean-all                                  - delete etl results from all suites (can be regenerated with make etl-all)
Gather Information
  make info                                           - list available suite designs
  make status suite=<SUITE> id=<ID>                   - show the status of a specific suite run (often id=last)
Design of Experiment Suites
  make design suite=<SUITE>                           - list all the run commands defined by the suite
  make design-validate suite=<SUITE>                  - validate suite design and show with default values
Setting up a Suite
  make new                                            - initialize doe-suite-config from a template
Running Tests
  make test                                           - running all suites (seq) and comparing results to expected (on aws)
  make euler-test cloud=euler                         - running all single instance suites on euler and compare results to expected
  make etl-test-all                                   - re-run all etl pipelines and compare results to current state (useful after update of etl step)

Warning

The DoE-Suite can easily start many instances in a remote cloud. If there is an error in the execution, or the suite finishes before all jobs are complete, then these remote resources are not terminated and can generate high costs. Always check that resources are terminated. We also provide the following command to ensure that the previously started instances are terminated:

make clean

To start a new suite on the default cloud, you use:

make run suite=<YOUR-SUITE-DESIGN> id=new

When we start a new experiment suite, it receives a unique ID (epoch timestamp). Each experiment of the suite must have a unique name in the experiment design specification.
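Because the suite ID is an epoch timestamp, it can be converted back into a readable start time. The snippet below is a minimal sketch that assumes the ID counts seconds since the Unix epoch, which is consistent with the ten-digit example ID used in this tutorial. The helper name suite_id_to_utc is hypothetical and not part of the DoE-Suite.

```python
from datetime import datetime, timezone


def suite_id_to_utc(suite_id: int) -> datetime:
    """Interpret a suite ID as seconds since the Unix epoch (assumption) in UTC."""
    return datetime.fromtimestamp(suite_id, tz=timezone.utc)


# Example ID taken from this tutorial (`id=1655831553`).
started = suite_id_to_utc(1655831553)
print(started.isoformat())  # 2022-06-21T17:12:33+00:00
```

This can help when deciding which of several result folders belongs to a given run.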

The playbook periodically checks whether an experiment run is finished and then downloads the results. The variable job_n_tries controls the maximum number of times to check whether the job finished. In between checking, the playbook waits for job_check_wait_time seconds (see doe-suite-config/group_vars/all/main.yml). After the number of job_n_tries is exceeded, the playbook aborts.
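The interplay of job_n_tries and job_check_wait_time can be illustrated with a small polling loop. This is only a sketch of the semantics described above, written in Python for illustration. The actual check is performed by the Ansible playbook, and the function wait_for_job and its parameters is_finished and sleep are hypothetical names.

```python
import time
from typing import Callable


def wait_for_job(
    is_finished: Callable[[], bool],
    job_n_tries: int,
    job_check_wait_time: float,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll until the job is finished and return the number of checks used.

    Raises TimeoutError after `job_n_tries` unsuccessful checks, which mirrors
    the playbook aborting in the description above.
    """
    for attempt in range(1, job_n_tries + 1):
        if is_finished():
            return attempt
        if attempt < job_n_tries:
            # Wait between two checks, but not after the final one.
            sleep(job_check_wait_time)
    raise TimeoutError(f"job not finished after {job_n_tries} checks")


# Usage with a fake job that finishes on the third check.
# Waits are recorded in a list instead of actually sleeping.
checks = iter([False, False, True])
waits = []
used = wait_for_job(lambda: next(checks), job_n_tries=5,
                    job_check_wait_time=30, sleep=waits.append)
print(used, waits)  # 3 [30, 30]
```

Under this model, the playbook observes a job for at most roughly job_n_tries times job_check_wait_time seconds before it gives up, so long-running jobs need correspondingly larger values.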

Experiments that involve multiple instances (e.g., a client-server experiment) require the experiment-suite.yml playbook to start the next job after the previous one has finished. Consequently, when the playbook aborts because job_n_tries is exceeded, an already running job continues to run on AWS, but the next job won’t start until the experiment-suite.yml playbook runs again.

For experiments that run on a single instance, all jobs are scheduled on the instance from the beginning. As a consequence, after a job completes, the next job automatically starts even when the experiment-suite.yml playbook does not run. In this case, the playbook is only required to fetch results.

To continue checking a previously started experiment, we can specify the ID of the experiment when starting the playbook:

# can replace `id=last` with actual id, e.g., `id=1655831553`
make run suite=<YOUR-SUITE-DESIGN> id=last

After fetching new results, ETL pipelines are executed locally on your machine. It’s also possible to re-run the ETL pipelines on the result files without re-running experiments.

# can replace `id=last` with actual id, e.g., `id=1655831553`
make etl suite=<YOUR-SUITE-DESIGN> id=last

Keep Experimenting