# Source code for doespy.design.etl_design

from typing import Annotated, Any, Dict, List, Optional, Union

from pydantic import Discriminator, RootModel, Tag, field_validator, model_validator, ConfigDict, Field
from pydantic import BaseModel


import inspect
import jinja2
import json
import enum
import pandas as pd

from doespy import util
from doespy import info
from doespy.etl.etl_base import _load_available_processes



class MyETLBaseModel(BaseModel):
    model_config = ConfigDict(extra="forbid", use_enum_values=True)

class ETLContext(MyETLBaseModel):

    prj_id: str
    suite_name: str

    experiment_names: List[str]

    etl_pipeline_names: List[str]

    my_etl_pipeline_name: Optional[str] = None

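
# A minimal sketch (hypothetical values) of how the surrounding design loader
# is expected to populate this context before validating an ETL pipeline:
#
#   ctx = ETLContext(prj_id="example_prj", suite_name="example_suite",
#                    experiment_names=["exp_a", "exp_b"],
#                    etl_pipeline_names=["pipeline1"],
#                    my_etl_pipeline_name="pipeline1")
#
# All names here are illustrative, not taken from a real project.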

class IncludeEtlSource(MyETLBaseModel):
    suite: Optional[str] = None
    config: Optional[str] = None  # super etl config
    template: Optional[str] = None
    pipeline: Optional[str] = None


    model_config = ConfigDict(extra="forbid", allow_reuse=True)

    etl_pipeline: Optional['ETLPipelineBase'] = Field(None, alias="_ETL_PIPELINE", exclude=True)
    """:meta private:"""

    @field_validator("suite")
    @classmethod
    def check_suite_available(cls, v):
        """checks that referenced suite exists"""
        if v is not None:
            avl_suites = info.get_suite_designs()
            if v not in avl_suites:
                raise ValueError(f"source not found: suite={v}")

        return v


    @field_validator("config")
    def check_config_available(cls, v, values):
        """checks that referenced config exists"""
        if v is not None:

            avl_configs = util.get_all_super_etl_configs()
            #avl_suites = info.get_suite_designs()
            if v not in avl_configs:
                raise ValueError(f"source not found: suite={v}")

        return v

    @field_validator("template")
    @classmethod
    def check_template_available(cls, v):
        """checks that referenced etl template exists"""
        if v is not None:
            designs_dir = util.get_suite_design_etl_template_dir()
            avl_suites = info.get_suite_designs(designs_dir=designs_dir)
            if v not in avl_suites:
                raise ValueError(f"source not found: suite={v}")

        return v


    @field_validator("pipeline")
    def check_suite_xor_template(cls, v, values):

        """checks that either suite or template or config is used"""

        count = 0

        if values.data.get("suite", None) is not None:
            count += 1

        if values.data.get("config", None) is not None:
            count += 1

        if values.data.get("template", None) is not None:
            count += 1

        assert count == 1, f"IncludeSource malformed: suite, config, and template are mutually exclusive but are: suite={values.get('suite')}  config={values.get('config')}  template={values.get('template')}"

        return v


    @field_validator("pipeline")
    def check_pipeline_available(cls, v, values):
        """checks that referenced pipeline in suite or template exists"""

        if values.data.get("suite") is not None:
            avl_pipelines = info.get_etl_pipelines(values.data["suite"])
        elif values.data.get("config") is not None:
            dir = util.get_super_etl_dir()
            avl_pipelines = info.get_etl_pipelines(values.data["config"], designs_dir=dir)
        elif values.data.get("template") is not None:
            dir = util.get_suite_design_etl_template_dir()
            avl_pipelines = info.get_etl_pipelines(values.data["template"], designs_dir=dir)
        else:
            raise ValueError("no suite, config, or template found")

        if v not in avl_pipelines:
            raise ValueError(
                f"source not found: suite={values.data['suite']}  template={values.data['template']}  pipeline={v}"
            )

        return v

    @model_validator(mode="after")
    def include_etl_pipeline(self):
        """loads the external etl source (and checks that the name of extractor, transformer, and loader are ok)"""


        if self.suite is not None:
            design = util.get_suite_design(self.suite)

        elif self.config is not None:
            dir = util.get_super_etl_dir()
            design = util.get_suite_design(suite=self.config, folder=dir)

        elif self.template is not None:
            templ_dir = util.get_suite_design_etl_template_dir()
            design = util.get_suite_design(suite=self.template, folder=templ_dir)


        etl_pipeline = design["$ETL$"][self.pipeline]
        if "experiments" in etl_pipeline:
            del etl_pipeline["experiments"]

        self.etl_pipeline = ETLPipelineBase(**etl_pipeline)
        return self
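
# Usage sketch (assumed names): in a design file, an include source names
# exactly one of suite/config/template plus a pipeline, e.g.
#
#   src = IncludeEtlSource(suite="example_suite", pipeline="my_pipeline")
#   src.etl_pipeline  # -> ETLPipelineBase resolved by include_etl_pipeline
#
# "example_suite" and "my_pipeline" are placeholders; validation rejects them
# unless they exist under doe-suite-config.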



def _build_extra(cls, values: Dict[str, Any]):
    """Moves every key in ``values`` that is not a declared model field into ``values["extra"]``."""
    extra = {}
    keys = list(values.keys())
    for k in keys:
        if k not in cls.model_fields:
            v = values.pop(k)
            extra[k] = v
    values["extra"] = extra



avl_extractors, avl_transformers, avl_loaders = _load_available_processes()

extractors = {ext: ext for ext in avl_extractors.keys()}
extractors["INCLUDE_STEPS"] = "$INCLUDE_STEPS$"
ExtractorId = enum.Enum("ExtractorId", extractors)

transformers = {tr: tr for tr in avl_transformers.keys()}
TransformerId = enum.Enum("TransformerId", transformers)

loaders = {ld: ld for ld in avl_loaders.keys()}
loaders["INCLUDE_STEPS"] = "$INCLUDE_STEPS$"
LoaderId = enum.Enum("LoaderId", loaders)



class ExtractorDesign(MyETLBaseModel):
    file_regex: Optional[Union[str, List[str]]] = None

    model_config = ConfigDict(extra="allow")

    @model_validator(mode="before")
    @classmethod
    def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        _build_extra(cls, values)
        return values
class TransformerDesign(MyETLBaseModel):
    pass
class IncludeStepTransformer(TransformerDesign):
    include_steps: IncludeEtlSource = Field(alias="$INCLUDE_STEPS$")

    model_config = ConfigDict(extra="forbid")


class NamedTransformer(TransformerDesign):
    name: TransformerId

    model_config = ConfigDict(extra="allow")

    @model_validator(mode="before")
    @classmethod
    def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        _build_extra(cls, values)
        return values


# collect the public pandas.DataFrame functions usable as "df.<name>" steps
avl_df_functions = set()
for name, _ in inspect.getmembers(pd.DataFrame, predicate=inspect.isfunction):
    if not name.startswith("_"):
        avl_df_functions.add(f"df.{name}")


class DfTransformer(TransformerDesign):
    model_config = ConfigDict(extra="allow")

    @model_validator(mode="after")
    def check_df_function(self) -> "DfTransformer":
        for v in self.model_dump().keys():
            assert v in avl_df_functions, f"{v} is unknown df function"
        return self
class LoaderDesign(MyETLBaseModel):
    include_steps: Optional[List[IncludeEtlSource]] = Field(None, alias="$INCLUDE_STEPS$")

    model_config = ConfigDict(extra="allow")

    extra: Dict[str, Any]

    @model_validator(mode="before")
    @classmethod
    def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        _build_extra(cls, values)
        return values
def get_transformer_disc_value(v: Any) -> str:
    if "name" in v:
        return "named"
    if all(k == "$INCLUDE_STEPS$" for k in v.keys()):
        return "include"
    if all(k.startswith("df.") for k in v.keys()):
        return "df"
    assert False, f"unknown transformer type: {v}"


def get_extractor_disc_value(v: Any) -> str:
    if isinstance(v, list):
        return "include"
    else:
        return "extractor"


def get_loader_disc_value(v: Any) -> str:
    if isinstance(v, list):
        return "include"
    else:
        return "loader"


class ETLPipelineBase(MyETLBaseModel):
    include_pipeline: Optional[IncludeEtlSource] = Field(
        default=None, alias="$INCLUDE_PIPELINE$", exclude=True
    )

    extractors: Dict[
        ExtractorId,
        Annotated[
            Union[
                Annotated[List[IncludeEtlSource], Tag("include")],
                Annotated[ExtractorDesign, Tag("extractor")],
                Annotated[Any, Tag("EXTERNAL")],
            ],
            Discriminator(get_extractor_disc_value),
        ],
    ] = {}

    transformers: List[
        Annotated[
            Union[
                Annotated[NamedTransformer, Tag("named")],
                Annotated[DfTransformer, Tag("df")],
                Annotated[IncludeStepTransformer, Tag("include")],
                Annotated[Any, Tag("EXTERNAL")],
            ],
            Discriminator(get_transformer_disc_value),
        ]
    ] = []

    # TODO [nku] loaders should also be possible to be a list because
    #   we may want to use the same loader id multiple times
    loaders: Dict[
        LoaderId,
        Annotated[
            Union[
                Annotated[List[IncludeEtlSource], Tag("include")],
                Annotated[LoaderDesign, Tag("loader")],
                Annotated[Any, Tag("EXTERNAL")],
            ],
            Discriminator(get_loader_disc_value),
        ],
    ] = {}

    model_config = ConfigDict(extra="forbid", use_enum_values=False)

    @model_validator(mode="after")
    def inc_pipeline(self):
        if self.include_pipeline is not None:
            assert len(self.extractors) == 0
            assert len(self.transformers) == 0
            assert len(self.loaders) == 0
            self.extractors = self.include_pipeline.etl_pipeline.extractors
            self.transformers = self.include_pipeline.etl_pipeline.transformers
            self.loaders = self.include_pipeline.etl_pipeline.loaders
            self.include_pipeline = None  # mark as complete
        return self

    @model_validator(mode="after")
    def inc_extractors(self):
        if ExtractorId.INCLUDE_STEPS in self.extractors:
            for include_etl_source in self.extractors[ExtractorId.INCLUDE_STEPS]:
                for k, v in include_etl_source.etl_pipeline.extractors.items():
                    self.extractors[k] = v
            del self.extractors[ExtractorId.INCLUDE_STEPS]
        return self

    @model_validator(mode="after")
    def inc_transformer(self):
        steps = []
        for t in self.transformers:
            if isinstance(t, IncludeStepTransformer):
                for step in t.include_steps.etl_pipeline.transformers:
                    steps.append(step)
            else:
                steps.append(t)
        self.transformers = steps
        return self

    @model_validator(mode="after")
    def inc_loader(self):
        if LoaderId.INCLUDE_STEPS in self.loaders:
            for include_etl_source in self.loaders[LoaderId.INCLUDE_STEPS]:
                for k, v in include_etl_source.etl_pipeline.loaders.items():
                    self.loaders[k] = v
            del self.loaders[LoaderId.INCLUDE_STEPS]
        return self


class ExperimentNames(RootModel):
    root: Union[str, List[str]]

    @field_validator("root")
    @classmethod
    def check_exp(cls, v):
        assert v == "*" or isinstance(v, list), (
            f"experiments must be a list of strings or * but is: {v}"
        )
        return v
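
# Discriminator sketch (assumed raw YAML shapes): get_transformer_disc_value
# routes each raw transformer entry by its keys before model validation:
#
#   {"name": "MyTransformer", ...}            -> Tag "named"   (NamedTransformer)
#   {"$INCLUDE_STEPS$": {...}}                -> Tag "include" (IncludeStepTransformer)
#   {"df.filter": {...}, "df.dropna": {...}}  -> Tag "df"      (DfTransformer)
#
# Extractor and loader entries are routed by type instead: a list selects the
# include variant, anything else the design model.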
class ETLPipeline(ETLPipelineBase):
    ctx: ETLContext = Field(alias="_CTX", exclude=True)
    """:meta private:"""

    experiments: ExperimentNames

    etl_vars: Optional[Dict[str, Any]] = Field(None, alias="$ETL_VARS$")

    model_config = ConfigDict(extra="forbid")

    @model_validator(mode="after")
    def check_experiments(self):
        """Resolves * in experiments and ensures that every experiment listed
        also exists in the suite."""
        experiments = self.experiments.root
        avl_experiments = self.ctx.experiment_names
        if experiments == "*":
            self.experiments.root = avl_experiments
        else:
            missing = list(set(experiments).difference(avl_experiments))
            assert len(missing) == 0, f"Non-existing experiments: {missing} (Ctx={self.ctx})"
        return self

    @model_validator(mode="after")
    def resolve_etl_vars(self):
        """The field ``$ETL_VARS$`` allows defining variables that can be used
        for options of extractors, transformers, and loaders.
        Use the ``[% VAR %]`` syntax to refer to those variables.
        """

        def include_etl_vars(base, etl_vars):
            env = util.jinja2_env(
                loader=None,
                undefined=jinja2.StrictUndefined,
                variable_start_string="[%",
                variable_end_string="%]",
            )
            template = json.dumps(base)
            while "[%" in template and "%]" in template:
                template = env.from_string(template)
                template = template.render(**etl_vars)
            return json.loads(template)

        etl_vars = self.etl_vars
        if etl_vars is not None:
            from itertools import chain

            for step in chain(self.extractors.values(), self.transformers, self.loaders.values()):
                if hasattr(step, "extra") and len(step.extra) > 0:
                    step.extra = include_etl_vars(step.extra, etl_vars)
            self.etl_vars = None
        return self

    @model_validator(mode="after")
    def check_extractors(self):
        """check that each extractor (also included) defines the correct values"""
        d = {}
        for name_enum, extractor in self.extractors.items():
            assert isinstance(name_enum, ExtractorId), f"invalid extractor id: {name_enum}"
            if isinstance(extractor, ExtractorDesign):
                args = extractor.model_dump()
                if "extra" in args:
                    del args["extra"]
                if "file_regex" in args and args["file_regex"] is None:
                    del args["file_regex"]
                args = {**extractor.extra, **args}
                ext = avl_extractors[name_enum.value](**args)
                d[name_enum] = ext
            else:
                d[name_enum] = extractor
        self.extractors = d
        return self

    @model_validator(mode="after")
    def check_transformers(self):
        """check that each transformer (also included) defines the correct values"""
        steps = []
        for t in self.transformers:
            assert not isinstance(t, IncludeStepTransformer)
            if isinstance(t, NamedTransformer) and t.name is not None:
                trans = avl_transformers[t.name](**t.extra, name=t.name)
                steps.append(trans)
            else:
                steps.append(t)
        self.transformers = steps
        return self

    @model_validator(mode="after")
    def check_loaders(self):
        """check that each loader (also included) defines the correct values"""
        d = {}
        for name_enum, loader in self.loaders.items():
            assert isinstance(name_enum, LoaderId), f"invalid loader id: {name_enum}"
            if isinstance(loader, LoaderDesign):
                ext = avl_loaders[name_enum.value](**loader.extra)
                d[name_enum] = ext
            else:
                d[name_enum] = loader
        self.loaders = d
        return self
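
# $ETL_VARS$ sketch (hypothetical variable and option names): given
#
#   $ETL_VARS$: {out_dir: "plots"}
#   loaders: {MyLoader: {output_dir: "[% out_dir %]"}}
#
# resolve_etl_vars renders "[% out_dir %]" to "plots" via jinja2 before the
# loader is instantiated; nested references are resolved by the while loop.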
suite_names = dict()
for s in util.get_does_results():
    suite_names[s["suite"].replace("-", "_")] = s["suite"]
for s in info.get_suite_designs():
    suite_names[s.replace("-", "_")] = s
SuiteName = enum.Enum("SuiteName", suite_names)
"""Name of the available experiment suites in `doe-suite-config/designs`."""


class SuperETLContext(MyETLBaseModel):
    model_config = ConfigDict(use_enum_values=False)

    class SuperETLSuiteContext(MyETLBaseModel):
        experiment_names: List[str]

    prj_id: str
    suites: Dict[SuiteName, SuperETLSuiteContext]
    my_etl_pipeline_name: Optional[str] = None


class SuperETLPipeline(ETLPipeline):
    ctx: SuperETLContext = Field(alias="_CTX", exclude=True)
    """:meta private:"""

    experiments: Dict[SuiteName, ExperimentNames]

    # etl_vars: Note -> inherited from ETLPipeline

    @model_validator(mode="after")
    def check_experiments(self):
        """Resolves * in experiments and ensures that every experiment listed
        also exists in the suite."""
        for suite_name, experiments in self.experiments.items():
            avl_experiments = self.ctx.suites[suite_name].experiment_names
            if experiments.root == "*":
                experiments.root = avl_experiments
            else:
                missing = list(set(experiments.root).difference(avl_experiments))
                assert len(missing) == 0, f"Non-existing experiments: {missing} (Ctx={self.ctx})"
        return self


class SuperETL(MyETLBaseModel):
    ctx: SuperETLContext = Field(alias="_CTX", exclude=True)
    """:meta private:"""

    suite_id: Dict[SuiteName, Union[str, int, Dict[str, Union[str, int]]]] = Field(alias="$SUITE_ID$")

    etl: Dict[str, SuperETLPipeline] = Field(alias="$ETL$")

    @model_validator(mode="before")
    @classmethod
    def context(cls, values):
        # build info on available suite ids
        existing_suite_ids = {}
        for d in util.get_all_suite_results_dir():
            if d["suite"] not in existing_suite_ids:
                existing_suite_ids[d["suite"]] = set()
            existing_suite_ids[d["suite"]].add(d["suite_id"])

        for suite_name, suite_id_entry in values.get("$SUITE_ID$", {}).items():

            # 1. check that we have a result for this suite
            assert suite_name in existing_suite_ids, f"no results for suite={suite_name} exist"

            # 2. convert to dict representation
            if isinstance(suite_id_entry, (str, int)):
                # meaning: every experiment in that suite should be based on this id
                # -> load all experiments for this suite
                exps = util.get_does_result_experiments(suite=suite_name, suite_id=suite_id_entry)
                values["$SUITE_ID$"][suite_name] = {exp: suite_id_entry for exp in exps}

            # 3. resolve default suite id
            elif isinstance(suite_id_entry, dict):
                default_suite_id = suite_id_entry.pop("$DEFAULT$", None)
                assert default_suite_id is None or isinstance(default_suite_id, (str, int)), \
                    f"default suite id must be string or int but is {default_suite_id}"
                if default_suite_id is not None:
                    # use default suite id for all experiments that are not explicitly set
                    for exp_name in util.get_does_result_experiments(suite=suite_name, suite_id=default_suite_id):
                        if exp_name not in suite_id_entry:
                            suite_id_entry[exp_name] = default_suite_id
            else:
                raise ValueError(f"format of $SUITE_ID$ is invalid for suite={suite_name}")

        # -> now values["$SUITE_ID$"] is a dict of {suite_name: {exp_name: suite_id}}

        suites = {}
        all_exps_d = {}
        for suite_name, suite_id_entry in values.get("$SUITE_ID$", {}).items():
            assert isinstance(suite_id_entry, dict), f"format of $SUITE_ID$ is invalid for suite={suite_name}"

            exps = []
            # we have a dict of {exp_name: suite_id}
            for exp_name, suite_id in suite_id_entry.items():
                if (suite_name, suite_id) not in all_exps_d:
                    all_exps_d[(suite_name, suite_id)] = util.get_does_result_experiments(suite=suite_name, suite_id=suite_id)
                assert exp_name in all_exps_d[(suite_name, suite_id)], \
                    f"experiment={exp_name} does not exist in suite={suite_name} suite_id={suite_id_entry}"
                exps.append(exp_name)

            suites[suite_name] = {
                "experiment_names": list(set(exps)),
                # TODO [nku] maybe load all available etl pipelines from somewhere here?
                #   -> maybe only from super_etl locally or from templates
                # "etl_pipeline_names": values['ctx'].suites[suite_name].etl_pipeline_names
            }

        ctx = {
            "prj_id": util.get_project_id(),
            "suites": suites,
        }

        values["_CTX"] = ctx

        # attach a per-pipeline copy of the context (ETLContext)
        for etl_name, etl_pipeline in values.get("$ETL$", {}).items():
            etl_ctx = ctx.copy()
            etl_ctx["my_etl_pipeline_name"] = etl_name
            etl_pipeline["_CTX"] = etl_ctx

        return values
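
# $SUITE_ID$ sketch (hypothetical suite name and ids): both accepted forms are
# normalized by `context` to {suite_name: {exp_name: suite_id}}:
#
#   $SUITE_ID$:
#     example_suite: 1655196121                  # one id for all experiments
#
#   $SUITE_ID$:
#     example_suite:
#       $DEFAULT$: 1655196121                    # fallback for unlisted exps
#       exp_a: 1655196300                        # per-experiment override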