Skip to content

hyfi.task

BatchPath

Bases: TaskPath

Source code in hyfi/path/batch.py
class BatchPath(TaskPath):
    """Path configuration for a batch: nests a batch directory under the task dir."""

    _config_name_: str = "__batch__"

    batch_name: str = "demo-batch"

    @property
    def batch_dir(self) -> Path:
        """The batch directory: the task directory joined with the batch name."""
        return Path(self.task_dir, self.batch_name)

    @property
    def workspace_dir(self) -> Path:
        """The workspace directory for this batch.

        Returns:
            The batch directory, which serves as the workspace root for the batch.
        """
        return self.batch_dir

batch_dir: Path property

Returns the path to the batch directory.

workspace_dir: Path property

Returns the path to the batch directory, which serves as the workspace directory.

Returns:

Type Description
Path

A Path to the batch directory, used as the workspace directory for the batch.

BatchTask

Bases: Task

Configuration class for batch tasks. Inherits from Task.

Attributes:

Name Type Description
_config_name_ str

The name of the configuration.

_config_group_ str

The configuration group.

batch_name str

The name of the batch.

batch BatchConfig

The batch configuration.

_property_set_methods_ Dict[str, str]

A dictionary of property set methods.

Source code in hyfi/task/batch.py
class BatchTask(Task):
    """
    Configuration class for batch tasks. Inherits from Task.

    Attributes:
        _config_name_ (str): The name of the configuration.
        batch_name (str): The name of the batch.
        batch (Batch): The batch sub-configuration.
        path (BatchPath): The path sub-configuration for the batch.
        _property_set_methods_ (Dict[str, str]): Maps attribute names to the
            setter methods invoked when those attributes are assigned.
    """

    _config_name_: str = "__batch__"

    batch_name: str = "demo"
    batch: Batch = Batch()
    path: BatchPath = BatchPath()

    # Assigning to these attributes is routed through the named setter methods
    # so that the dependent `path` and `batch` sub-configs stay in sync.
    _property_set_methods_ = {
        "task_name": "set_task_name",
        "task_root": "set_task_root",
        "batch_name": "set_batch_name",
    }
    # _subconfigs_ = {"batch": BatchConfig}

    def set_batch_name(self, val: str):
        """Propagate a new batch name to the path and batch sub-configs.

        Raises:
            ValueError: If `val` is empty.
        """
        if not val:
            raise ValueError("Batch name cannot be empty")
        if not self.batch_name or self.batch_name != val:
            self.path.batch_name = val
            self.batch.batch_name = val

    def set_batch_num(self, val: Optional[int] = None):
        """Set the batch number; None defers to the batch config's own logic."""
        self.batch.batch_num = val

    def set_task_name(self, val: str):
        """Propagate a new task name and refresh the batch root accordingly.

        Raises:
            ValueError: If `val` is empty.
        """
        if not val:
            raise ValueError("Task name cannot be empty")
        if not self.task_name or self.task_name != val:
            self.path.task_name = val
            # NOTE(review): this uses output_dir while set_task_root uses
            # path.task_dir — confirm the asymmetry is intentional.
            self.batch.batch_root = str(self.output_dir)

    def set_task_root(self, val: Union[str, Path]):
        """Propagate a new task root and refresh the batch root accordingly.

        Raises:
            ValueError: If `val` is empty.
        """
        if not val:
            raise ValueError("Task root cannot be empty")
        if not self.task_root or self.task_root != val:
            self.path.task_root = str(val)
            self.batch.batch_root = str(self.path.task_dir)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # At init, set the batch root to the task root.
        # Afterwards, the batch root is refreshed whenever the task root is set.
        self.batch.batch_root = str(self.path.task_dir)
        # Call set_batch_num to set the batch_num and batch_id correctly after init
        if self.batch.batch_num_auto:
            self.batch.set_batch_num()
        logger.info(
            "Initalized batch: %s(%s) in %s",
            self.batch_name,
            self.batch_num,
            self.batch_dir,
        )

    @property
    def batch_num(self) -> int:
        """The current batch number."""
        return self.batch.batch_num

    @batch_num.setter
    def batch_num(self, val: Optional[int]):
        # Route assignment through the setter so batch state stays consistent.
        self.set_batch_num(val)

    @property
    def batch_id(self) -> str:
        """The identifier of the current batch."""
        return self.batch.batch_id

    @property
    def seed(self) -> int:
        """The random seed configured for the batch."""
        return self.batch.seed

    @property
    def batch_dir(self) -> Path:
        """The directory where batch artifacts are stored."""
        return self.batch.batch_dir

    @property
    def device(self) -> str:
        """The compute device configured for the batch."""
        return self.batch.device

    @property
    def num_devices(self) -> int:
        """The number of compute devices configured for the batch."""
        return self.batch.num_devices

    def save_config(
        self,
        filepath: Optional[Union[str, Path]] = None,
        exclude: Optional[Union[str, List[str], Set[str], None]] = None,
        exclude_none: bool = True,
        only_include: Optional[Union[str, List[str], Set[str], None]] = None,
        save_as_json_as_well: bool = True,
    ) -> str:
        """
        Save the batch configuration to file.

        Args:
            filepath (Optional[Union[str, Path]]): The filepath to save the configuration to. Defaults to None.
            exclude (Optional[Union[str, List[str], Set[str], None]]): Keys to exclude from the saved configuration.
                Defaults to None.
            exclude_none (bool): Whether to exclude keys with None values from the saved configuration. Defaults to True.
            only_include (Optional[Union[str, List[str], Set[str], None]]): Keys to include in the saved configuration.
                Defaults to None.
            save_as_json_as_well (bool): Whether to save the configuration as a json file as well. Defaults to True.

        Returns:
            str: The filename of the saved configuration.
        """
        if not self.batch:
            raise ValueError("No batch configuration to save")
        if not filepath:
            filepath = self.batch.config_filepath

        if save_as_json_as_well:
            self.save_config_as_json(
                exclude=exclude,
                exclude_none=exclude_none,
                only_include=only_include,
            )
        return super().save_config(
            filepath=filepath,
            exclude=exclude,
            exclude_none=exclude_none,
            only_include=only_include,
        )

    def save_config_as_json(
        self,
        filepath: Optional[Union[str, Path]] = None,
        exclude: Optional[Union[str, List[str], Set[str], None]] = None,
        exclude_none: bool = True,
        only_include: Optional[Union[str, List[str], Set[str], None]] = None,
    ) -> str:
        """Save the batch configuration as a JSON file.

        Defaults to the batch's JSON config path when `filepath` is not given.

        Returns:
            str: The filename of the saved JSON configuration.
        """
        if not self.batch:
            raise ValueError("No batch configuration to save")
        if not filepath:
            filepath = self.batch.config_jsonpath
        return super().save_config_as_json(
            filepath=filepath,
            exclude=exclude,
            exclude_none=exclude_none,
            only_include=only_include,
        )

    def load_config(
        self,
        batch_name: Optional[str] = None,
        batch_num: Optional[int] = None,
        filepath: Optional[Union[str, Path]] = None,
        **config_kwargs,
    ) -> Dict:
        """Load the config from the batch config file.

        Args:
            batch_name: Batch to load; defaults to the current batch name.
            batch_num: Batch number to load; None is treated as -1, which
                skips the per-batch config file lookup unless `filepath` is given.
            filepath: Explicit config file path; overrides the derived path.
            **config_kwargs: Overrides merged into the loaded config.

        Returns:
            Dict: The model dump of the re-initialized configuration.
        """
        if not self.batch:
            raise ValueError("No batch configuration to load")
        if not batch_name:
            batch_name = self.batch_name
        if batch_num is None:
            batch_num = -1
        if not filepath and batch_num >= 0:
            # Derive the config file path from a throwaway Batch config.
            batch = Batch(
                batch_root=self.batch.batch_root,
                batch_name=batch_name,
                batch_num=batch_num,
            )
            filepath = batch.config_filepath
        if isinstance(filepath, str):
            filepath = Path(filepath)

        if self.verbose:
            logger.info(
                "> Loading config for batch_name: %s batch_num: %s",
                batch_name,
                batch_num,
            )
        cfg = self.export_config()
        if filepath:
            if filepath.is_file():
                logger.info("Loading config from %s", filepath)
                batch_cfg = Composer.load(filepath)
                logger.info("Merging config with the loaded config")
                cfg = Composer.merge(cfg, batch_cfg)
            else:
                logger.info("No config file found at %s", filepath)
        if self.verbose:
            logger.info("Updating config with config_kwargs: %s", config_kwargs)
        cfg = Composer.update(Composer.to_dict(cfg), config_kwargs)

        # initialize self with the config
        self.__init__(**cfg)

        return self.model_dump()

    def print_config(
        self,
        batch_name: Optional[str] = None,
        batch_num: Optional[int] = None,
    ):
        """Load the given batch's configuration and pretty-print it."""
        self.load_config(batch_name, batch_num)
        Composer.print(self.model_dump())
