# Source code for doespy.design.exp_design

from typing import List
from typing import Dict
from typing import Optional, Any
from typing import Literal
from typing import Union

from pydantic import Field
from pydantic import BaseModel
from pydantic import root_validator
from pydantic import validator
from pydantic import ValidationError
from pydantic import PydanticValueError

import warnings
import os
import re
import inspect
import sys
import ruamel.yaml
import enum
import json

from doespy import util
from doespy.design import dutil
from doespy.design import etl_design


class MyBaseModel(BaseModel):
    """Shared pydantic base class of the design models in this module."""

    class Config:
        # keep the plain value of an enum in the model (not the enum member)
        use_enum_values = True
        # pick the best matching member of a `Union` annotation
        smart_union = True
        # keys that are not declared as a field are rejected
        extra = "forbid"


# Enum built at import time from the names returned by `util.get_host_types()`:
# the member name is the host type with "-" replaced by "_" (presumably to get
# a valid identifier), the member value is the unmodified host type name.
HostTypeId = enum.Enum("HostTypeId", {ht.replace("-", "_"): ht for ht in util.get_host_types()})
"""Name of a host type and corresponds to folder in `doe-suite-config/group_vars`."""

# Same construction as `HostTypeId`, based on `util.get_setup_roles()`.
SetupRoleId = enum.Enum("SetupRoleId", {x.replace("-", "_"):  x for x in util.get_setup_roles()})
"""Name of an Ansible role to setup a host. The role is located in folder: `doe-suite-config/roles`."""


class Cmd(MyBaseModel):
    """A command, modelled as a plain string (pydantic custom root type)."""

    __root__: str

class HostType(MyBaseModel):
    """Configuration of one host type of an experiment.

    After validation, ``init_roles`` is always a list and ``cmd`` is always a
    list with ``n`` entries, each of them a dict that contains the key
    ``"main"``.
    """

    n: int = 1
    check_status: bool = True
    init_roles: Union[SetupRoleId, List[SetupRoleId]] = []
    cmd: Union[Cmd, Dict[str, Cmd], List[Cmd], List[Dict[str, Cmd]]] = Field(alias="$CMD$")

    class Config:
        smart_union = True
        extra = "forbid"

    @validator("init_roles")
    def convert_init_roles(cls, v):
        """Wrap a single role into a list, leave a list untouched."""
        return v if isinstance(v, list) else [v]

    @root_validator(skip_on_failure=True)
    def convert_cmd(cls, values):
        """Normalize ``cmd`` into a list of length ``n`` of dicts with a ``main`` entry.

        A value that is not a list is repeated for all ``n`` instances of the
        host type. A plain command becomes ``{"main": <command>}``.

        Examples::

            # minimal example
            n: 1
            $CMD$:
              - main: X

            # two instances, one command
            n: 2
            $CMD$:
              - main: X
              - main: Y

            # two instances, multiple commands per instance
            n: 2
            $CMD$:
              - main: X
                monitor: Z  # on first host instance also start `monitor` cmd Z
              - main: Y
        """
        # target shape for n == 2: $CMD$: [{"main": cmd1}, {"main": cmd2}]
        n_instances = values["n"]
        raw = values["cmd"]

        # a single entry is shared by all instances of this host type
        per_instance = raw if isinstance(raw, list) else [raw] * n_instances

        assert isinstance(per_instance, list) and len(per_instance) == n_instances, "cmd list length does not match the number of instances `n` of host type"

        normalized = []
        for entry in per_instance:
            if isinstance(entry, Cmd):
                normalized.append({"main": entry})
                continue
            if not isinstance(entry, dict):
                raise ValueError("unknown type")
            assert "main" in entry, "missing cmd for main"
            normalized.append(entry)

        values["cmd"] = normalized
        return values
