Source code for doespy.etl.steps.extractors

from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Union

import warnings
import ruamel.yaml
import json
import csv

import sys
import inspect

from pydantic import BaseModel, validator


class Extractor(BaseModel, ABC):

    file_regex: Optional[Union[str, List[str]]] = None

    class Config:
        extra = "forbid"

    @classmethod
    def default_file_regex(cls):
        # subclasses may override this to provide a default `file_regex`
        return None

    @validator("file_regex", pre=True, always=True)
    def set_default_regex(cls, value):

        if value is None:
            value = cls.default_file_regex()

        if not isinstance(value, list):
            value = [value]

        return value

    @abstractmethod
    def extract(self, path: str, options: Dict) -> List[Dict]:
        """Reads the file defined by `path`, and converts it into a list of dicts.
            For example, the CsvExtractor reads a csv and returns a dict for each row.
        Args:
            path (str): absolute path of file to extract
            options (Dict): extractor options as provided in the ETL definition
        Returns:
            List[Dict]: results found in the file
        """

        # NOTE: Extending classes should not use the `options: Dict` and instead use instance variables for parameters


        pass


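# A minimal sketch of a custom extractor, assuming a hypothetical `.txt` result
# format; it illustrates the contract of the abstract `extract` method.
# (Kept as a comment so that the `__all__` computation below does not pick it up.)
#
#   class TxtExtractor(Extractor):
#       file_regex: Union[str, List[str]] = [r".*\.txt$"]
#
#       def extract(self, path: str, options: Dict) -> List[Dict]:
#           # one result dict per non-empty line
#           with open(path, "r") as f:
#               return [{"line": line.strip()} for line in f if line.strip()]
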
class YamlExtractor(Extractor):
    """The `YamlExtractor` reads result files as YAML.

    The YAML file can contain either a single object (result)
    or a list of objects (results).

    .. code-block:: yaml
        :caption: Example ETL Pipeline Design

        $ETL$:
            extractors:
                YamlExtractor: {}       # with default file_regex
                YamlExtractor:          # with custom file_regex
                    file_regex: [out.yml]
    """

    file_regex: Union[str, List[str]] = [r".*\.yaml$", r".*\.yml$"]

    def extract(self, path: str, options: Dict) -> List[Dict]:
        # load the file as YAML: if the top-level element is an object,
        # return a one-element list
        with open(path, "r") as f:
            data = ruamel.yaml.safe_load(f)

        if not isinstance(data, list):
            data = [data]

        return data

class JsonExtractor(Extractor):
    """The `JsonExtractor` reads result files as JSON.

    The JSON file can contain either a single object (result)
    or a list of objects (results).

    .. code-block:: yaml
        :caption: Example ETL Pipeline Design

        $ETL$:
            extractors:
                JsonExtractor: {}       # with default file_regex
                JsonExtractor:          # with custom file_regex
                    file_regex: [out.json]
    """

    file_regex: Union[str, List[str]] = [r".*\.json$"]

    def extract(self, path: str, options: Dict) -> List[Dict]:
        # load the file as JSON: if the top-level element is an object,
        # return a one-element list
        with open(path, "r") as f:
            data = json.load(f)

        if not isinstance(data, list):
            data = [data]

        return data

class CsvExtractor(Extractor):
    """The `CsvExtractor` reads result files as CSV.

    The CSV file contains one result per line and by default starts with a header row;
    see ``has_header`` and ``fieldnames`` for CSV files without a header.

    .. code-block:: yaml
        :caption: Example ETL Pipeline Design

        $ETL$:
            extractors:
                CsvExtractor: {}        # with default params
                CsvExtractor:           # with custom params
                    file_regex: [out.csv]
                    delimiter: ;
                    has_header: False
                    fieldnames: [col1, col2, col3]
    """

    file_regex: Union[str, List[str]] = [r".*\.csv$"]
    """The regex list to match result files."""

    delimiter: str = ","
    """The separator between columns."""

    has_header: bool = True
    """Indicates whether the first CSV row is a header or not."""

    fieldnames: Optional[List[str]] = None
    """The names of the CSV columns if ``has_header`` is set to `False`."""

    def extract(self, path: str, options: Dict) -> List[Dict]:
        # load the file as CSV: by default, treat the first line as a header
        # and create a dict for each later row
        data = []
        with open(path, "r") as f:
            if self.has_header or self.fieldnames is not None:
                reader = csv.DictReader(f, delimiter=self.delimiter, fieldnames=self.fieldnames)
            else:
                # without a header and without fieldnames, rows are plain lists
                reader = csv.reader(f, delimiter=self.delimiter)
            for row in reader:
                data.append(row)

        return data

class ErrorExtractor(Extractor):
    """The `ErrorExtractor` provides a mechanism to detect potential errors in an experiment job.

    For experiments with a large number of jobs, it is easy to overlook an error
    because there are many output folders and files, e.g., the stderr.log of each job.

    The `ErrorExtractor` raises a warning if matching files are not empty.

    .. code-block:: yaml
        :caption: Example ETL Pipeline Design

        $ETL$:
            extractors:
                ErrorExtractor: {}      # checking stderr.log
                ErrorExtractor:         # checking custom files
                    file_regex: [stderr.log, error.log]
    """

    file_regex: Union[str, List[str]] = ["^stderr.log$"]
    """The regex list to match result files."""

    def extract(self, path: str, options: Dict) -> List[Dict]:
        # if the file is present and not empty, raise a warning
        with open(path, "r") as f:
            content = f.read().replace("\n", " ")

        if content.strip():  # ignore empty (or whitespace-only) error files
            # TODO [nku] should we not raise an error in the etl pipeline?
            warnings.warn(f"found error file: {path}")
            warnings.warn(f"   {content}")

        return []

class IgnoreExtractor(Extractor):
    """The `IgnoreExtractor` provides a mechanism to detect potential errors in an experiment job.

    For experiments with a large number of jobs, it is easy to overlook an error
    indicated by the presence of an unexpected file.

    As a result, the ETL requires that every file in the results folder of the job
    must be matched by exactly one extractor.

    The `IgnoreExtractor` can be used to ignore certain files on purpose, e.g., stdout.log.

    .. code-block:: yaml
        :caption: Example ETL Pipeline Design

        $ETL$:
            extractors:
                IgnoreExtractor: {}     # ignore stdout.log
                IgnoreExtractor:        # custom ignore list
                    file_regex: [stdout.log, other.txt]
    """

    file_regex: Union[str, List[str]] = ["^stdout.log$"]
    """The regex list to match result files."""

    def extract(self, path: str, options: Dict) -> List[Dict]:
        # ignore this file
        # -> every file produced by an experiment run needs to be matched by
        #    exactly one extractor; the IgnoreExtractor can be used to ignore
        #    some files on purpose
        return []

__all__ = [
    name
    for name, cl in inspect.getmembers(sys.modules[__name__], inspect.isclass)
    if name != "Extractor" and issubclass(cl, Extractor)
]
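
# A minimal sketch (an assumption for illustration, not the actual ETL loop,
# which lives elsewhere in doespy) of how `file_regex` could enforce that every
# result file is matched by exactly one extractor:
#
#   import re
#
#   def match_extractor(filename: str, extractors: List[Extractor]) -> Extractor:
#       matches = [
#           ext for ext in extractors
#           if any(re.search(pattern, filename) for pattern in ext.file_regex)
#       ]
#       if len(matches) != 1:
#           raise ValueError(f"{filename} matched {len(matches)} extractors (expected 1)")
#       return matches[0]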