load_config(batch_name=None, batch_num=None, filepath=None, **config_kwargs)

Load the config from the batch config file

Source code in hyfi/task/batch.py
def load_config(
    self,
    batch_name: Optional[str] = None,
    batch_num: Optional[int] = None,
    filepath: Optional[Union[str, Path]] = None,
    **config_kwargs,
) -> Dict:
    """Load the config from the batch config file.

    Args:
        batch_name: Batch to load; defaults to the current batch name.
        batch_num: Batch number to load; None is treated as -1, which skips
            the per-batch config file lookup unless `filepath` is given.
        filepath: Explicit config file path; overrides the derived path.
        **config_kwargs: Overrides merged into the loaded config.

    Returns:
        Dict: The model dump of the re-initialized configuration.
    """
    if not self.batch:
        raise ValueError("No batch configuration to load")
    if not batch_name:
        batch_name = self.batch_name
    if batch_num is None:
        batch_num = -1
    if not filepath and batch_num >= 0:
        # Derive the config file path from a throwaway Batch config.
        batch = Batch(
            batch_root=self.batch.batch_root,
            batch_name=batch_name,
            batch_num=batch_num,
        )
        filepath = batch.config_filepath
    if isinstance(filepath, str):
        filepath = Path(filepath)

    if self.verbose:
        logger.info(
            "> Loading config for batch_name: %s batch_num: %s",
            batch_name,
            batch_num,
        )
    cfg = self.export_config()
    if filepath:
        if filepath.is_file():
            logger.info("Loading config from %s", filepath)
            batch_cfg = Composer.load(filepath)
            logger.info("Merging config with the loaded config")
            cfg = Composer.merge(cfg, batch_cfg)
        else:
            logger.info("No config file found at %s", filepath)
    if self.verbose:
        logger.info("Updating config with config_kwargs: %s", config_kwargs)
    cfg = Composer.update(Composer.to_dict(cfg), config_kwargs)

    # initialize self with the config
    self.__init__(**cfg)

    return self.model_dump()

