Skip to content

hyfi.pipeline

BaseRun

Bases: BaseModel

Run Configuration

Source code in hyfi/pipeline/config.py
class BaseRun(BaseModel):
    """Run Configuration"""

    run: Optional[Union[str, Dict[str, Any]]] = {}
    verbose: bool = False

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        extra="allow",
        validate_assignment=False,
        alias_generator=Composer.generate_alias_for_special_keys,
    )  # type: ignore

    @model_validator(mode="before")
    def validate_model_config_before(cls, data):
        # logger.debug("Validating model config before validating each field.")
        return Composer.replace_special_keys(Composer.to_dict(data))

    @property
    def run_config(self) -> Dict[str, Any]:
        if self.run and isinstance(self.run, str):
            return {"_target_": self.run}
        return Composer.to_dict(self.run) or {}

    @property
    def run_target(self) -> str:
        return self.run_config.get("_target_") or ""

    @property
    def run_kwargs(self) -> Dict[str, Any]:
        _kwargs = self.run_config.copy()
        _kwargs.pop("_target_", None)
        _kwargs.pop("_partial_", None)
        return _kwargs

PIPELINEs

A class to run a pipeline.

Source code in hyfi/pipeline/pipeline.py
class PIPELINEs:
    """
    A class to run a pipeline.
    """

    @staticmethod
    def run_pipeline(
        config: Union[Dict, Pipeline],
        initial_object: Optional[Any] = None,
        task: Optional[Task] = None,
    ) -> Any:
        """
        Run a pipeline given a config

        Args:
            config: PipelineConfig to run the pipeline
            initial_obj: Object to use as initial value
            task: TaskConfig to use as task

        Returns:
            The result of the pipeline
        """
        if task is None:
            task = Task()
        return task.run_pipeline(config, initial_object)

    @staticmethod
    def run_pipe(
        obj: Any,
        config: Union[Dict, Pipe],
    ) -> Any:
        """
        Run a pipe on an object

        Args:
            obj: The object to pipe on
            config: The configuration for the pipe

        Returns:
            The result of the pipe
        """
        return run_pipe(obj, config)

    @staticmethod
    def run_task(
        task: Task,
        dryrun: bool = False,
    ):
        """
        Run pipelines specified in the task

        Args:
            task: TaskConfig to run pipelines for
            project: ProjectConfig to run pipelines
        """
        if dryrun:
            print("\nDryrun is enabled, not running the HyFI task\n")
            return
        task.run()

    @staticmethod
    def run_workflow(workflow: Workflow, dryrun: bool = False):
        """
        Run the tasks specified in the workflow

        Args:
            workflow: WorkflowConfig object to run
        """
        if dryrun:
            print("\nDryrun is enabled, not running the HyFI workflow\n")
            return
        workflow.run()

    @staticmethod
    def pipe(**kwargs) -> Pipe:
        """
        Return the PipeConfig.

        Args:
            **kwargs: Additional keyword arguments to pass to the PipeConfig constructor.

        Returns:
            PipeConfig: An instance of the PipeConfig class.
        """
        return Pipe(**kwargs)

pipe(**kwargs) staticmethod

Return the PipeConfig.

Parameters:

Name Type Description Default
**kwargs

Additional keyword arguments to pass to the PipeConfig constructor.

{}

Returns:

Name Type Description
PipeConfig Pipe

An instance of the PipeConfig class.

Source code in hyfi/pipeline/pipeline.py
@staticmethod
def pipe(**kwargs) -> Pipe:
    """
    Return the PipeConfig.

    Args:
        **kwargs: Additional keyword arguments to pass to the PipeConfig constructor.

    Returns:
        PipeConfig: An instance of the PipeConfig class.
    """
    return Pipe(**kwargs)

run_pipe(obj, config) staticmethod

Run a pipe on an object

Parameters:

Name Type Description Default
obj Any

The object to pipe on

required
config Union[Dict, Pipe]

The configuration for the pipe

required

Returns:

Type Description
Any

The result of the pipe

Source code in hyfi/pipeline/pipeline.py
@staticmethod
def run_pipe(
    obj: Any,
    config: Union[Dict, Pipe],
) -> Any:
    """
    Run a pipe on an object

    Args:
        obj: The object to pipe on
        config: The configuration for the pipe

    Returns:
        The result of the pipe
    """
    return run_pipe(obj, config)

run_pipeline(config, initial_object=None, task=None) staticmethod

Run a pipeline given a config

Parameters:

Name Type Description Default
config Union[Dict, Pipeline]

PipelineConfig to run the pipeline

required
initial_obj

Object to use as initial value

required
task Optional[Task]

TaskConfig to use as task

None

Returns:

Type Description
Any

The result of the pipeline

Source code in hyfi/pipeline/pipeline.py
@staticmethod
def run_pipeline(
    config: Union[Dict, Pipeline],
    initial_object: Optional[Any] = None,
    task: Optional[Task] = None,
) -> Any:
    """
    Run a pipeline given a config

    Args:
        config: PipelineConfig to run the pipeline
        initial_obj: Object to use as initial value
        task: TaskConfig to use as task

    Returns:
        The result of the pipeline
    """
    if task is None:
        task = Task()
    return task.run_pipeline(config, initial_object)

run_task(task, dryrun=False) staticmethod

Run pipelines specified in the task

Parameters:

Name Type Description Default
task Task

TaskConfig to run pipelines for

required
project

ProjectConfig to run pipelines

required
Source code in hyfi/pipeline/pipeline.py
@staticmethod
def run_task(
    task: Task,
    dryrun: bool = False,
):
    """
    Run pipelines specified in the task

    Args:
        task: TaskConfig to run pipelines for
        project: ProjectConfig to run pipelines
    """
    if dryrun:
        print("\nDryrun is enabled, not running the HyFI task\n")
        return
    task.run()

run_workflow(workflow, dryrun=False) staticmethod

Run the tasks specified in the workflow

Parameters:

Name Type Description Default
workflow Workflow

WorkflowConfig object to run

required
Source code in hyfi/pipeline/pipeline.py
@staticmethod
def run_workflow(workflow: Workflow, dryrun: bool = False):
    """
    Run the tasks specified in the workflow

    Args:
        workflow: WorkflowConfig object to run
    """
    if dryrun:
        print("\nDryrun is enabled, not running the HyFI workflow\n")
        return
    workflow.run()

Pipe

Bases: BaseRun

Pipe Configuration

Source code in hyfi/pipeline/config.py
class Pipe(BaseRun):
    """Pipe Configuration"""

    pipe_target: str = ""
    name: Optional[str] = ""
    desc: Optional[str] = ""
    env: Optional[Dict[str, Any]] = {}
    use_pipe_obj: bool = True
    pipe_obj_arg_name: Optional[str] = ""
    return_pipe_obj: bool = False
    # task: Optional[TaskConfig] = None

    def set_enviroment(self):
        if self.env:
            ENVs.check_and_set_osenv_vars(self.env)

    def get_pipe_func(self) -> Optional[Callable]:
        if self.pipe_target.startswith("lambda"):
            raise NotImplementedError("Lambda functions are not supported. (dangerous)")
            # return eval(self.pipe_target)
        elif self.pipe_target:
            return Composer.partial(self.pipe_target)
        else:
            return None

    def get_run_func(self) -> Optional[Callable]:
        run_cfg = self.run_config
        run_target = self.run_target
        if run_target and run_target.startswith("lambda"):
            raise NotImplementedError("Lambda functions are not supported. (dangerous)")
            # return eval(run_target)
        elif run_cfg:
            if self.pipe_obj_arg_name:
                run_cfg.pop(self.pipe_obj_arg_name)
            logger.info(
                "Returning partial function: %s with kwargs: %s", run_target, run_cfg
            )
            return Composer.partial(run_cfg)
        else:
            logger.warning("No function found for %s", self)
            return None