class ExperimentConfigDict(MyBaseModel):
    """Config dict that accepts arbitrary keys and supports ``$INCLUDE_VARS$``."""

    class Config:
        extra = "allow"

    @root_validator(skip_on_failure=True)
    def include_vars(cls, values):
        """At any depth of the config dict, we can include variables from another file.

        ``$INCLUDE_VARS$: Optional[Union[str, List[str]]]``

        Where str corresponds to the file name, e.g., ``test.yml``, in ``doe-suite-config/designs/design_vars``

        All the variables in the external file, are included at the level of where ``$INCLUDE_VARS$`` is located.
        If a variable is already present, then the variable is skipped.
        """
        # do nothing (only here for documentation)
        # -> the resolving for both suite vars and base exp config happens in BaseExperimentConfig.__init__
        return values

    @staticmethod
    def resolve_include_vars(values):
        """Resolve all ``$INCLUDE_VARS$`` entries of ``values`` (modified in place).

        Every ``$INCLUDE_VARS$`` key is removed and the variables of the listed
        files (relative to ``util.get_suite_design_vars_dir()``) are included
        via ``dutil.include_vars`` at the level where the key was located.
        A summary of the included / skipped variables is printed.

        Args:
            values: the (nested) config dict.

        Returns:
            The same ``values`` object, after resolving the includes.

        Raises:
            ValueError: if more than 100 files were included, which points to
                a cyclic include.
            AssertionError: if an ``$INCLUDE_VARS$`` entry is malformed or a
                referenced file does not exist.
        """
        info = []

        # One include key is resolved per pass because an include changes
        # `values`, so the nested iteration has to be restarted afterwards.
        # We stop once a pass finds no include key (tracking the key rather
        # than the number of included files, so that an empty include list
        # does not end the resolution early).
        found_include = True
        while found_include:
            found_include = False

            if len(info) > 100:
                # `warnings.warn` returns None and cannot be raised
                raise ValueError("More than 100 $INCLUDE_VARS$, are you sure you did not define an infinite loop of includes?")

            for path, value in dutil.nested_dict_iter(values):

                assert "$INCLUDE_VARS$" not in path[:-1], f"Illegal $INCLUDE_VARS$ formatting: {'.'.join(['d'] + path)} (must be a string or a list of strings)"

                if path[-1] != "$INCLUDE_VARS$":
                    continue

                found_include = True

                # navigate to the dict that holds the include key
                d = values
                for p in path[:-1]:
                    d = d[p]
                del d["$INCLUDE_VARS$"]

                if isinstance(value, str):
                    value = [value]

                assert isinstance(value, list) and all(isinstance(s, str) for s in value), f"Illegal $INCLUDE_VARS$ formatting: {'.'.join(['d'] + path)} (must be a string or a list of strings)"

                for external_file in value:
                    # external_file is the path relative to the design vars dir
                    info_str = f"{'.'.join(['d'] + path)}: {external_file}"
                    file_path = os.path.join(util.get_suite_design_vars_dir(), external_file)
                    assert os.path.exists(file_path), f"File not found: {file_path} for {info_str}"
                    with open(file_path, "r") as f:
                        external_vars = ruamel.yaml.safe_load(f)
                    skipped_info, included_info = dutil.include_vars(d, external_vars)
                    info += [(info_str, {"skipped": skipped_info, "included": included_info})]

                # break after every include because include can change "values"
                # and thus we need to restart the nested dict iteration
                break

        # TODO [nku] could try to properly log this or move away from validation
        # output info on which files were included
        for include_info, details in info:
            print(f" $INCLUDE_VARS$: {include_info}")
            print(" SKIPPED (already present):")
            for s in details["skipped"]:
                print(f" {s}:")
            print(" INCLUDED:")
            for s in details["included"]:
                print(f" {s}:")

        return values
class SuiteVarsConfigDict(ExperimentConfigDict):
    """Config dict used for the ``$SUITE_VARS$`` of a suite (no additional fields)."""
class Context(MyBaseModel):
    """Context that is handed to the models of a suite under the key ``_CTX``."""

    prj_id: str
    suite_name: str

    suite_vars: SuiteVarsConfigDict = None

    experiment_names: List[str]
    etl_pipeline_names: List[str]

    my_experiment_name: str = None

    my_experiment_factor_paths_levellist: List[List[str]] = []
    my_experiment_factor_paths_cross: List[List[str]] = []


def merge_suite_vars(ctx, values):
    """The ``$SUITE_VARS$`` can define a config that belongs to every experiment of the suite.

    Each experiment defines its own config in ``base_experiment`` and inherits
    the config of ``$SUITE_VARS$`` on top of it. On conflicts the config of
    ``base_experiment`` wins, i.e., it is not overwritten by ``$SUITE_VARS$``.

    ``ctx`` is the raw context dict (read via ``ctx['suite_vars']``) and
    ``values`` the config of the ``base_experiment``. Returns ``values``.
    """
    # Both inputs are expected to have their $INCLUDE_VARS$ resolved already,
    # here we only merge them (with precedence for the base_experiment).
    suite_vars_d = ctx['suite_vars']
    if suite_vars_d is None:
        return values

    assert "$INCLUDE_VARS$" not in str(suite_vars_d), f"$INCLUDE_VARS$ not resolved in $SUITE_VARS$: {suite_vars_d}"
    assert "$INCLUDE_VARS$" not in str(values), f"$INCLUDE_VARS$ not resolved in base_experiment: {values}"

    if len(suite_vars_d) == 0:
        return values

    skipped_info, included_info = dutil.include_vars(values, suite_vars_d)

    # TODO [nku] could try to properly log this or move away from validation
    print(f" $MERGE_SUITE_VARS$")
    print(f" SKIPPED (already present):")
    for entry in skipped_info:
        print(f" {entry}:")
    print(f" INCLUDED:")
    for entry in included_info:
        print(f" {entry}:")

    return values