save_config(filepath=None, exclude=None, exclude_none=True, only_include=None, save_as_json_as_well=True)

Save the batch configuration to file.

Parameters:

Name Type Description Default
filepath Optional[Union[str, Path]]

The filepath to save the configuration to. Defaults to None.

None
exclude Optional[Union[str, List[str], Set[str], None]]

Keys to exclude from the saved configuration. Defaults to None.

None
exclude_none bool

Whether to exclude keys with None values from the saved configuration. Defaults to True.

True
only_include Optional[Union[str, List[str], Set[str], None]]

Keys to include in the saved configuration. Defaults to None.

None
save_as_json_as_well bool

Whether to save the configuration as a json file as well. Defaults to True.

True

Returns:

Name Type Description
str str

The filename of the saved configuration.

Source code in hyfi/task/batch.py
def save_config(
    self,
    filepath: Optional[Union[str, Path]] = None,
    exclude: Optional[Union[str, List[str], Set[str], None]] = None,
    exclude_none: bool = True,
    only_include: Optional[Union[str, List[str], Set[str], None]] = None,
    save_as_json_as_well: bool = True,
) -> str:
    """
    Persist the batch configuration to disk.

    Args:
        filepath: Destination file; defaults to the batch's config filepath.
        exclude: Keys to leave out of the saved configuration.
        exclude_none: Drop keys whose value is None. Defaults to True.
        only_include: If given, save only these keys.
        save_as_json_as_well: Also write a JSON copy of the configuration.
            Defaults to True.

    Returns:
        str: The filename of the saved configuration.
    """
    if not self.batch:
        raise ValueError("No batch configuration to save")
    filepath = filepath or self.batch.config_filepath

    # Optionally mirror the configuration to a JSON file first.
    if save_as_json_as_well:
        self.save_config_as_json(
            exclude=exclude,
            exclude_none=exclude_none,
            only_include=only_include,
        )
    # Delegate the actual (non-JSON) save to the parent implementation.
    return super().save_config(
        filepath=filepath,
        exclude=exclude,
        exclude_none=exclude_none,
        only_include=only_include,
    )

Task

Bases: BaseConfig