Pipeline

Bases: BaseRun

Pipeline Configuration

Source code in hyfi/pipeline/config.py
class Pipeline(BaseRun):
    """Pipeline Configuration"""

    name: Optional[str] = ""
    steps: Optional[List[Union[str, Dict]]] = []
    initial_object: Optional[Any] = None
    use_task_as_initial_object: bool = False

    @field_validator("steps", mode="before")
    def steps_to_list(cls, v):
        """
        Convert a list of steps to a list

        Args:
            cls: class to use for conversion
            v: list of steps to convert

        Returns:
            list of steps converted to
        """
        return [v] if isinstance(v, str) else Composer.to_dict(v)

    def update_configs(
        self,
        rc: Union[Dict, Running],
    ):
        """
        Update running config with values from another config

        Args:
            rc: RunningConfig to update from
        """
        # If rc is a dict or dict it will be converted to RunningConfig.
        if isinstance(rc, dict):
            rc = Running(**rc)
        self.name = rc.name or self.name
        self.desc = rc.desc or self.desc

    def get_pipes(self) -> Pipes:
        """
        Get all pipes that this task is aware of

        Args:
            task: The task to use for the pipe

        Returns:
            A list of : class : `PipeConfig` objects
        """
        pipes: Pipes = []
        self.steps = self.steps or []
        # Add pipes to the pipeline.
        for rc in get_running_configs(self.steps):
            # Add a pipe to the pipeline.
            config = getattr(self, rc.uses, None)
            if isinstance(config, dict):
                pipe = Pipe(**Composer.update(config, rc.model_dump()))
                # Set the task to be used for the pipe.
                # if task is not None:
                #     pipe.task = task
                pipes.append(pipe)
        return pipes

get_pipes()

Get all pipes that this task is aware of

Parameters:

Name Type Description Default
task

The task to use for the pipe

required

Returns:

Type Description
Pipes

A list of : class : PipeConfig objects

Source code in hyfi/pipeline/config.py
def get_pipes(self) -> Pipes:
    """
    Get all pipes that this task is aware of

    Args:
        task: The task to use for the pipe

    Returns:
        A list of : class : `PipeConfig` objects
    """
    pipes: Pipes = []
    self.steps = self.steps or []
    # Add pipes to the pipeline.
    for rc in get_running_configs(self.steps):
        # Add a pipe to the pipeline.
        config = getattr(self, rc.uses, None)
        if isinstance(config, dict):
            pipe = Pipe(**Composer.update(config, rc.model_dump()))
            # Set the task to be used for the pipe.
            # if task is not None:
            #     pipe.task = task
            pipes.append(pipe)
    return pipes

steps_to_list(v)

Convert a list of steps to a list

Parameters:

Name Type Description Default
cls

class to use for conversion

required
v

list of steps to convert

required

Returns:

Type Description

list of steps converted to

Source code in hyfi/pipeline/config.py
@field_validator("steps", mode="before")
def steps_to_list(cls, v):
    """
    Convert a list of steps to a list

    Args:
        cls: class to use for conversion
        v: list of steps to convert

    Returns:
        list of steps converted to
    """
    return [v] if isinstance(v, str) else Composer.to_dict(v)

update_configs(rc)

Update running config with values from another config

Parameters:

Name Type Description Default
rc Union[Dict, Running]

RunningConfig to update from

required
Source code in hyfi/pipeline/config.py
def update_configs(
    self,
    rc: Union[Dict, Running],
):
    """
    Update running config with values from another config

    Args:
        rc: RunningConfig to update from
    """
    # If rc is a dict or dict it will be converted to RunningConfig.
    if isinstance(rc, dict):
        rc = Running(**rc)
    self.name = rc.name or self.name
    self.desc = rc.desc or self.desc

Running

Bases: BaseRun

Running Configuration

Source code in hyfi/pipeline/config.py
class Running(BaseRun):
    """Running Configuration"""

    uses: str = ""