def identify_factors(values):
    """Validates the ``$FACTOR$`` syntax.

    Case 1: ``$FACTOR$`` is used as a value. Such a factor needs an entry in
    the ``factor_levels`` of the experiment.

    Case 2: ``$FACTOR$`` is used as a key. The corresponding value must then
    be the list of levels of this factor.

    Returns the tuple ``(factors_levellist, factors_cross)`` with the paths of
    the factors of case 1 and case 2. Raises a ``ValueError`` if the value
    below a ``$FACTOR$`` key is not a list.
    """
    levellist_paths = []
    cross_paths = []
    messages = []

    # collect the `path` of every factor of the base experiment
    for path, value in dutil.nested_dict_iter(values):

        if value == "$FACTOR$":
            messages.append(f"$FACTOR$ (Level Syntax) -> {'.'.join(['d'] + path)}: $FACTOR$")
            levellist_paths.append(path)

        if path[-1] == "$FACTOR$":
            if not isinstance(value, list):
                raise ValueError(
                    "if $FACTOR$ is the key, then value must be a list of levels",
                    f"(path={path} value={value})",
                )
            messages.append(f"$FACTOR$ (Cross Syntax) -> {'.'.join(['d'] + path)}: {value}")
            cross_paths.append(path)

    # TODO [nku] could try to properly log this or move away from validation
    for message in messages:
        print(f" {message}")

    return levellist_paths, cross_paths
class BaseExperimentConfigDict(ExperimentConfigDict):
    """Config of the ``base_experiment``: arbitrary keys plus the context ``_CTX``."""

    # TODO [later] Can we define custom schemas for project and then enforce
    # that they must be present in a certain form across all suites/experiments?
    # (maybe should always be marked as optional but if present, than in that form)

    ctx: Context = Field(alias="_CTX", exclude=True)
    """:meta private:"""

    def __init__(self, *args, **kwargs):
        """Resolve includes, merge the suite vars, and identify the factors.

        The extra (non-field) entries of ``kwargs`` are taken out before the
        pydantic initialization and are set as attributes afterwards.
        """
        # HACK: Because Pydantic does not preserve order for extra parameters (https://github.com/samuelcolvin/pydantic/issues/1234)
        # we assign them in order after the class has been created

        # names (and aliases) of the declared fields
        declared = set(self.__fields__)
        declared.update(field.alias for field in self.__fields__.values() if field.alias is not None)

        # everything else is an extra var and is taken out of kwargs
        extra_kwargs = {key: kwargs.pop(key) for key in list(kwargs) if key not in declared}

        # first resolve the $INCLUDE_VARS$
        extra_kwargs = ExperimentConfigDict.resolve_include_vars(extra_kwargs)

        ctx = kwargs["_CTX"]

        # then resolve the $INCLUDE_VARS$ in $SUITE_VARS$
        if ctx['suite_vars'] is not None:
            ctx['suite_vars'] = ExperimentConfigDict.resolve_include_vars(ctx['suite_vars'])

        # add the variables from the $SUITE_VARS$
        extra_kwargs = merge_suite_vars(ctx, extra_kwargs)

        # identify factors in extra values
        levellist_paths, cross_paths = identify_factors(extra_kwargs)
        ctx["my_experiment_factor_paths_levellist"] = levellist_paths
        ctx["my_experiment_factor_paths_cross"] = cross_paths

        # init the actual class
        super().__init__(*args, **kwargs)

        # restoring the extra values (in their original order)
        config = self.__config__
        previous_allow_mutation = config.allow_mutation
        config.allow_mutation = True
        for key, value in extra_kwargs.items():
            setattr(self, key, value)
        config.allow_mutation = previous_allow_mutation