Source code in hyfi/task/task.py
class Task(BaseConfig):
    """Configuration for a task: its name, root, paths, modules, and pipelines.

    Assigning ``task_name`` or ``task_root`` is routed through the
    corresponding ``set_*`` method (see ``_property_set_methods_``) so the
    path sub-config stays in sync.
    """

    _config_name_: str = "__init__"
    _config_group_: str = "/task"

    task_name: str = "demo-task"
    task_root: str = "workspace"
    version: str = "0.0.0"
    module: Optional[Module] = None
    path: TaskPath = TaskPath()
    # NOTE(review): mutable default is safe if BaseConfig is a pydantic model
    # (defaults are copied per instance) — confirm.
    pipelines: Optional[List[Union[str, Dict]]] = []

    # Assignments to these attributes are routed through the named setters.
    _property_set_methods_ = {
        "task_name": "set_task_name",
        "task_root": "set_task_root",
    }

    def set_task_root(self, val: Union[str, Path]):
        """Propagate a new task root to the path sub-config."""
        if not self.task_root or self.task_root != val:
            self.path.task_root = str(val)

    def set_task_name(self, val):
        """Propagate a new task name to the path sub-config."""
        if not self.task_name or self.task_name != val:
            self.path.task_name = val

    @property
    def config(self):
        """The full configuration as a plain dict."""
        return self.model_dump()

    @property
    def root_dir(self) -> Path:
        """The task root directory."""
        return self.path.root_dir

    @property
    def task_dir(self) -> Path:
        """The task directory."""
        return self.path.task_dir

    @property
    def project_dir(self) -> Path:
        """The project directory."""
        return self.path.project_dir

    @property
    def workspace_dir(self) -> Path:
        """The workspace directory for the task."""
        return self.path.workspace_dir

    @property
    def output_dir(self) -> Path:
        """The output directory."""
        return self.path.output_dir

    @property
    def model_dir(self) -> Path:
        """The model directory."""
        return self.path.model_dir

    @property
    def log_dir(self) -> Path:
        """The log directory."""
        return self.path.log_dir

    @property
    def cache_dir(self) -> Path:
        """The cache directory."""
        return self.path.cache_dir

    @property
    def library_dir(self) -> Path:
        """The directory where external library modules are stored."""
        return self.path.library_dir

    @property
    def dataset_dir(self):
        """The dataset directory."""
        return self.path.dataset_dir

    def print_config(self):
        """Pretty-print the current configuration."""
        Composer.print(self.config)

    def load_modules(self):
        """Load the modules declared in the task's module configuration."""
        if not self.module:
            logger.info("No module to load")
            return
        if not self.module.modules:
            logger.info("No modules to load")
            return
        library_dir = self.library_dir
        for module in self.module.modules:
            name = module.name
            libname = module.libname
            liburi = module.liburi
            specname = module.specname
            libpath = library_dir / libname
            syspath = module.get("syspath")
            if syspath is not None:
                # Relative syspath entries are resolved against the library dir.
                syspath = library_dir / syspath
            PKGs.ensure_import_module(name, libpath, liburi, specname, syspath)

    def reset(self, objects=None, release_gpu_memory=True):
        """Reset the memory cache"""
        if isinstance(objects, list):
            for obj in objects:
                # Drops this frame's reference only; callers' references persist.
                del obj
        if release_gpu_memory:
            from hyfi.utils.gpumon import GPUMon

            GPUMon.release_gpu_memory()

    def get_pipelines(self) -> Pipelines:
        """
        Get the list of pipelines for a task.

        Returns:
            A list of Pipeline objects built from the task attributes named
            in ``self.pipelines``.
        """
        self.pipelines = self.pipelines or []
        pipelines: Pipelines = []
        for name in self.pipelines:
            if isinstance(name, str) and isinstance(getattr(self, name), dict):
                pipeline = Pipeline(**getattr(self, name))
                if not pipeline.name:
                    pipeline.name = name
                pipelines.append(pipeline)
        return pipelines

    def run(
        self,
        pipelines: Optional[Pipelines] = None,
    ):
        """
        Run pipelines specified in the task.

        Args:
            pipelines: The pipelines to run; defaults to the task's
                configured pipelines.
        """
        # Run all pipelines in the task.
        pipelines = pipelines or self.get_pipelines()
        if self.verbose:
            logger.info("Running %s pipeline(s)", len(pipelines or []))
        with elapsed_timer(format_time=True) as elapsed:
            for pipeline in pipelines:
                if self.verbose:
                    logger.info("Running pipeline: %s", pipeline.name)
                initial_object = self if pipeline.use_task_as_initial_object else None
                self.run_pipeline(pipeline, initial_object)
            # Print the elapsed time.
            if self.verbose:
                logger.info(
                    " >> elapsed time for the task with %s pipelines: %s",
                    len(pipelines or []),
                    elapsed(),
                )

    def run_pipeline(
        self,
        pipeline: Union[Dict, Pipeline],
        initial_object: Optional[Any] = None,
    ) -> Any:
        """
        Run a pipeline given a config.

        Args:
            pipeline: Pipeline (or dict convertible to one) to run
            initial_object: Object to use as the initial value for the pipes

        Returns:
            The result of the pipeline
        """
        # If pipeline is not a Pipeline object it will be converted to one.
        if not isinstance(pipeline, Pipeline):
            pipeline = Pipeline(**Composer.to_dict(pipeline))
        pipes = pipeline.get_pipes()
        if (
            initial_object is None
            and pipeline.initial_object is not None
            and Composer.is_instantiatable(pipeline.initial_object)
        ):
            initial_object = Composer.instantiate(pipeline.initial_object)
        # Return initial object when there is nothing to run.
        if not pipes:
            logger.warning("No pipes specified")
            return initial_object

        pipe_names = [pipe.run for pipe in pipes]
        logger.info("Applying %s pipes: %s", len(pipe_names), pipe_names)
        # NOTE(review): self cannot be None in a normal bound call; this guard
        # only matters if the method is invoked unbound — confirm it is needed.
        if self is None:
            self = Task()
        with elapsed_timer(format_time=True) as elapsed:
            with change_directory(self.workspace_dir):
                # Thread the output of each pipe into the next one.
                rst = reduce(run_pipe, pipes, initial_object)
            # Print the elapsed time.
            if pipeline.verbose:
                logger.info(
                    " >> elapsed time for the pipeline with %s pipes: %s",
                    len(pipes),
                    elapsed(),
                )
        return rst