class Experiment(MyBaseModel):
    """An experiment is composed of a set of runs, each with its own unique configuration.
    The configurations of the runs vary different experiment factors, e.g., number of clients.
    Additionally, an experiment also specifies the hosts responsible for executing the runs.
    """

    ctx: Context = Field(None, alias="_CTX", exclude=True)
    """:meta private:"""

    n_repetitions: int = 1
    """Number of repetitions with the same experiment run configuration."""

    common_roles: Union[SetupRoleId, List[SetupRoleId]] = []
    """Ansible roles executed during the setup of all host types."""

    host_types: Dict[HostTypeId, HostType]
    """The different :ref:`host types<Host Type>` involved in the experiment."""

    base_experiment: BaseExperimentConfigDict
    """Defines constants and factors of an experiment."""

    factor_levels: List[Dict] = [{}]
    """For the factors of an experiment, lists the different levels.
    For example, `n_clients` can be a factor with two levels: 1 and 100."""

    except_filters: List[Dict] = []
    """A list of filters that can be used to exclude certain runs from the experiment.
    """

    class Config:
        extra = "forbid"

    @root_validator(pre=True, skip_on_failure=True)
    def context(cls, values):
        """Move the raw ``_CTX`` from the experiment into its ``base_experiment``."""
        base_experiment = values.get("base_experiment")

        # we remove the ctx because it becomes out of date (due to setting factors in base_experiment)
        ctx = values.pop("_CTX")

        # An empty `base_experiment: {}` is falsy but still needs the context:
        # BaseExperimentConfigDict.__init__ reads `_CTX` unconditionally.
        if base_experiment or isinstance(base_experiment, dict):
            base_experiment["_CTX"] = ctx

        return values

    @validator("common_roles")
    def convert_common_roles(cls, v):
        """Wrap a single role into a list, leave a list untouched."""
        if not isinstance(v, list):
            return [v]
        return v

    @root_validator(skip_on_failure=True)
    def check_factor_levels(cls, values):
        """The ``base_experiment`` defines a set of $FACTOR$s that use the level list syntax.
        (i.e., $FACTOR$ is value).
        This validator checks that this set of $FACTOR$s matches each list entry of ``factor_levels``.
        """
        # after setting factor fields in base_experiment, the ctx is here up to date again
        values['ctx'] = values["base_experiment"].ctx
        values["base_experiment"].ctx = None

        expected_factor_paths = values['ctx'].my_experiment_factor_paths_levellist

        for run in values.get("factor_levels"):
            actual_factors = [path for path, _value in dutil.nested_dict_iter(run)]

            # message built from adjacent literals: a backslash continuation
            # inside the string would embed the source indentation
            assert sorted(expected_factor_paths) == sorted(actual_factors), (
                "expected factors do not match actual factors: "
                f"expected={expected_factor_paths} actual={actual_factors}"
            )

        return values

    @root_validator(skip_on_failure=True)
    def check_except_filters(cls, values):
        """Every entry in ``except_filters`` must be a subset of the actual factors.
        """
        # relies on `values['ctx']` that is set in `check_factor_levels`
        all_factors = set()

        # add level factors
        for x in values['ctx'].my_experiment_factor_paths_levellist:
            all_factors.add(tuple(x))

        # add cross factors
        for x in values['ctx'].my_experiment_factor_paths_cross:
            assert x[-1] == "$FACTOR$"
            all_factors.add(tuple(x[:-1]))  # remove the $FACTOR$

        for filt in values.get("except_filters"):
            filtered_factors = {tuple(path) for path, _value in dutil.nested_dict_iter(filt)}

            assert filtered_factors.issubset(all_factors), (
                "except_filters entry is not a subset of the actual factors: "
                f"except_filter={filtered_factors} all_factors={all_factors}"
            )

        return values
# TODO [nku] could also extract some of them automatically from pydantic models?
RESERVED_KEYWORDS = ["state", "$FACTOR$", "is_controller_yes", "is_controller_no", "check_status_yes", "check_status_no", "localhost", "n_repetitions", "common_roles", "host_types", "base_experiment", "factor_levels", "n", "init_roles", "check_status", "$CMD$"]


def get_keywords():
    """Return the set of field names of all pydantic models visible in this module."""
    keywords = set()
    for _name, cl in inspect.getmembers(sys.modules[__name__], inspect.isclass):
        if issubclass(cl, BaseModel):
            keywords.update(cl.__fields__)
    return keywords


class SuiteDesign(MyBaseModel):
    """Design of a complete suite: suite vars, experiment designs and ETL pipelines."""

    ctx: Context = Field(alias="_CTX", exclude=True)
    """:meta private:"""

    suite_vars: SuiteVarsConfigDict = Field(alias="$SUITE_VARS$", default={})

    experiment_designs: Dict[str, Experiment]

    etl: Dict[str, etl_design.ETLPipeline] = Field(alias="$ETL$", default={})

    class Config:
        extra = "forbid"

    @root_validator(pre=True, skip_on_failure=True)
    def context(cls, values):
        """Complete the raw ``_CTX`` and hand a copy to every ETL pipeline and experiment."""
        ctx = values["_CTX"]

        ctx["experiment_names"] = list(values["experiment_designs"].keys())
        ctx["etl_pipeline_names"] = list(values.get("$ETL$", {}).keys())

        # ETLContext (copied before the suite vars are added to the ctx)
        for etl_name, etl_pipeline in values.get("$ETL$", {}).items():
            etl_ctx = ctx.copy()
            etl_ctx["my_etl_pipeline_name"] = etl_name
            etl_pipeline["_CTX"] = etl_ctx

        # EXPContext
        ctx["suite_vars"] = values.get("$SUITE_VARS$", None)
        for exp_name, exp in values.get("experiment_designs").items():
            exp_ctx = ctx.copy()
            exp_ctx["my_experiment_name"] = exp_name
            exp["_CTX"] = exp_ctx

        return values

    @validator("experiment_designs")
    def check_exp_names(cls, v):
        """Reject experiment names that are reserved, too long, or contain illegal chars."""
        # TODO [nku] check min length and forbidden keywords
        for exp_name in v:

            assert exp_name not in RESERVED_KEYWORDS, f'experiment name: "{exp_name}" is not allowed (reserved keyword)'

            assert len(exp_name) <= 200, f'experiment name: "{exp_name}" is not allowed (too long, limit=200)'

            # fullmatch: with `re.match` and `$` a trailing newline would be accepted
            assert re.fullmatch(r"[A-Za-z0-9_]+", exp_name), f'experiment name: "{exp_name}" is not allowed (must consist of alphanumeric chars or _)'

        return v
class Suite(MyBaseModel):
    """
    A suite is a collection of experiments, denoted as `<EXP1>`, `<EXP2>`, etc.
    Each experiment has its own set of config variables.
    It is also possible to define variables that are shared by
    all experiments in the suite, referred to as `SUITE_VARS`.
    In addition to the experiments, a suite can also define an
    `ETL` (extract, transform, load) pipeline,
    which outlines the steps for processing the resulting files.
    """

    suite_vars: SuiteVarsConfigDict = Field(default={}, alias="$SUITE_VARS$")
    """Shared variables belonging to every experiment of the suite."""

    exp1: Experiment = Field(alias="<EXP1>")
    """A suite needs to contain at least one :ref:`experiment<design/design:Experiment>`.
    Choose a descriptive experiment name for the placeholder `<EXP1>`."""

    exp2: Optional[Experiment] = Field(alias="<EXP2>")
    """Further :ref:`experiments<design/design:Experiment>` are optional.
    Choose a descriptive experiment name for the placeholder `<EXP2>`, `<EXP3>`, etc."""

    etl: Dict[str, etl_design.ETLPipeline] = Field(default={}, alias="$ETL$")
    """:ref:`design/design:ETL Pipeline` to process the result files."""
def dict_to_pydantic(suite_name, suite_design_raw):
    """Build (and thereby validate) the ``SuiteDesign`` model of a raw suite design dict.

    The entries ``$ETL$`` and ``$SUITE_VARS$`` are kept at the top level, every
    other entry is treated as an experiment design.
    """
    suite_design = {"experiment_designs": {}}

    for key, design in suite_design_raw.items():
        if key in ("$ETL$", "$SUITE_VARS$"):
            suite_design[key] = design
        else:
            suite_design["experiment_designs"][key] = design

    suite_design["_CTX"] = {
        "prj_id": util.get_project_id(),
        "suite_name": suite_name,
    }

    # constructing the model runs the pydantic validation
    return SuiteDesign(**suite_design)


def pydantic_to_dict(model):
    """Convert a suite design model into a plain dict (via a JSON round trip).

    The result holds one entry per experiment plus the entry ``$ETL$`` with
    the ETL pipelines.
    """

    def _as_plain(obj):
        # aliases as keys, entries that are None are left out
        return json.loads(obj.json(by_alias=True, exclude_none=True))

    suite_design = {name: _as_plain(exp) for name, exp in model.experiment_designs.items()}

    pipelines = {}
    for name, pipeline in model.etl.items():
        plain = _as_plain(pipeline)
        # custom reordering: `experiments` is the first key
        experiments = plain.pop("experiments")
        pipelines[name] = {"experiments": experiments, **plain}
    suite_design["$ETL$"] = pipelines

    return suite_design


if __name__ == "__main__":
    print(get_keywords())