get_pipelines()

Get the list of pipelines for a task

Parameters:

Name Type Description Default
task

The task to get the pipelines for

required

Returns:

Type Description
Pipelines

A list of PipelineConfig objects

Source code in hyfi/task/task.py
def get_pipelines(self) -> Pipelines:
    """
    Collect the task's configured pipelines as Pipeline objects.

    Returns:
        A list of Pipeline objects built from the task attributes named
        in ``self.pipelines``.
    """
    self.pipelines = self.pipelines or []
    collected: Pipelines = []
    for entry in self.pipelines:
        # Only string entries naming a dict-valued attribute are pipelines.
        if not isinstance(entry, str):
            continue
        config = getattr(self, entry)
        if not isinstance(config, dict):
            continue
        pipeline = Pipeline(**config)
        if not pipeline.name:
            pipeline.name = entry
        collected.append(pipeline)
    return collected

load_modules()

Load the modules

Source code in hyfi/task/task.py
def load_modules(self):
    """Import every module declared in the task's module configuration."""
    if not self.module:
        logger.info("No module to load")
        return
    if not self.module.modules:
        logger.info("No modules to load")
        return
    base_dir = self.library_dir
    for mod in self.module.modules:
        lib_path = base_dir / mod.libname
        sys_path = mod.get("syspath")
        # Relative syspath entries are resolved against the library dir.
        if sys_path is not None:
            sys_path = base_dir / sys_path
        PKGs.ensure_import_module(
            mod.name, lib_path, mod.liburi, mod.specname, sys_path
        )

reset(objects=None, release_gpu_memory=True)

Reset the memory cache

Source code in hyfi/task/task.py
def reset(self, objects=None, release_gpu_memory=True):
    """Drop references to the given objects and optionally free GPU memory."""
    if isinstance(objects, list):
        for item in objects:
            # Deleting the name only drops this frame's reference to the object.
            del item
    if release_gpu_memory:
        from hyfi.utils.gpumon import GPUMon

        GPUMon.release_gpu_memory()

run(pipelines=None)

Run pipelines specified in the task

Parameters:

Name Type Description Default
pipelines Optional[Pipelines]

The pipelines to run

None
Source code in hyfi/task/task.py
def run(
    self,
    pipelines: Optional[Pipelines] = None,
):
    """
    Execute the task's pipelines in order.

    Args:
        pipelines: The pipelines to run; defaults to the task's configured
            pipelines.
    """
    to_run = pipelines or self.get_pipelines()
    if self.verbose:
        logger.info("Running %s pipeline(s)", len(to_run or []))
    with elapsed_timer(format_time=True) as elapsed:
        for current in to_run:
            if self.verbose:
                logger.info("Running pipeline: %s", current.name)
            # The task itself may seed the pipeline as its initial object.
            start_obj = self if current.use_task_as_initial_object else None
            self.run_pipeline(current, start_obj)
        # Report total elapsed time once all pipelines have finished.
        if self.verbose:
            logger.info(
                " >> elapsed time for the task with %s pipelines: %s",
                len(to_run or []),
                elapsed(),
            )

run_pipeline(pipeline, initial_object=None)

Run a pipeline given a config

Parameters:

Name Type Description Default
config

PipelineConfig to run the pipeline

required
initial_obj

Object to use as initial value

required
task

TaskConfig to use as task

required

Returns:

Type Description
Any

The result of the pipeline

Source code in hyfi/task/task.py
def run_pipeline(
    self,
    pipeline: Union[Dict, Pipeline],
    initial_object: Optional[Any] = None,
) -> Any:
    """
    Run a pipeline given a config.

    Args:
        pipeline: Pipeline (or dict convertible to one) to run
        initial_object: Object to use as the initial value for the pipes

    Returns:
        The result of the pipeline
    """
    # If pipeline is not a Pipeline object it will be converted to one.
    if not isinstance(pipeline, Pipeline):
        pipeline = Pipeline(**Composer.to_dict(pipeline))
    pipes = pipeline.get_pipes()
    if (
        initial_object is None
        and pipeline.initial_object is not None
        and Composer.is_instantiatable(pipeline.initial_object)
    ):
        initial_object = Composer.instantiate(pipeline.initial_object)
    # Return initial object when there is nothing to run.
    if not pipes:
        logger.warning("No pipes specified")
        return initial_object

    pipe_names = [pipe.run for pipe in pipes]
    logger.info("Applying %s pipes: %s", len(pipe_names), pipe_names)
    # NOTE(review): self cannot be None in a normal bound call; this guard
    # only matters if the method is invoked unbound — confirm it is needed.
    if self is None:
        self = Task()
    with elapsed_timer(format_time=True) as elapsed:
        with change_directory(self.workspace_dir):
            # Thread the output of each pipe into the next one.
            rst = reduce(run_pipe, pipes, initial_object)
        # Print the elapsed time.
        if pipeline.verbose:
            logger.info(
                " >> elapsed time for the pipeline with %s pipes: %s",
                len(pipes),
                elapsed(),
            )
    return rst

TaskPath

Bases: BasePath

Source code in hyfi/path/task.py
class TaskPath(BasePath):
    """Path configuration for a task, resolving its root and task directories."""

    _config_name_: str = "__task__"

    task_name: str = __default_task_name__
    task_root: str = __default_task_root__

    @property
    def name(self) -> str:
        """The name of this path configuration (the task name)."""
        return self.task_name

    @property
    def project_dir(self) -> Path:
        """The project directory from the global configuration."""
        return global_config.project_dir

    @property
    def project_workspace_dir(self) -> Path:
        """The project workspace directory from the global configuration."""
        return global_config.project_workspace_dir

    @property
    def root_dir(self) -> Path:
        """Resolve the task root as an absolute path.

        Falls back to the project workspace directory when ``task_root`` is
        left at its default; a relative root is anchored at the project dir.
        """
        if self.task_root != __default_task_root__:
            root = Path(self.task_root)
            if not root.is_absolute():
                root = self.project_dir / root
        else:
            root = self.project_workspace_dir
        return root.absolute()

    @property
    def task_dir(self) -> Path:
        """The task directory: the root directory joined with the task name."""
        return self.root_dir / self.task_name

    @property
    def workspace_dir(self) -> Path:
        """The workspace directory for the task (the root directory)."""
        return self.root_dir

name: str property

Returns the name of the path configuration.

root_dir: Path property

Returns the path to the task root directory.

Returns:

Type Description
Path

An absolute path to the task root directory.

task_dir: Path property

Returns the path to the task directory.

Returns:

Type Description
Path

The path to the task directory (the task root joined with the task name).

workspace_dir: Path property

Returns the path to the workspace directory.

Returns:

Type Description
Path

The path to the workspace directory (the task root directory).