Skip to content

hyfi.utils

CONFs

Source code in hyfi/utils/conf.py
class CONFs:
    """Namespace of static helpers around OmegaConf, JSON files and nested dicts."""

    @staticmethod
    def select(
        cfg: Any,
        key: str,
        default: Any = None,
        throw_on_resolution_failure: bool = True,
        throw_on_missing: bool = False,
    ):
        """
        Wrapper for ``OmegaConf.select``: select a value from a config object using a key.

        Slashes in ``key`` are replaced with dots before the lookup, so
        ``"a/b"`` and ``"a.b"`` select the same node.

        Args:
            cfg: Config node to select from
            key: Key to select
            default: Default value to return if key is not found
            throw_on_resolution_failure: Raise an exception if an interpolation
                resolution error occurs, otherwise return None
            throw_on_missing: Raise an exception if an attempt to select a missing key (with the value '???')
                is made, otherwise return None

        Returns:
            selected value or None if not found.
        """
        key = key.replace("/", ".")
        return OmegaConf.select(
            cfg,
            key=key,
            default=default,
            throw_on_resolution_failure=throw_on_resolution_failure,
            throw_on_missing=throw_on_missing,
        )

    @staticmethod
    def to_dict(cfg: Any) -> Any:
        """
        Convert a config to a dict

        Args:
            cfg: The config to convert.

        Returns:
            The resolved container for a dict, DictConfig or ListConfig input;
            any other input is returned unchanged.
        """
        # Plain dicts take a round trip through OmegaConf so that they are
        # converted (and resolved) by the same to_container call below.
        if isinstance(cfg, dict):
            cfg = CONFs.to_config(cfg)
        if isinstance(cfg, (DictConfig, ListConfig)):
            return OmegaConf.to_container(
                cfg,
                resolve=True,
                throw_on_missing=False,
                structured_config_mode=SCMode.DICT,
            )
        return cfg

    @staticmethod
    def to_config(cfg: Any) -> Union[DictConfig, ListConfig]:
        """
        Convert a config object to OmegaConf

        Args:
            cfg: The config to convert.

        Returns:
            A Config object that corresponds to the given config.
        """
        return OmegaConf.create(cfg)

    @staticmethod
    def print(cfg: Any, resolve: bool = True, **kwargs):
        """
        Prints the configuration object in a human-readable format.

        Args:
            cfg (Any): The configuration object to print.
            resolve (bool, optional): Whether to resolve the configuration object before printing. Defaults to True.
            **kwargs: Additional keyword arguments to pass to the pprint.pprint function.

        Returns:
            None
        """
        import pprint

        if CONFs.is_config(cfg):
            if resolve:
                pprint.pprint(CONFs.to_dict(cfg), **kwargs)
            else:
                pprint.pprint(cfg, **kwargs)
        else:
            # Method bodies do not see class-level names, so this is the
            # builtin print, not CONFs.print.
            print(cfg)

    @staticmethod
    def is_dictlike(cfg: Any):
        """
        Determines whether the input object is an instance of ``DictLike``.

        Args:
            cfg (Any): The object to check.

        Returns:
            bool: True if the object is an instance of ``DictLike``, False otherwise.
        """
        return isinstance(cfg, DictLike)

    @staticmethod
    def is_listlike(cfg: Any):
        """
        Determines whether the input object is an instance of ``ListLike``.

        Args:
            cfg (Any): The object to check.

        Returns:
            bool: True if the object is an instance of ``ListLike``, False otherwise.
        """
        return isinstance(cfg, ListLike)

    @staticmethod
    def is_config(cfg: Any):
        """
        Determines whether the input object is a dictionary-style configuration object.

        Args:
            cfg (Any): The object to check.

        Returns:
            bool: True if the object is a ``DictConfig`` or a ``dict``, False otherwise.
        """
        return isinstance(cfg, (DictConfig, dict))

    @staticmethod
    def is_list(cfg: Any):
        """
        Determines whether the input object is a list-style configuration object.

        Args:
            cfg (Any): The object to check.

        Returns:
            bool: True if the object is a ``ListConfig`` or a ``list``, False otherwise.
        """
        return isinstance(cfg, (ListConfig, list))

    @staticmethod
    def load(file_: Union[str, Path, IO[Any]]) -> Union[DictConfig, ListConfig]:
        """
        Load a configuration file and return a configuration object.

        Args:
            file_ (Union[str, Path, IO[Any]]): The path to the configuration file or a file-like object.

        Returns:
            Union[DictConfig, ListConfig]: The configuration object.
        """
        return OmegaConf.load(file_)

    @staticmethod
    def _ensure_parent_dir(f: Any) -> None:
        """Create the parent directory of ``f`` when ``f`` is a path with a directory part."""
        # A writable stream has no directory to prepare; stringifying it would
        # yield its repr, not a path.
        if hasattr(f, "write"):
            return
        parent = os.path.dirname(str(f))
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)

    @staticmethod
    def save(config: Any, f: Union[str, Path, IO[Any]], resolve: bool = False) -> None:
        """
        Save a configuration object to a file.

        The parent directory is created first when ``f`` is a path that has a
        directory component.

        Args:
            config (Any): The configuration object to save.
            f (Union[str, Path, IO[Any]]): The path to the file or a file-like object.
            resolve (bool, optional): Whether to resolve the configuration object before saving. Defaults to False.
        """
        CONFs._ensure_parent_dir(f)
        OmegaConf.save(config, f, resolve=resolve)

    @staticmethod
    def save_json(
        json_dict: dict,
        f: Union[str, Path, IO[Any]],
        indent=4,
        ensure_ascii=False,
        default=None,
        encoding="utf-8",
        **kwargs,
    ):
        """
        Save a dictionary to a JSON file.

        Args:
            json_dict (dict): The dictionary to save.
            f (Union[str, Path, IO[Any]]): The path to the file or a file-like object.
                A file-like object is written to directly and is not closed.
            indent (int, optional): The number of spaces to use for indentation. Defaults to 4.
            ensure_ascii (bool, optional): Whether to escape non-ASCII characters. Defaults to False.
            default (Any, optional): A function to convert non-serializable objects. Defaults to None.
            encoding (str, optional): The encoding used when ``f`` is a path. Defaults to "utf-8".
            **kwargs: Additional arguments to pass to json.dump().
        """
        dump_options = dict(
            indent=indent,
            ensure_ascii=ensure_ascii,
            default=default,
            **kwargs,
        )
        if hasattr(f, "write"):
            json.dump(json_dict, f, **dump_options)
            return
        CONFs._ensure_parent_dir(f)
        with open(str(f), "w", encoding=encoding) as stream:
            json.dump(json_dict, stream, **dump_options)

    @staticmethod
    def load_json(f: Union[str, Path, IO[Any]], encoding="utf-8", **kwargs) -> dict:
        """
        Load a JSON file into a dictionary.

        Args:
            f (Union[str, Path, IO[Any]]): The path to the file or a file-like object.
                A file-like object is read directly and is not closed.
            encoding (str, optional): The encoding used when ``f`` is a path. Defaults to "utf-8".
            **kwargs: Additional arguments to pass to json.load().

        Returns:
            dict: The dictionary loaded from the JSON file.
        """
        if hasattr(f, "read"):
            return json.load(f, **kwargs)
        with open(str(f), "r", encoding=encoding) as stream:
            return json.load(stream, **kwargs)

    @staticmethod
    def update(_dict: Mapping[str, Any], _overrides: Mapping[str, Any]) -> Mapping:
        """
        Recursively update a dictionary with overrides, in place.

        Args:
            _dict (Mapping[str, Any]): The dictionary to update.
            _overrides (Mapping[str, Any]): The dictionary with overrides.

        Returns:
            Mapping: The updated dictionary (the same object as ``_dict``).
        """
        for k, v in _overrides.items():
            if isinstance(v, collections.abc.Mapping):
                current = _dict.get(k)
                # A missing or non-mapping value cannot be merged into; the
                # mapping override replaces it instead of failing on item
                # assignment.
                if not isinstance(current, collections.abc.Mapping):
                    current = {}
                _dict[k] = CONFs.update(current, v)  # type: ignore
            else:
                _dict[k] = v  # type: ignore
        return _dict

    @staticmethod
    def replace_keys(_dict: Mapping[str, Any], old_key: str, new_key: str) -> Mapping:
        """
        Replace a key in a dictionary, recursing into nested mappings.

        Args:
            _dict (Mapping[str, Any]): The dictionary to read from; it is not modified.
            old_key (str): The old key to replace.
            new_key (str): The new key to use.

        Returns:
            Mapping: A new dictionary with the key renamed at every mapping level.
        """
        _new_dict = {}
        for k, v in _dict.items():
            key = new_key if k == old_key else k
            if isinstance(v, collections.abc.Mapping):
                _new_dict[key] = CONFs.replace_keys(v, old_key, new_key)
            else:
                _new_dict[key] = v
        return _new_dict

    @staticmethod
    def merge(
        *configs: Union[
            DictConfig,
            ListConfig,
            Dict[DictKeyType, Any],
            List[Any],
            Tuple[Any, ...],
            Any,
        ],
    ) -> Union[ListConfig, DictConfig]:
        """
        Merge a list of previously created configs into a single one.

        Args:
            *configs: Input configs.

        Returns:
            Union[ListConfig, DictConfig]: The merged config object.
        """
        return OmegaConf.merge(*configs)

    @staticmethod
    def merge_as_dict(
        *configs: Union[
            DictConfig,
            ListConfig,
            Dict[DictKeyType, Any],
            List[Any],
            Tuple[Any, ...],
            Any,
        ],
    ) -> Union[Dict[DictKeyType, Any], List[Any]]:
        """
        Merge a list of previously created configs into a single plain container.

        Args:
            *configs: Input configs.

        Returns:
            Union[Dict[DictKeyType, Any], List[Any]]: The merged config converted
            with ``CONFs.to_dict`` (interpolations resolved).
        """
        return CONFs.to_dict(OmegaConf.merge(*configs))

    @staticmethod
    def to_yaml(cfg: Any, resolve: bool = False, sort_keys: bool = False) -> str:
        """
        Convert the input config object to a YAML string.

        Args:
            cfg (Any): The input config object.
            resolve (bool, optional): Whether to resolve the config object before converting it to YAML. Defaults to False.
            sort_keys (bool, optional): Whether to sort the keys in the resulting YAML string. Defaults to False.

        Returns:
            str: The YAML string representation of the input config object.
        """
        if resolve:
            cfg = CONFs.to_dict(cfg)
        return OmegaConf.to_yaml(cfg, resolve=resolve, sort_keys=sort_keys)

    @staticmethod
    def to_container(
        cfg: Any,
        resolve: bool = False,
        throw_on_missing: bool = False,
        enum_to_str: bool = False,
        structured_config_mode: SCMode = SCMode.DICT,
    ):
        """
        Convert the input config object to a nested container (e.g. dictionary).

        Args:
            cfg (Any): The input config object.
            resolve (bool, optional): Whether to resolve the config object before converting it to a container. Defaults to False.
            throw_on_missing (bool, optional): Whether to throw an exception if a missing key is encountered. Defaults to False.
            enum_to_str (bool, optional): Whether to convert enum values to strings. Defaults to False.
            structured_config_mode (SCMode, optional): The structured config mode to use. Defaults to SCMode.DICT.

        Returns:
            The nested container (e.g. dictionary) representation of the input config object.
        """
        return OmegaConf.to_container(
            cfg,
            resolve=resolve,
            throw_on_missing=throw_on_missing,
            enum_to_str=enum_to_str,
            structured_config_mode=structured_config_mode,
        )

    @staticmethod
    def ensure_list(value):
        """
        Ensure that the given value is a list.

        Any falsy value (None, an empty string, an empty container) gives an
        empty list. A string becomes a one-element list. A list is returned as
        is, and a tuple or set is copied into a list. Anything else is converted
        with ``CONFs.to_dict``; a list result is returned directly and any other
        result is wrapped in a one-element list.

        Args:
            value (Any): The value to ensure as a list.

        Returns:
            List: The value as a list.
        """
        if not value:
            return []
        if isinstance(value, str):
            return [value]
        if isinstance(value, list):
            return value
        if isinstance(value, (tuple, set, frozenset)):
            return list(value)
        converted = CONFs.to_dict(value)
        # to_dict passes through anything that is not a config or dict; wrap
        # such results so callers always receive a list.
        return converted if isinstance(converted, list) else [converted]

    @staticmethod
    def ensure_kwargs(_kwargs, _fn):
        """
        Ensure that the given keyword arguments are valid for the given function.

        Args:
            _kwargs (dict): The keyword arguments to validate.
            _fn (callable): The function to validate the keyword arguments against.

        Returns:
            dict: The entries of ``_kwargs`` whose names match a positional-or-keyword
            or keyword-only parameter of ``_fn``. ``_kwargs`` is returned unchanged
            when ``_fn`` is not callable.
        """
        from inspect import getfullargspec as getargspec

        if callable(_fn):
            spec = getargspec(_fn)
            # Keyword-only parameters are listed separately from spec.args.
            args = spec.args + spec.kwonlyargs
            logger.info(f"args of {_fn}: {args}")
            return {k: v for k, v in _kwargs.items() if k in args}
        return _kwargs

    @staticmethod
    def pprint(cfg: Any, resolve: bool = True, **kwargs):
        """
        Alias for ``CONFs.print``.

        Args:
            cfg (Any): The configuration object to print.
            resolve (bool, optional): Whether to resolve the configuration object before printing. Defaults to True.
            **kwargs: Additional keyword arguments forwarded to ``CONFs.print``.
        """
        CONFs.print(cfg, resolve=resolve, **kwargs)

ensure_kwargs(_kwargs, _fn) staticmethod

Ensure that the given keyword arguments are valid for the given function.

Parameters:

Name Type Description Default
_kwargs dict

The keyword arguments to validate.

required
_fn callable

The function to validate the keyword arguments against.

required

Returns:

Name Type Description
dict

The valid keyword arguments for the given function.

Source code in hyfi/utils/conf.py
@staticmethod
def ensure_kwargs(_kwargs, _fn):
    """
    Ensure that the given keyword arguments are valid for the given function.

    Args:
        _kwargs (dict): The keyword arguments to validate.
        _fn (callable): The function to validate the keyword arguments against.

    Returns:
        dict: The entries of ``_kwargs`` whose names match a positional-or-keyword
        or keyword-only parameter of ``_fn``. ``_kwargs`` is returned unchanged
        when ``_fn`` is not callable.
    """
    from inspect import getfullargspec as getargspec

    if callable(_fn):
        spec = getargspec(_fn)
        # Keyword-only parameters are listed separately from spec.args; without
        # them, valid keyword-only arguments would be filtered out.
        args = spec.args + spec.kwonlyargs
        logger.info(f"args of {_fn}: {args}")
        return {k: v for k, v in _kwargs.items() if k in args}
    return _kwargs

ensure_list(value) staticmethod

Ensure that the given value is a list. If the value is None or an empty string, an empty list is returned. If the value is a string, it is returned as a list containing only that string. If the value is already a list, it is returned as is. Otherwise, the value is converted using the CONFs.to_dict method and the converted result is returned.

Parameters:

Name Type Description Default
value Any

The value to ensure as a list.

required

Returns:

Name Type Description
List

The value as a list.

Source code in hyfi/utils/conf.py
@staticmethod
def ensure_list(value):
    """
    Ensure that the given value is a list.

    Any falsy value (None, an empty string, an empty container) gives an empty
    list. A string becomes a one-element list. A list is returned as is, and a
    tuple or set is copied into a list. Anything else is converted with
    ``CONFs.to_dict``; a list result is returned directly and any other result
    is wrapped in a one-element list.

    Args:
        value (Any): The value to ensure as a list.

    Returns:
        List: The value as a list.
    """
    if not value:
        return []
    if isinstance(value, str):
        return [value]
    if isinstance(value, list):
        return value
    if isinstance(value, (tuple, set, frozenset)):
        return list(value)
    converted = CONFs.to_dict(value)
    # to_dict passes through anything that is not a config or dict; wrap such
    # results so callers always receive a list.
    return converted if isinstance(converted, list) else [converted]

is_config(cfg) staticmethod

Determines whether the input object is a valid configuration object.

Parameters:

Name Type Description Default
cfg Any

The object to check.

required

Returns:

Name Type Description
bool

True if the object is a valid configuration object, False otherwise.

Source code in hyfi/utils/conf.py
@staticmethod
def is_config(cfg: Any):
    """
    Check whether ``cfg`` is a dictionary-style configuration.

    Args:
        cfg (Any): Candidate object.

    Returns:
        bool: True when ``cfg`` is a ``DictConfig`` or a plain ``dict``, False otherwise.
    """
    accepted_types = (DictConfig, dict)
    return isinstance(cfg, accepted_types)

is_dictlike(cfg) staticmethod

Determines whether the input object is a valid dictionary configuration object.

Parameters:

Name Type Description Default
cfg Any

The object to check.

required

Returns:

Name Type Description
bool

True if the object is a valid dictionary configuration object, False otherwise.

Source code in hyfi/utils/conf.py
@staticmethod
def is_dictlike(cfg: Any):
    """
    Check whether ``cfg`` is an instance of ``DictLike``.

    Args:
        cfg (Any): Candidate object.

    Returns:
        bool: True when ``cfg`` is an instance of ``DictLike``, False otherwise.
    """
    matches = isinstance(cfg, DictLike)
    return matches

is_list(cfg) staticmethod

Determines whether the input object is a valid list configuration object.

Parameters:

Name Type Description Default
cfg Any

The object to check.

required

Returns:

Name Type Description
bool

True if the object is a valid list configuration object, False otherwise.

Source code in hyfi/utils/conf.py
@staticmethod
def is_list(cfg: Any):
    """
    Check whether ``cfg`` is a list-style configuration.

    Args:
        cfg (Any): Candidate object.

    Returns:
        bool: True when ``cfg`` is a ``ListConfig`` or a plain ``list``, False otherwise.
    """
    accepted_types = (ListConfig, list)
    return isinstance(cfg, accepted_types)

is_listlike(cfg) staticmethod

Determines whether the input object is a valid list configuration object.

Parameters:

Name Type Description Default
cfg Any

The object to check.

required

Returns:

Name Type Description
bool

True if the object is a valid list configuration object, False otherwise.

Source code in hyfi/utils/conf.py
@staticmethod
def is_listlike(cfg: Any):
    """
    Check whether ``cfg`` is an instance of ``ListLike``.

    Args:
        cfg (Any): Candidate object.

    Returns:
        bool: True when ``cfg`` is an instance of ``ListLike``, False otherwise.
    """
    matches = isinstance(cfg, ListLike)
    return matches

load(file_) staticmethod

Load a configuration file and return a configuration object.

Parameters:

Name Type Description Default
file_ Union[str, Path, IO[Any]]

The path to the configuration file or a file-like object.

required

Returns:

Type Description
Union[DictConfig, ListConfig]

Union[DictConfig, ListConfig]: The configuration object.

Source code in hyfi/utils/conf.py
@staticmethod
def load(file_: Union[str, Path, IO[Any]]) -> Union[DictConfig, ListConfig]:
    """
    Read a configuration through ``OmegaConf.load``.

    Args:
        file_ (Union[str, Path, IO[Any]]): Path of the configuration file, or a file-like object.

    Returns:
        Union[DictConfig, ListConfig]: The object produced by ``OmegaConf.load``.
    """
    loaded = OmegaConf.load(file_)
    return loaded

load_json(f, encoding='utf-8', **kwargs) staticmethod

Load a JSON file into a dictionary.

Parameters:

Name Type Description Default
f Union[str, Path, IO[Any]]

The path to the file or a file-like object.

required
encoding str

The encoding to use. Defaults to "utf-8".

'utf-8'
**kwargs

Additional arguments to pass to json.load().

{}

Returns:

Name Type Description
dict dict

The dictionary loaded from the JSON file.

Source code in hyfi/utils/conf.py
@staticmethod
def load_json(f: Union[str, Path, IO[Any]], encoding="utf-8", **kwargs) -> dict:
    """
    Load a JSON file into a dictionary.

    Args:
        f (Union[str, Path, IO[Any]]): The path to the file or a file-like object.
            A file-like object is read directly and is not closed.
        encoding (str, optional): The encoding used when ``f`` is a path. Defaults to "utf-8".
        **kwargs: Additional arguments to pass to json.load().

    Returns:
        dict: The dictionary loaded from the JSON file.
    """
    # A readable stream must be handed to json.load as is; stringifying it
    # would yield its repr, which is not a path that can be opened.
    if hasattr(f, "read"):
        return json.load(f, **kwargs)
    with open(str(f), "r", encoding=encoding) as stream:
        return json.load(stream, **kwargs)

merge(*configs) staticmethod

Merge a list of previously created configs into a single one.

Parameters:

Name Type Description Default
*configs Union[DictConfig, ListConfig, Dict[DictKeyType, Any], List[Any], Tuple[Any, ...], Any]

Input configs.

()

Returns:

Type Description
Union[ListConfig, DictConfig]

Union[ListConfig, DictConfig]: The merged config object.

Source code in hyfi/utils/conf.py
@staticmethod
def merge(
    *configs: Union[
        DictConfig,
        ListConfig,
        Dict[DictKeyType, Any],
        List[Any],
        Tuple[Any, ...],
        Any,
    ],
) -> Union[ListConfig, DictConfig]:
    """
    Combine several configs into one by delegating to ``OmegaConf.merge``.

    Args:
        *configs: The configs to merge, forwarded in the given order.

    Returns:
        Union[ListConfig, DictConfig]: The result of ``OmegaConf.merge``.
    """
    merged = OmegaConf.merge(*configs)
    return merged

merge_as_dict(*configs) staticmethod

Merge a list of previously created configs into a single dictionary.

Parameters:

Name Type Description Default
*configs Union[DictConfig, ListConfig, Dict[DictKeyType, Any], List[Any], Tuple[Any, ...], Any]

Input configs.

()

Returns:

Type Description
Union[ListConfig, DictConfig]

Union[ListConfig, DictConfig]: The merged config object as a dictionary.

Source code in hyfi/utils/conf.py
@staticmethod
def merge_as_dict(
    *configs: Union[
        DictConfig,
        ListConfig,
        Dict[DictKeyType, Any],
        List[Any],
        Tuple[Any, ...],
        Any,
    ],
) -> Union[Dict[DictKeyType, Any], List[Any]]:
    """
    Merge a list of previously created configs into a single plain container.

    Args:
        *configs: Input configs.

    Returns:
        Union[Dict[DictKeyType, Any], List[Any]]: The merged config converted
        with ``CONFs.to_dict``, i.e. a plain container rather than a
        ``DictConfig``/``ListConfig``.
    """
    merged = OmegaConf.merge(*configs)
    return CONFs.to_dict(merged)

print(cfg, resolve=True, **kwargs) staticmethod

Prints the configuration object in a human-readable format.

Parameters:

Name Type Description Default
cfg Any

The configuration object to print.

required
resolve bool

Whether to resolve the configuration object before printing. Defaults to True.

True
**kwargs

Additional keyword arguments to pass to the pprint.pprint function.

{}

Returns:

Type Description

None

Source code in hyfi/utils/conf.py
@staticmethod
def print(cfg: Any, resolve: bool = True, **kwargs):
    """
    Display a configuration object in a readable form.

    Objects accepted by ``CONFs.is_config`` are pretty-printed; everything else
    goes through a plain ``print`` call.

    Args:
        cfg (Any): The object to display.
        resolve (bool, optional): When True, convert the config with ``CONFs.to_dict`` before
            pretty-printing. Defaults to True.
        **kwargs: Extra keyword arguments forwarded to ``pprint.pprint``.

    Returns:
        None
    """
    import pprint

    if not CONFs.is_config(cfg):
        print(cfg)
        return
    printable = CONFs.to_dict(cfg) if resolve else cfg
    pprint.pprint(printable, **kwargs)

replace_keys(_dict, old_key, new_key) staticmethod

Replace a key in a dictionary.

Parameters:

Name Type Description Default
_dict Mapping[str, Any]

The dictionary to update.

required
old_key str

The old key to replace.

required
new_key str

The new key to use.

required

Returns:

Name Type Description
Mapping Mapping

The updated dictionary.

Source code in hyfi/utils/conf.py
@staticmethod
def replace_keys(_dict: Mapping[str, Any], old_key: str, new_key: str) -> Mapping:
    """
    Build a copy of a mapping with one key renamed at every nesting level.

    Args:
        _dict (Mapping[str, Any]): The mapping to read from; it is not modified.
        old_key (str): The key name to look for.
        new_key (str): The name that replaces ``old_key``.

    Returns:
        Mapping: A new dict in which ``old_key`` is renamed to ``new_key``; values that are
        mappings are processed recursively, other values are reused unchanged.
    """
    return {
        (new_key if name == old_key else name): (
            CONFs.replace_keys(item, old_key, new_key)
            if isinstance(item, collections.abc.Mapping)
            else item
        )
        for name, item in _dict.items()
    }

save(config, f, resolve=False) staticmethod

Save a configuration object to a file.

Parameters:

Name Type Description Default
config Any

The configuration object to save.

required
f Union[str, Path, IO[Any]]

The path to the file or a file-like object.

required
resolve bool

Whether to resolve the configuration object before saving. Defaults to False.

False
Source code in hyfi/utils/conf.py
@staticmethod
def save(config: Any, f: Union[str, Path, IO[Any]], resolve: bool = False) -> None:
    """
    Save a configuration object to a file.

    The parent directory is created first when ``f`` is a path that has a
    directory component.

    Args:
        config (Any): The configuration object to save.
        f (Union[str, Path, IO[Any]]): The path to the file or a file-like object.
        resolve (bool, optional): Whether to resolve the configuration object before saving. Defaults to False.
    """
    # A writable stream has no directory to prepare; stringifying it would
    # yield its repr and create a bogus directory.
    if not hasattr(f, "write"):
        parent = os.path.dirname(str(f))
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)
    OmegaConf.save(config, f, resolve=resolve)

save_json(json_dict, f, indent=4, ensure_ascii=False, default=None, encoding='utf-8', **kwargs) staticmethod

Save a dictionary to a JSON file.

Parameters:

Name Type Description Default
json_dict dict

The dictionary to save.

required
f Union[str, Path, IO[Any]]

The path to the file or a file-like object.

required
indent int

The number of spaces to use for indentation. Defaults to 4.

4
ensure_ascii bool

Whether to escape non-ASCII characters. Defaults to False.

False
default Any

A function to convert non-serializable objects. Defaults to None.

None
encoding str

The encoding to use. Defaults to "utf-8".

'utf-8'
**kwargs

Additional arguments to pass to json.dump().

{}
Source code in hyfi/utils/conf.py
@staticmethod
def save_json(
    json_dict: dict,
    f: Union[str, Path, IO[Any]],
    indent=4,
    ensure_ascii=False,
    default=None,
    encoding="utf-8",
    **kwargs,
):
    """
    Save a dictionary to a JSON file.

    Args:
        json_dict (dict): The dictionary to save.
        f (Union[str, Path, IO[Any]]): The path to the file or a file-like object.
            A file-like object is written to directly and is not closed.
        indent (int, optional): The number of spaces to use for indentation. Defaults to 4.
        ensure_ascii (bool, optional): Whether to escape non-ASCII characters. Defaults to False.
        default (Any, optional): A function to convert non-serializable objects. Defaults to None.
        encoding (str, optional): The encoding used when ``f`` is a path. Defaults to "utf-8".
        **kwargs: Additional arguments to pass to json.dump().
    """
    dump_options = dict(
        indent=indent,
        ensure_ascii=ensure_ascii,
        default=default,
        **kwargs,
    )
    # A writable stream is used as is; stringifying it would yield its repr,
    # not a path.
    if hasattr(f, "write"):
        json.dump(json_dict, f, **dump_options)
        return
    path = str(f)
    parent = os.path.dirname(path)
    # dirname is "" for a bare filename, and os.makedirs("") raises.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding=encoding) as stream:
        json.dump(json_dict, stream, **dump_options)

select(cfg, key, default=None, throw_on_resolution_failure=True, throw_on_missing=False) staticmethod

Wrapper for OmegaConf.select: selects a value from a config object using a key.

Parameters:

Name Type Description Default
cfg Any

Config node to select from

required
key str

Key to select

required
default Any

Default value to return if key is not found

None
throw_on_resolution_failure bool

Raise an exception if an interpolation resolution error occurs, otherwise return None

True
throw_on_missing bool

Raise an exception if an attempt to select a missing key (with the value '???') is made, otherwise return None

False

Returns:

Type Description

selected value or None if not found.

Source code in hyfi/utils/conf.py
@staticmethod
def select(
    cfg: Any,
    key: str,
    default: Any = None,
    throw_on_resolution_failure: bool = True,
    throw_on_missing: bool = False,
):
    """
    Look up a value in a config object through ``OmegaConf.select``.

    Every "/" in ``key`` is replaced with "." before the lookup.

    Args:
        cfg: Config node to select from
        key: Key to select; "/" may be used in place of "."
        default: Value forwarded to ``OmegaConf.select`` as its default
        throw_on_resolution_failure: Forwarded to ``OmegaConf.select``
        throw_on_missing: Forwarded to ``OmegaConf.select``

    Returns:
        Whatever ``OmegaConf.select`` returns for the normalized key.
    """
    dotted_key = key.replace("/", ".")
    options = dict(
        key=dotted_key,
        default=default,
        throw_on_resolution_failure=throw_on_resolution_failure,
        throw_on_missing=throw_on_missing,
    )
    return OmegaConf.select(cfg, **options)

to_config(cfg) staticmethod

Convert a config object to OmegaConf

Parameters:

Name Type Description Default
cfg Any

The config to convert.

required

Returns:

Type Description
Union[DictConfig, ListConfig]

A Config object that corresponds to the given config.

Source code in hyfi/utils/conf.py
@staticmethod
def to_config(cfg: Any) -> Union[DictConfig, ListConfig]:
    """
    Build an OmegaConf config from ``cfg`` via ``OmegaConf.create``.

    Args:
        cfg: The object to convert.

    Returns:
        The object produced by ``OmegaConf.create(cfg)``.
    """
    config = OmegaConf.create(cfg)
    return config

to_container(cfg, resolve=False, throw_on_missing=False, enum_to_str=False, structured_config_mode=SCMode.DICT) staticmethod

Convert the input config object to a nested container (e.g. dictionary).

Parameters:

Name Type Description Default
cfg Any

The input config object.

required
resolve bool

Whether to resolve the config object before converting it to a container. Defaults to False.

False
throw_on_missing bool

Whether to throw an exception if a missing key is encountered. Defaults to False.

False
enum_to_str bool

Whether to convert enum values to strings. Defaults to False.

False
structured_config_mode SCMode

The structured config mode to use. Defaults to SCMode.DICT.

DICT

Returns:

Type Description

The nested container (e.g. dictionary) representation of the input config object.

Source code in hyfi/utils/conf.py
@staticmethod
def to_container(
    cfg: Any,
    resolve: bool = False,
    throw_on_missing: bool = False,
    enum_to_str: bool = False,
    structured_config_mode: SCMode = SCMode.DICT,
):
    """
    Turn a config object into a nested container by delegating to ``OmegaConf.to_container``.

    Args:
        cfg (Any): The config object to convert.
        resolve (bool, optional): Forwarded as ``resolve``. Defaults to False.
        throw_on_missing (bool, optional): Forwarded as ``throw_on_missing``. Defaults to False.
        enum_to_str (bool, optional): Forwarded as ``enum_to_str``. Defaults to False.
        structured_config_mode (SCMode, optional): Forwarded as ``structured_config_mode``.
            Defaults to SCMode.DICT.

    Returns:
        The value returned by ``OmegaConf.to_container``.
    """
    options = dict(
        resolve=resolve,
        throw_on_missing=throw_on_missing,
        enum_to_str=enum_to_str,
        structured_config_mode=structured_config_mode,
    )
    return OmegaConf.to_container(cfg, **options)

to_dict(cfg) staticmethod

Convert a config to a dict

Parameters:

Name Type Description Default
cfg Any

The config to convert.

required

Returns:

Type Description
Any

The dict representation of the config.

Source code in hyfi/utils/conf.py
@staticmethod
def to_dict(cfg: Any) -> Any:
    """
    Convert a config into a plain container.

    Args:
        cfg: The object to convert.

    Returns:
        The resolved container for a dict, DictConfig or ListConfig input;
        any other input is returned unchanged.
    """
    # A plain dict is first wrapped into an OmegaConf config so that it goes
    # through the same conversion as native configs.
    config = CONFs.to_config(cfg) if isinstance(cfg, dict) else cfg
    if not isinstance(config, (DictConfig, ListConfig)):
        return config
    return OmegaConf.to_container(
        config,
        resolve=True,
        throw_on_missing=False,
        structured_config_mode=SCMode.DICT,
    )

to_yaml(cfg, resolve=False, sort_keys=False) staticmethod

Convert the input config object to a YAML string.

Parameters:

Name Type Description Default
cfg Any

The input config object.

required
resolve bool

Whether to resolve the config object before converting it to YAML. Defaults to False.

False
sort_keys bool

Whether to sort the keys in the resulting YAML string. Defaults to False.

False

Returns:

Name Type Description
str str

The YAML string representation of the input config object.

Source code in hyfi/utils/conf.py
@staticmethod
def to_yaml(cfg: Any, resolve: bool = False, sort_keys: bool = False) -> str:
    """
    Serialize a config object to a YAML string.

    Args:
        cfg (Any): The input config object.
        resolve (bool, optional): When True, the config is first converted with
            ``CONFs.to_dict`` and ``resolve=True`` is forwarded to OmegaConf.
            Defaults to False.
        sort_keys (bool, optional): Whether to sort the keys in the resulting YAML string.
            Defaults to False.

    Returns:
        str: The YAML string representation of the input config object.
    """
    source = CONFs.to_dict(cfg) if resolve else cfg
    return OmegaConf.to_yaml(source, resolve=resolve, sort_keys=sort_keys)

update(_dict, _overrides) staticmethod

Update a dictionary with overrides.

Parameters:

Name Type Description Default
_dict Mapping[str, Any]

The dictionary to update.

required
_overrides Mapping[str, Any]

The dictionary with overrides.

required

Returns:

Name Type Description
Mapping Mapping

The updated dictionary.

Source code in hyfi/utils/conf.py
@staticmethod
def update(_dict: Mapping[str, Any], _overrides: Mapping[str, Any]) -> Mapping:
    """
    Update a dictionary with overrides.

    Args:
        _dict (Mapping[str, Any]): The dictionary to update.
        _overrides (Mapping[str, Any]): The dictionary with overrides.

    Returns:
        Mapping: The updated dictionary.
    """
    for k, v in _overrides.items():
        if isinstance(v, collections.abc.Mapping):
            _dict[k] = CONFs.update((_dict.get(k) or {}), v)  # type: ignore
        else:
            _dict[k] = v  # type: ignore
    return _dict

DATASETs

Bases: DSAggregate, DSBasic, DSCombine, DSLoad, DSPlot, DSReshape, DSSave, DSSlice, DSUtils

A class representing a collection of datasets.

This class inherits from various dataset utility classes and provides a convenient way to access and manipulate datasets.

Source code in hyfi/utils/datasets/__init__.py
class DATASETs(
    DSAggregate,
    DSBasic,
    DSCombine,
    DSLoad,
    DSPlot,
    DSReshape,
    DSSave,
    DSSlice,
    DSUtils,
):
    """
    A single entry point that combines the dataset utility classes.

    The class body is empty: it defines no attributes or methods of its own.
    Everything it offers is inherited from its bases (DSAggregate, DSBasic,
    DSCombine, DSLoad, DSPlot, DSReshape, DSSave, DSSlice and DSUtils).
    """

ENVs

Source code in hyfi/utils/envs.py
class ENVs:
    """Static helpers for working with environment variables and .env files."""

    @staticmethod
    def getcwd():
        """Get the original working directory before Hydra changed it.

        This function tries to call the `get_original_cwd` function from the `hydra.utils` module,
        which returns the original working directory if it exists. If the `get_original_cwd` function
        raises a `ValueError` exception, it means that Hydra did not change the working directory,
        so the function falls back to calling the `os.getcwd` function, which returns the current
        working directory.

        Returns:
            str: The original working directory before Hydra changed it.
        """
        try:
            return hydra.utils.get_original_cwd()
        except ValueError:
            return os.getcwd()

    @staticmethod
    def expand_posix_vars(posix_expr: str, context: dict = None) -> str:  # type: ignore
        # sourcery skip: dict-assign-update-to-union
        """
        Expand POSIX variables in a string.

        Variables are looked up in ``context`` first and then in the process
        environment; a variable defined in neither expands to an empty string.

        Args:
            posix_expr (str): The string containing POSIX variables to be expanded.
            context (dict, optional): A dictionary containing additional variables to be used in the expansion.
                Defaults to None.

        Returns:
            str: The expanded string.

        Examples:
            >>> expand_posix_vars("$HOME")
            '/home/user'
            >>> expand_posix_vars("$HOME/$USER", {"USER": "testuser"})
            '/home/user/testuser'

        """
        if context is None:
            context = {}
        # defaultdict(str) makes unknown variables expand to "" instead of raising KeyError.
        env = defaultdict(str, os.environ.copy())
        env.update(context)
        return Template(posix_expr).substitute(env)

    @staticmethod
    def dotenv_values(dotenv_path: str = "", **kwargs):
        """
        Load a dotenv file and return its key/value pairs as a dict.

        This is a wrapper around ``dotenv.dotenv_values``.

        Args:
            dotenv_path: Path to the .env file.
            **kwargs: Extra keyword arguments forwarded to ``dotenv.dotenv_values``.

        Returns:
            dict of key/value pairs read from the file.
        """
        config = dotenv.dotenv_values(dotenv_path=dotenv_path, **kwargs)
        return dict(config)

    @staticmethod
    def load_dotenv(
        override: bool = False,
        dotenv_file: str = ".env",
        raise_error_if_not_found: bool = False,
        usecwd: bool = False,
        verbose: bool = False,
        **kwargs,
    ) -> None:
        """
        Load a .env file into the process environment.

        ``dotenv_file`` is resolved against ``ENVs.getcwd()`` when it is relative.
        If that file exists it is loaded; otherwise a file with the same name is
        searched for with ``ENVs.find_dotenv``. The location of the loaded file
        is recorded in the ``DOTENV_FILENAME``, ``DOTENV_FILE`` and ``DOTENV_DIR``
        environment variables; when nothing is found, ``DOTENV_FILE`` and
        ``DOTENV_DIR`` are set to empty strings.

        Args:
            override: Forwarded to ``dotenv.load_dotenv`` as ``override``.
            dotenv_file: Name or path of the .env file to load.
            raise_error_if_not_found: Forwarded to ``ENVs.find_dotenv``.
            usecwd: Forwarded to ``ENVs.find_dotenv``.
            verbose: Forwarded to ``dotenv.load_dotenv`` as ``verbose``.
            **kwargs: Extra keyword arguments forwarded to ``dotenv.load_dotenv``.

        Returns:
            None
        """
        dotenv_path = Path(dotenv_file)
        if not dotenv_path.is_absolute():
            dotenv_path = Path(ENVs.getcwd()) / dotenv_path
        dotenv_filename = dotenv_path.name
        dotenv_dir = str(dotenv_path.parent)
        if dotenv_path.is_file():
            # The requested file exists: load it directly.
            dotenv.load_dotenv(
                dotenv_path=str(dotenv_path),
                verbose=verbose,
                override=override,
                **kwargs,
            )
            os.environ["DOTENV_FILENAME"] = dotenv_filename
            os.environ["DOTENV_FILE"] = str(dotenv_path)
            os.environ["DOTENV_DIR"] = dotenv_dir
            logger.info("Loaded .env from [%s]", dotenv_path)
        else:
            logger.info("[%s] not found, finding .env in parent dirs", dotenv_path)
            # Fall back to searching for a file with the same name.
            if dotenv_path := ENVs.find_dotenv(
                filename=dotenv_filename,
                raise_error_if_not_found=raise_error_if_not_found,
                usecwd=usecwd,
            ):
                dotenv.load_dotenv(
                    dotenv_path=dotenv_path,
                    verbose=verbose,
                    override=override,
                    **kwargs,
                )
                dotenv_path = Path(dotenv_path)
                dotenv_filename = dotenv_path.name
                dotenv_dir = str(dotenv_path.parent)
                os.environ["DOTENV_FILENAME"] = dotenv_filename
                os.environ["DOTENV_FILE"] = str(dotenv_path)
                os.environ["DOTENV_DIR"] = dotenv_dir
                logger.info("Loaded .env from [%s]", dotenv_path)
            else:
                # Nothing found: clear the location markers.
                os.environ["DOTENV_FILE"] = ""
                os.environ["DOTENV_DIR"] = ""
                logger.info("No .env file found in the parent dirs of [%s]", dotenv_dir)

    @staticmethod
    def is_interactive():
        """Decide whether this is running in a REPL or IPython notebook"""
        main = __import__("__main__", None, None, fromlist=["__file__"])
        return not hasattr(main, "__file__")

    @staticmethod
    def find_dotenv(
        filename: str = ".env",
        raise_error_if_not_found: bool = False,
        usecwd: bool = False,
    ) -> str:
        """
        Search in increasingly higher folders for the given file

        Args:
            filename: Name of the file to look for.
            raise_error_if_not_found: Raise ``IOError`` instead of returning ""
                when no file is found.
            usecwd: Start from the current working directory instead of the
                directory of the calling file.

        Returns path to the file if found, or an empty string otherwise
        """

        if usecwd or ENVs.is_interactive() or getattr(sys, "frozen", False):
            # Should work without __file__, e.g. in REPL or IPython notebook.
            path = os.getcwd()
        else:
            # will work for .py files
            frame = sys._getframe()
            current_file = __file__

            # Skip every frame that belongs to this module to reach the caller.
            while frame.f_code.co_filename == current_file:
                assert frame.f_back is not None
                frame = frame.f_back
            frame_filename = frame.f_code.co_filename
            path = os.path.dirname(os.path.abspath(frame_filename))

        logger.debug("Trying to find %s in %s", filename, path)
        for dirname in IOLIBs.walk_to_root(path):
            check_path = os.path.join(dirname, filename)
            if os.path.isfile(check_path):
                return check_path

        if raise_error_if_not_found:
            raise IOError("File not found")

        return ""

    @staticmethod
    def find_dotenv_dir(
        filename: str = ".env",
        raise_error_if_not_found: bool = False,
        usecwd: bool = False,
    ) -> str:
        """
        Search in increasingly higher folders for the given file

        Returns the directory containing the file if found, or an empty string otherwise
        """
        if dotenv_path := ENVs.find_dotenv(
            filename=filename,
            raise_error_if_not_found=raise_error_if_not_found,
            usecwd=usecwd,
        ):
            return os.path.dirname(dotenv_path)
        return ""

    @staticmethod
    def get_osenv(key: str = "", default: Optional[str] = None) -> Any:
        """Get the value of an environment variable or return the default value

        ``ENVs.load_dotenv()`` is called first. With an empty ``key`` the whole
        ``os.environ`` mapping is returned.
        """
        ENVs.load_dotenv()
        return os.environ.get(key, default) if key else os.environ

    @staticmethod
    def set_osenv(key: str, value: Any) -> None:
        """Set the value of an environment variable

        A value that names an existing directory is stored as an absolute path.
        An existing non-empty value is overwritten (and logged as such).
        """
        if value and IOLIBs.is_dir(value):
            value = os.path.abspath(value)
        if pre_val := os.environ.get(key):
            logger.info("Overwriting %s=%s with %s", key, pre_val, value)
        else:
            logger.info("Setting %s=%s", key, value)
        # The os module has no setdefault(); assign through os.environ so the
        # variable is actually set (and overwritten, as the log message says).
        os.environ[key] = value

    @staticmethod
    def check_and_set_osenv_var(key: str, value: Any) -> Any:
        """Check and set value to environment variable

        The variable name is ``key`` upper-cased. A non-None ``value`` is stored
        (as a string) only when it differs, case-insensitively, from the current
        value. ``None`` deletes a variable that is set and non-empty.

        Returns:
            The ``value`` argument, unchanged.
        """
        env_key = key.upper()
        if value is not None:
            old_value = os.getenv(env_key, "")
            if str(old_value).lower() != str(value).lower():
                os.environ[env_key] = str(value)
                logger.debug("Set environment variable %s=%s", env_key, str(value))
        elif env_key in os.environ and os.environ[env_key]:
            del os.environ[env_key]
            logger.debug("Deleted environment variable %s", env_key)
        return value

    @staticmethod
    def check_and_set_osenv_vars(values: Dict[str, Any]) -> Dict[str, Any]:
        """Apply ``check_and_set_osenv_var`` to every item and return ``values``."""
        for k, v in values.items():
            ENVs.check_and_set_osenv_var(k, v)
        return values

    @staticmethod
    def osenv():
        """
        Return the os environment variables.

        Returns:
            The live ``os.environ`` mapping (not a copy).
        """
        return os.environ

check_and_set_osenv_var(key, value) staticmethod

Check and set value to environment variable

Source code in hyfi/utils/envs.py
@staticmethod
def check_and_set_osenv_var(key: str, value: Any) -> Any:
    """Check and set value to environment variable"""
    env_key = key.upper()
    if value is not None:
        old_value = os.getenv(env_key, "")
        if str(old_value).lower() != str(value).lower():
            os.environ[env_key] = str(value)
            logger.debug("Set environment variable %s=%s", env_key, str(value))
    elif env_key in os.environ and os.environ[env_key]:
        del os.environ[env_key]
        logger.debug("Deleted environment variable %s", env_key)
    return value

dotenv_values(dotenv_path='', **kwargs) staticmethod

Load a dotenv file and return a dict of key/value pairs. This is a wrapper around dotenv.dotenv_values.

Parameters:

Name Type Description Default
dotenv_path str

Path to the .env file

''

Returns:

Type Description

dict of key / value pairs ( key = value )

Source code in hyfi/utils/envs.py
@staticmethod
def dotenv_values(dotenv_path: str = "", **kwargs):
    """
    Read a dotenv file and return its entries as a plain dict.

    Thin wrapper around ``dotenv.dotenv_values``.

    Args:
        dotenv_path: Path to the .env file.
        **kwargs: Extra keyword arguments forwarded to ``dotenv.dotenv_values``.

    Returns:
        dict of key/value pairs read from the file.
    """
    return dict(dotenv.dotenv_values(dotenv_path=dotenv_path, **kwargs))

expand_posix_vars(posix_expr, context=None) staticmethod

Expand POSIX variables in a string.

Parameters:

Name Type Description Default
posix_expr str

The string containing POSIX variables to be expanded.

required
context dict

A dictionary containing additional variables to be used in the expansion. Defaults to None.

None

Returns:

Name Type Description
str str

The expanded string.

Examples:

>>> expand_posix_vars("$HOME")
'/home/user'
>>> expand_posix_vars("$HOME/$USER", {"USER": "testuser"})
'/home/user/testuser'
Source code in hyfi/utils/envs.py
@staticmethod
def expand_posix_vars(posix_expr: str, context: dict = None) -> str:  # type: ignore
    # sourcery skip: dict-assign-update-to-union
    """
    Expand POSIX variables in a string.

    Args:
        posix_expr (str): The string containing POSIX variables to be expanded.
        context (dict, optional): A dictionary containing additional variables to be used in the expansion.
            Defaults to None.

    Returns:
        str: The expanded string.

    Examples:
        >>> expand_posix_vars("$HOME")
        '/home/user'
        >>> expand_posix_vars("$HOME/$USER", {"USER": "testuser"})
        '/home/user/testuser'

    """
    # Set the context to the default context.
    if context is None:
        context = {}
    env = defaultdict(str, os.environ.copy())
    env.update(context)
    return Template(posix_expr).substitute(env)

find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False) staticmethod

Search in increasingly higher folders for the given file

Returns path to the file if found, or an empty string otherwise

Source code in hyfi/utils/envs.py
@staticmethod
def find_dotenv(
    filename: str = ".env",
    raise_error_if_not_found: bool = False,
    usecwd: bool = False,
) -> str:
    """
    Look for ``filename`` in a start directory and then in each of its parents.

    The start directory is the current working directory when ``usecwd`` is
    set, the session is interactive, or the interpreter is frozen; otherwise
    it is the directory of the first calling file outside this module.

    Args:
        filename: Name of the file to look for.
        raise_error_if_not_found: Raise ``IOError`` instead of returning ""
            when no file is found.
        usecwd: Force the search to start from the current working directory.

    Returns:
        Path to the file if found, or an empty string otherwise.
    """
    start_from_cwd = usecwd or ENVs.is_interactive() or getattr(sys, "frozen", False)
    if start_from_cwd:
        # Should work without __file__, e.g. in REPL or IPython notebook.
        start_dir = os.getcwd()
    else:
        # will work for .py files
        caller = sys._getframe()
        this_file = __file__
        # Walk up the stack until we leave this module.
        while caller.f_code.co_filename == this_file:
            assert caller.f_back is not None
            caller = caller.f_back
        start_dir = os.path.dirname(os.path.abspath(caller.f_code.co_filename))

    logger.debug("Trying to find %s in %s", filename, start_dir)
    for dirname in IOLIBs.walk_to_root(start_dir):
        candidate = os.path.join(dirname, filename)
        if os.path.isfile(candidate):
            return candidate

    if raise_error_if_not_found:
        raise IOError("File not found")
    return ""

find_dotenv_dir(filename='.env', raise_error_if_not_found=False, usecwd=False) staticmethod

Search in increasingly higher folders for the given file

Returns the directory containing the file if found, or an empty string otherwise

Source code in hyfi/utils/envs.py
@staticmethod
def find_dotenv_dir(
    filename: str = ".env",
    raise_error_if_not_found: bool = False,
    usecwd: bool = False,
) -> str:
    """
    Locate the directory that holds the given file.

    The lookup is delegated to ``ENVs.find_dotenv`` with the same arguments.

    Returns:
        The directory containing the file if found, or an empty string otherwise.
    """
    located = ENVs.find_dotenv(
        filename=filename,
        raise_error_if_not_found=raise_error_if_not_found,
        usecwd=usecwd,
    )
    return os.path.dirname(located) if located else ""

get_osenv(key='', default=None) staticmethod

Get the value of an environment variable or return the default value

Source code in hyfi/utils/envs.py
@staticmethod
def get_osenv(key: str = "", default: Optional[str] = None) -> Any:
    """Look up an environment variable after loading the .env file.

    ``ENVs.load_dotenv()`` is called on every invocation. With an empty ``key``
    the whole ``os.environ`` mapping is returned; otherwise the variable's
    value, or ``default`` when it is not set.
    """
    ENVs.load_dotenv()
    if not key:
        return os.environ
    return os.environ.get(key, default)

getcwd() staticmethod

Get the original working directory before Hydra changed it.

This function tries to call the get_original_cwd function from the hydra.utils module, which returns the original working directory if it exists. If the get_original_cwd function raises a ValueError exception, it means that Hydra did not change the working directory, so the function falls back to calling the os.getcwd function, which returns the current working directory.

Returns:

Name Type Description
str

The original working directory before Hydra changed it.

Source code in hyfi/utils/envs.py
@staticmethod
def getcwd():
    """Return the working directory as it was before Hydra changed it.

    `hydra.utils.get_original_cwd` is tried first. When it raises `ValueError`
    the result of `os.getcwd` is used instead.

    Returns:
        str: The original working directory, or the current one as a fallback.
    """
    try:
        working_dir = hydra.utils.get_original_cwd()
    except ValueError:
        working_dir = os.getcwd()
    return working_dir

is_interactive() staticmethod

Decide whether this is running in a REPL or IPython notebook

Source code in hyfi/utils/envs.py
@staticmethod
def is_interactive():
    """Decide whether this is running in a REPL or IPython notebook"""
    main = __import__("__main__", None, None, fromlist=["__file__"])
    return not hasattr(main, "__file__")

load_dotenv(override=False, dotenv_file='.env', raise_error_if_not_found=False, usecwd=False, verbose=False, **kwargs) staticmethod

Load a .env file from the given location, falling back to a search of parent directories when the file is not found there. The location of the loaded file is recorded in the DOTENV_FILENAME, DOTENV_FILE and DOTENV_DIR environment variables.

Parameters:

Name Type Description Default
override bool

If True, values from the .env file override environment variables that are already set

False
dotenv_file str

Name of the .env file to look for in the given directory or the current directory

'.env'
verbose bool

Print debug information to console

False

Returns:

Type Description
None

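Always None; the location of the loaded .env file is exposed through the DOTENV_FILENAME, DOTENV_FILE and DOTENV_DIR environment variables.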

Source code in hyfi/utils/envs.py
@staticmethod
def load_dotenv(
    override: bool = False,
    dotenv_file: str = ".env",
    raise_error_if_not_found: bool = False,
    usecwd: bool = False,
    verbose: bool = False,
    **kwargs,
) -> None:
    """
    Load a .env file into the process environment.

    ``dotenv_file`` is resolved against ``ENVs.getcwd()`` when it is relative.
    If that file exists it is loaded; otherwise a file with the same name is
    searched for with ``ENVs.find_dotenv``. The location of the loaded file is
    recorded in ``DOTENV_FILENAME``, ``DOTENV_FILE`` and ``DOTENV_DIR``; when
    nothing is found, ``DOTENV_FILE`` and ``DOTENV_DIR`` are set to "".

    Args:
        override: Forwarded to ``dotenv.load_dotenv`` as ``override``.
        dotenv_file: Name or path of the .env file to load.
        raise_error_if_not_found: Forwarded to ``ENVs.find_dotenv``.
        usecwd: Forwarded to ``ENVs.find_dotenv``.
        verbose: Forwarded to ``dotenv.load_dotenv`` as ``verbose``.
        **kwargs: Extra keyword arguments forwarded to ``dotenv.load_dotenv``.

    Returns:
        None
    """

    def _record_location(env_file: Path) -> None:
        # Publish where the loaded file lives, then log it.
        os.environ["DOTENV_FILENAME"] = env_file.name
        os.environ["DOTENV_FILE"] = str(env_file)
        os.environ["DOTENV_DIR"] = str(env_file.parent)
        logger.info("Loaded .env from [%s]", env_file)

    requested = Path(dotenv_file)
    if not requested.is_absolute():
        requested = Path(ENVs.getcwd()) / requested

    # Case 1: the requested file exists as given.
    if requested.is_file():
        dotenv.load_dotenv(
            dotenv_path=str(requested),
            verbose=verbose,
            override=override,
            **kwargs,
        )
        _record_location(requested)
        return

    # Case 2: search for a file with the same name.
    logger.info("[%s] not found, finding .env in parent dirs", requested)
    located = ENVs.find_dotenv(
        filename=requested.name,
        raise_error_if_not_found=raise_error_if_not_found,
        usecwd=usecwd,
    )
    if located:
        dotenv.load_dotenv(
            dotenv_path=located,
            verbose=verbose,
            override=override,
            **kwargs,
        )
        _record_location(Path(located))
        return

    # Case 3: nothing found; clear the location markers.
    os.environ["DOTENV_FILE"] = ""
    os.environ["DOTENV_DIR"] = ""
    logger.info("No .env file found in the parent dirs of [%s]", str(requested.parent))

osenv() staticmethod

Return the os environment variables as a dictionary.

Returns:

Name Type Description
dict

A dictionary containing the os environment variables.

Source code in hyfi/utils/envs.py
@staticmethod
def osenv():
    """
    Return the os environment variables as a dictionary.

    Returns:
        dict: A dictionary containing the os environment variables.
    """
    return os.environ

set_osenv(key, value) staticmethod

Set the value of an environment variable

Source code in hyfi/utils/envs.py
@staticmethod
def set_osenv(key: str, value: Any) -> None:
    """Set the value of an environment variable

    A value that names an existing directory (per ``IOLIBs.is_dir``) is stored
    as an absolute path. An existing non-empty value is overwritten, and the
    log message says so.

    Args:
        key: Name of the environment variable.
        value: Value to store.
    """
    if value and IOLIBs.is_dir(value):
        value = os.path.abspath(value)
    if pre_val := os.environ.get(key):
        logger.info("Overwriting %s=%s with %s", key, pre_val, value)
    else:
        logger.info("Setting %s=%s", key, value)
    # The os module has no setdefault(), so the old call always raised
    # AttributeError. Assign through os.environ to actually set/overwrite.
    os.environ[key] = value

FUNCs

Source code in hyfi/utils/funcs.py
class FUNCs:
    """Static helpers for string, size, length, date and printing utilities."""

    @staticmethod
    def unescape_dict(d):
        """Unescape a dictionary

        The dict's ``repr`` is decoded with ``unicode-escape`` and parsed back
        with ``ast.literal_eval``.
        """
        return ast.literal_eval(repr(d).encode("utf-8").decode("unicode-escape"))

    @staticmethod
    def lower_case_with_underscores(string):
        """Converts 'CamelCased' to 'camel_cased'."""
        return (
            re.sub(r"\s+", "_", re.sub(r"(?<!^)(?=[A-Z])", "_", string).lower())
            .replace("-", "_")
            .replace("__", "_")
        )

    @staticmethod
    def ordinal(num):
        """Return the ordinal of a number as a string."""
        # The slice start picks "st"/"nd"/"rd" for 1/2/3 (except in the teens)
        # and "th" for everything else.
        return "%d%s" % (
            num,
            "tsnrhtdd"[(num // 10 % 10 != 1) * (num % 10 < 4) * num % 10 :: 4],
        )

    @staticmethod
    def get_offset_ranges(count, num_workers):
        """Get offset ranges for parallel processing

        Returns a list of ``num_workers + 1`` offsets that starts at 0 and ends
        at ``count + 1``.
        """
        assert count > num_workers
        step_sz = int(count / num_workers)
        offset_ranges = [0]
        pv_cnt = 1
        for i in range(num_workers):
            # The last range absorbs the remainder of the integer division.
            pv_cnt = count + 1 if i == num_workers - 1 else pv_cnt + step_sz
            offset_ranges.append(pv_cnt)
        return offset_ranges

    @staticmethod
    def fancy_print(*args, color=None, bold=False, **kwargs):
        """Print with color and bold

        ``color`` is inserted into an ANSI escape sequence; the style is reset
        after printing.
        """
        if bold:
            print("\033[1m", end="")

        if color:
            print(f"\033[{color}m", end="")

        print(*args, **kwargs)

        print("\033[0m", end="")  # reset

    # https://stackoverflow.com/questions/12523586/python-format-size-application-converting-b-to-kb-mb-gb-tb/37423778
    @staticmethod
    def humanbytes(B, units=None):
        """Return the given bytes as a human friendly KB, MB, GB, or TB string

        Args:
            B: Number of bytes (anything ``float()`` accepts).
            units: Force a unit ("B", "KiB", "MiB", "GiB" or "TiB"). When None
                the unit is chosen from the magnitude of ``B``.

        Returns:
            The formatted string, or None when ``units`` is not one of the
            recognized values.
        """
        B = float(B)
        KB = float(1024)
        MB = float(KB**2)  # 1,048,576
        GB = float(KB**3)  # 1,073,741,824
        TB = float(KB**4)  # 1,099,511,627,776

        if (B < KB and units is None) or units == "B":
            # Singular only for exactly one byte. The previous chained
            # comparison (0 == B > 1) was never true, so it always said "Byte".
            return "{0} {1}".format(B, "Byte" if B == 1 else "Bytes")
        elif (KB <= B < MB and units is None) or units == "KiB":
            return "{0:.2f} KiB".format(B / KB)
        elif (MB <= B < GB and units is None) or units == "MiB":
            return "{0:.2f} MiB".format(B / MB)
        elif (GB <= B < TB and units is None) or units == "GiB":
            return "{0:.2f} GiB".format(B / GB)
        elif (TB <= B and units is None) or units == "TiB":
            return "{0:.2f} TiB".format(B / TB)

    @staticmethod
    def parse_size(sizestr):
        """
        Parse a size string into a number of bytes. For example, "16K" will
        return 16384.  If no suffix is provided, bytes are assumed.  This
        function is case-insensitive.

        :param sizestr: A string representing a size, such as "16K", "2M", "1G".
        :return: The number of bytes that the string represents, or None when
            the suffix is not one of B, K, M, G, T.
        """
        unit = sizestr[-1].upper()
        if unit.isdigit() or unit == ".":
            # No suffix: the whole string is the byte count. Previously the
            # last digit was taken as the unit and None was returned.
            return float(sizestr)

        size = float(sizestr[:-1])
        multipliers = {
            "B": 1,
            "K": 1024,
            "M": 1024**2,
            "G": 1024**3,
            "T": 1024**4,
        }
        if unit not in multipliers:
            return None
        return size * multipliers[unit]

    @staticmethod
    def check_min_len(s, len_func, min_len):
        """Check if the length of a string is greater than or equal to a minimum length"""
        return len_func(s) >= min_len

    @staticmethod
    def check_max_len(s, len_func, max_len):
        """Check if the length of a string is less than or equal to a maximum length"""
        return len_func(s) <= max_len

    @staticmethod
    def utf8len(s):
        """Return the length of a string in bytes"""
        return len(str(s).encode("utf-8"))

    @staticmethod
    def len_wospc(x):
        """Return the length of a string in bytes without spaces"""
        return FUNCs.utf8len(re.sub(r"\s", "", str(x)))

    @staticmethod
    def len_bytes(x):
        """Return the length of a string in bytes"""
        return FUNCs.utf8len(x)

    @staticmethod
    def len_words(x):
        """Return the number of words in a string (0 for non-string input)"""
        return len(x.split()) if isinstance(x, str) else 0

    @staticmethod
    def len_sents(x, sep):
        """Return the number of sentences in a string

        Runs of one or more line breaks are replaced by ``sep`` before
        splitting on ``sep``. Escape sequences in ``sep`` (e.g. "\\n") are decoded.
        """
        sep = str(sep).encode("utf-8").decode("unicode-escape")
        return len(re.sub(r"(\r?\n|\r){1,}", sep, x).split(sep))

    @staticmethod
    def len_segments(x, sep):
        """Return the number of segments in a string

        Runs of two or more line breaks are replaced by ``sep`` before
        splitting on ``sep``. Escape sequences in ``sep`` (e.g. "\\n") are decoded.
        """
        sep = str(sep).encode("utf-8").decode("unicode-escape")
        return len(re.sub(r"(\r?\n|\r){2,}", sep, x).split(sep))

    @staticmethod
    def any_to_utf8(b):
        """Decode bytes to text, trying UTF-8 first and a detected encoding second

        Returns:
            The decoded string, or None when no usable encoding is found.
        """
        try:
            return b.decode("utf-8")
        except UnicodeDecodeError:
            # try to figure out encoding if not utf-8

            guess = chardet.detect(b)["encoding"]

            if not guess or guess == "UTF-8":
                return

            try:
                return b.decode(guess)
            except (UnicodeDecodeError, LookupError):
                # still cant figure out encoding, give up
                return

    @staticmethod
    def today(_format="%Y-%m-%d"):
        """Return today's date

        A formatted string by default; a ``date`` object when ``_format`` is None.
        """
        from datetime import datetime

        if _format is None:
            return datetime.today().date()
        else:
            return datetime.today().strftime(_format)

    @staticmethod
    def now(_format="%Y-%m-%d %H:%M:%S"):
        """Return current date and time

        A formatted string by default; a ``datetime`` object when ``_format`` is None.
        """
        from datetime import datetime

        return datetime.now() if _format is None else datetime.now().strftime(_format)

    @staticmethod
    def strptime(
        _date_str: str,
        _format: str = "%Y-%m-%d",
    ):
        """Return a datetime object from a string"""
        from datetime import datetime

        return datetime.strptime(_date_str, _format)

    @staticmethod
    def to_dateparm(_date, _format="%Y-%m-%d"):
        """Return a date parameter string

        The result has the form ``${to_datetime:<formatted date>,<format>}``.
        """
        from datetime import datetime

        _dtstr = datetime.strftime(_date, _format)
        _dtstr = "${to_datetime:" + _dtstr + "," + _format + "}"
        return _dtstr

    @staticmethod
    def human_readable_type_name(t: Type) -> str:
        """
        Generates a useful-for-humans label for a type.
        For builtin types, it's just the class name (eg "str" or "int").
        For other types, it includes the module (eg "pathlib.Path").
        Modules under the "hyfi" package are shortened to "hyfi".
        """
        module = t.__module__
        if module == "builtins":
            return t.__qualname__
        elif module.split(".")[0] == "hyfi":
            module = "hyfi"

        try:
            return f"{module}.{t.__qualname__}"
        except AttributeError:
            return str(t)

    @staticmethod
    def readable_types_list(type_list: List[Type]) -> str:
        """Generates a useful-for-humans label for a list of types."""
        return ", ".join(FUNCs.human_readable_type_name(t) for t in type_list)

    @staticmethod
    def dict_product(dicts) -> List[Dict]:
        """
        Return every combination of the values in ``dicts`` as a list of dicts.

        >>> list(dict_product(dict(number=[1,2], character='ab')))
        [{'character': 'a', 'number': 1},
        {'character': 'a', 'number': 2},
        {'character': 'b', 'number': 1},
        {'character': 'b', 'number': 2}]
        """
        return [dict(zip(dicts, x)) for x in itertools.product(*dicts.values())]

    @staticmethod
    def printf(
        action: str,
        msg: Any = "",
        style: Optional[IntSeq] = None,
        indent: int = 10,
        verbose: Union[bool, StrictBool] = True,
        file_: TextIO = sys.stdout,
    ) -> Optional[str]:
        """Print string with common format.

        Without a ``style`` nothing is printed and the formatted text is
        returned instead. With ``verbose`` false the call is a no-op.
        """
        if not verbose:
            return None  # HACK: Satisfy MyPy
        _msg = str(msg)
        action = action.rjust(indent, " ")
        if not style:
            return action + _msg

        out = style + [action] + Style.RESET + [INDENT, _msg]  # type: ignore
        print(*out, sep="", file=file_)
        return None  # HACK: Satisfy MyPy

    @staticmethod
    def printf_exception(
        e: Exception, action: str, msg: str = "", indent: int = 0, quiet: bool = False
    ) -> None:
        """Print exception with common format (to stderr, unless ``quiet``)."""
        if not quiet:
            print("", file=sys.stderr)
            FUNCs.printf(
                action, msg=msg, style=Style.DANGER, indent=indent, file_=sys.stderr
            )
            print(HLINE, file=sys.stderr)
            print(e, file=sys.stderr)
            print(HLINE, file=sys.stderr)

    @staticmethod
    def cast_str_to_bool(value: Any) -> bool:
        """Parse anything to bool.

        Params:
            value:
                Anything to be cast to a bool. Tries to be as smart as possible.

                1.  Cast to number. Then: 0 = False; anything else = True.
                1.  Find [YAML booleans](https://yaml.org/type/bool.html),
                    [YAML nulls](https://yaml.org/type/null.html) or `none` in it
                    and use it appropriately.
                1.  Cast to boolean using standard python `bool(value)`.
        """
        # Assume it's a number
        with suppress(TypeError, ValueError):
            return bool(float(value))
        # Assume it's a string
        with suppress(AttributeError):
            lower = value.lower()
            if lower in {"y", "yes", "t", "true", "on"}:
                return True
            elif lower in {"n", "no", "f", "false", "off", "~", "null", "none"}:
                return False
        # Assume nothing
        return bool(value)

    @staticmethod
    def force_str_end(original_str: str, end: str = "\n") -> str:
        """Make sure a `original_str` ends with `end`.

        Params:
            original_str: String that you want to ensure ending.
            end: String that must exist at the end of `original_str`
        """
        return original_str if original_str.endswith(end) else original_str + end

any_to_utf8(b) staticmethod

Convert any string to utf-8

Source code in hyfi/utils/funcs.py
@staticmethod
def any_to_utf8(b):
    """Decode a byte string to text, trying UTF-8 first.

    When UTF-8 decoding fails, the encoding guessed by `chardet.detect` is
    tried instead. Returns None if no usable guess exists or the guessed
    encoding cannot decode the data either.
    """
    try:
        return b.decode("utf-8")
    except UnicodeDecodeError:
        pass

    # Not valid UTF-8: ask chardet for its best guess.
    detected = chardet.detect(b)["encoding"]
    if detected and detected != "UTF-8":
        try:
            return b.decode(detected)
        except (UnicodeDecodeError, LookupError):
            # The guess was wrong or names an unknown codec; give up.
            pass
    return None

cast_str_to_bool(value) staticmethod

Parse anything to bool.

Parameters:

Name Type Description Default
value Any

Anything to be cast to a bool. Tries to be as smart as possible.

  1. Cast to number. Then: 0 = False; anything else = True.
  2. Find YAML booleans, YAML nulls or none in it and use it appropriately.
  3. Cast to boolean using standard python bool(value).
required
Source code in hyfi/utils/funcs.py
@staticmethod
def cast_str_to_bool(value: Any) -> bool:
    """Parse anything to bool.

    Params:
        value:
            Anything to be cast to a bool. Tries to be as smart as possible.

            1.  Cast to number. Then: 0 = False; anything else = True.
            1.  Find [YAML booleans](https://yaml.org/type/bool.html),
                [YAML nulls](https://yaml.org/type/null.html) or `none` in it
                and use it appropriately. Letter case and surrounding
                whitespace are ignored.
            1.  Cast to boolean using standard python `bool(value)`.

    Returns:
        The boolean interpretation of `value`.
    """
    # Assume it's a number. OverflowError covers integers that do not fit
    # in a float (e.g. 10**400); they fall through to the generic cast.
    with suppress(TypeError, ValueError, OverflowError):
        return bool(float(value))
    # Assume it's a string; objects without string methods fall through.
    with suppress(AttributeError):
        lower = value.lower().strip()
        if lower in {"y", "yes", "t", "true", "on"}:
            return True
        if lower in {"n", "no", "f", "false", "off", "~", "null", "none"}:
            return False
    # Assume nothing
    return bool(value)

check_max_len(s, len_func, max_len) staticmethod

Check if the length of a string is less than or equal to a maximum length

Source code in hyfi/utils/funcs.py
@staticmethod
def check_max_len(s, len_func, max_len):
    """Tell whether `len_func(s)` does not exceed `max_len`."""
    measured = len_func(s)
    return measured <= max_len

check_min_len(s, len_func, min_len) staticmethod

Check if the length of a string is greater than or equal to a minimum length

Source code in hyfi/utils/funcs.py
@staticmethod
def check_min_len(s, len_func, min_len):
    """Tell whether `len_func(s)` is at least `min_len`."""
    measured = len_func(s)
    return measured >= min_len

dict_product(dicts) staticmethod

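Return the Cartesian product of a dict of iterables as a list of dicts, one per combination. Example: `dict_product(dict(number=[1, 2], character='ab'))` returns `[{'number': 1, 'character': 'a'}, {'number': 1, 'character': 'b'}, {'number': 2, 'character': 'a'}, {'number': 2, 'character': 'b'}]`.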

Source code in hyfi/utils/funcs.py
@staticmethod
def dict_product(dicts) -> List[Dict]:
    """Expand a mapping of iterables into one dict per combination of values.

    The rightmost key varies fastest, following `itertools.product`.

    >>> dict_product({"n": [1, 2], "c": "ab"})
    [{'n': 1, 'c': 'a'}, {'n': 1, 'c': 'b'}, {'n': 2, 'c': 'a'}, {'n': 2, 'c': 'b'}]
    """
    keys = list(dicts)
    combinations = itertools.product(*dicts.values())
    return [dict(zip(keys, combo)) for combo in combinations]

fancy_print(*args, color=None, bold=False, **kwargs) staticmethod

Print with color and bold

Source code in hyfi/utils/funcs.py
@staticmethod
def fancy_print(*args, color=None, bold=False, **kwargs):
    """Print with color and bold.

    Args:
        *args: Values forwarded to `print`.
        color: ANSI SGR color code (e.g. 31 for red); falsy values skip coloring.
        bold: Whether to emit the ANSI bold sequence before the text.
        **kwargs: Keyword arguments forwarded to `print` (`sep`, `end`,
            `file`, `flush`).
    """
    # Send the escape sequences to the same stream as the text. Previously
    # they always went to stdout, so a custom `file=` received unstyled text
    # while stdout collected stray control codes.
    stream = kwargs.get("file")

    if bold:
        print("\033[1m", end="", file=stream)

    if color:
        print(f"\033[{color}m", end="", file=stream)

    print(*args, **kwargs)

    print("\033[0m", end="", file=stream)  # reset

force_str_end(original_str, end='\n') staticmethod

Make sure `original_str` ends with `end`.

Parameters:

Name Type Description Default
original_str str

String that you want to ensure ending.

required
end str

String that must exist at the end of original_str

'\n'
Source code in hyfi/utils/funcs.py
@staticmethod
def force_str_end(original_str: str, end: str = "\n") -> str:
    """Return `original_str`, appending `end` unless it is already the suffix.

    Params:
        original_str: String whose ending must be guaranteed.
        end: Suffix that the returned string has to finish with.
    """
    if original_str.endswith(end):
        return original_str
    return original_str + end

get_offset_ranges(count, num_workers) staticmethod

Get offset ranges for parallel processing

Source code in hyfi/utils/funcs.py
@staticmethod
def get_offset_ranges(count, num_workers):
    """Get offset ranges for parallel processing.

    Returns a list that starts with 0, followed by one boundary per worker:
    boundaries advance from 1 in steps of `int(count / num_workers)`, and the
    final boundary is always `count + 1`.
    """
    assert count > num_workers
    stride = int(count / num_workers)
    boundaries = [0]
    final_worker = num_workers - 1
    cursor = 1
    for worker in range(num_workers):
        if worker == final_worker:
            # The last range absorbs whatever the integer stride left over.
            cursor = count + 1
        else:
            cursor += stride
        boundaries.append(cursor)
    return boundaries

human_readable_type_name(t) staticmethod

Generates a useful-for-humans label for a type. For builtin types, it's just the class name (eg "str" or "int"). For other types, it includes the module (eg "pathlib.Path").

Source code in hyfi/utils/funcs.py
@staticmethod
def human_readable_type_name(t: Type) -> str:
    """
    Build a label for a type that is meant to be read by humans.

    Builtin types yield just their qualified name (eg "str" or "int").
    Other types are prefixed with their module (eg "fractions.Fraction");
    anything living under the "hyfi" package is shortened to "hyfi.<name>".
    Objects without `__qualname__` fall back to `str(t)`.
    """
    module = t.__module__
    if module == "builtins":
        return t.__qualname__

    top_level_package = module.split(".")[0]
    prefix = "hyfi" if top_level_package == "hyfi" else module
    try:
        return f"{prefix}.{t.__qualname__}"
    except AttributeError:
        return str(t)

humanbytes(B, units=None) staticmethod

Return the given bytes as a human friendly KB, MB, GB, or TB string

Source code in hyfi/utils/funcs.py
@staticmethod
def humanbytes(B, units=None):
    """Return the given bytes as a human friendly B, KiB, MiB, GiB, or TiB string.

    Args:
        B: Number of bytes; anything accepted by `float()`.
        units: Force the output unit ("B", "KiB", "MiB", "GiB" or "TiB").
            When None, the unit is picked from the magnitude of `B`.

    Returns:
        The formatted size, or None when `units` is given but is not one of
        the names above.
    """
    B = float(B)
    KB = float(1024)
    MB = float(KB**2)  # 1,048,576
    GB = float(KB**3)  # 1,073,741,824
    TB = float(KB**4)  # 1,099,511,627,776

    if (B < KB and units is None) or units == "B":
        # Only exactly one byte is singular. The former chained comparison
        # `0 == B > 1` could never be true, so "Bytes" was never produced.
        return "{0} {1}".format(B, "Byte" if B == 1 else "Bytes")
    elif (KB <= B < MB and units is None) or units == "KiB":
        return "{0:.2f} KiB".format(B / KB)
    elif (MB <= B < GB and units is None) or units == "MiB":
        return "{0:.2f} MiB".format(B / MB)
    elif (GB <= B < TB and units is None) or units == "GiB":
        return "{0:.2f} GiB".format(B / GB)
    elif (TB <= B and units is None) or units == "TiB":
        return "{0:.2f} TiB".format(B / TB)
    return None

len_bytes(x) staticmethod

Return the length of a string in bytes

Source code in hyfi/utils/funcs.py
@staticmethod
def len_bytes(x):
    """Return the size of `x` in bytes, as computed by `FUNCs.utf8len`."""
    size_in_bytes = FUNCs.utf8len(x)
    return size_in_bytes

len_segments(x, sep) staticmethod

Return the number of segments in a string

Source code in hyfi/utils/funcs.py
@staticmethod
def len_segments(x, sep):
    """Count the segments of `x` delimited by `sep`.

    `sep` may contain escape sequences written literally (e.g. "\\n\\n");
    they are decoded first. Runs of two or more line breaks in `x` are
    replaced by the separator before splitting.
    """
    delimiter = str(sep).encode("utf-8").decode("unicode-escape")
    normalized = re.sub(r"(\r?\n|\r){2,}", delimiter, x)
    return len(normalized.split(delimiter))

len_sents(x, sep) staticmethod

Return the number of sentences in a string

Source code in hyfi/utils/funcs.py
@staticmethod
def len_sents(x, sep):
    """Count the sentences of `x` delimited by `sep`.

    `sep` may contain escape sequences written literally (e.g. "\\n");
    they are decoded first. Every run of line breaks in `x` is replaced by
    the separator before splitting.
    """
    delimiter = str(sep).encode("utf-8").decode("unicode-escape")
    normalized = re.sub(r"(\r?\n|\r){1,}", delimiter, x)
    return len(normalized.split(delimiter))

len_words(x) staticmethod

Return the number of words in a string

Source code in hyfi/utils/funcs.py
@staticmethod
def len_words(x):
    """Count whitespace-separated words in `x`; non-string input counts as 0."""
    if not isinstance(x, str):
        return 0
    return len(x.split())

len_wospc(x) staticmethod

Return the length of a string in bytes without spaces

Source code in hyfi/utils/funcs.py
@staticmethod
def len_wospc(x):
    """Return the byte length of `str(x)` once all whitespace is removed."""
    without_whitespace = re.sub(r"\s", "", str(x))
    return FUNCs.utf8len(without_whitespace)

lower_case_with_underscores(string) staticmethod

Converts 'CamelCased' to 'camel_cased'.

Source code in hyfi/utils/funcs.py
@staticmethod
def lower_case_with_underscores(string):
    """Converts 'CamelCased' to 'camel_cased'.

    Whitespace runs and hyphens also become underscores, and each double
    underscore is collapsed once.
    """
    # Put an underscore in front of every capital that is not the first char.
    separated = re.sub(r"(?<!^)(?=[A-Z])", "_", string)
    snake = re.sub(r"\s+", "_", separated.lower())
    snake = snake.replace("-", "_")
    return snake.replace("__", "_")

now(_format='%Y-%m-%d %H:%M:%S') staticmethod

Return current date and time

Source code in hyfi/utils/funcs.py
@staticmethod
def now(_format="%Y-%m-%d %H:%M:%S"):
    """Return the current local date and time.

    A formatted string is returned by default; pass `_format=None` to get
    the `datetime` object itself.
    """
    from datetime import datetime

    current = datetime.now()
    if _format is None:
        return current
    return current.strftime(_format)

ordinal(num) staticmethod

Return the ordinal of a number as a string.

Source code in hyfi/utils/funcs.py
@staticmethod
def ordinal(num):
    """Return the ordinal of a number as a string.

    For example 1 -> "1st", 2 -> "2nd", 3 -> "3rd", 4 -> "4th", 11 -> "11th".
    """
    # The suffixes are interleaved in "tsnrhtdd": slicing with a step of 4
    # from index 0, 1, 2 or 3 yields "th", "st", "nd" or "rd" respectively.
    # The start index is `num % 10` only when the tens digit is not 1 (so
    # 11-13 stay "th") and the last digit is below 4; otherwise the boolean
    # factors multiply it down to 0, which selects "th".
    return "%d%s" % (
        num,
        "tsnrhtdd"[(num // 10 % 10 != 1) * (num % 10 < 4) * num % 10 :: 4],
    )

parse_size(sizestr) staticmethod

Parse a size string into a number of bytes. For example, "16K" will return 16384. If no suffix is provided, bytes are assumed. This function is case-insensitive.

:param sizestr: A string representing a size, such as "16K", "2M", "1G". :return: The number of bytes that the string represents.

Source code in hyfi/utils/funcs.py
@staticmethod
def parse_size(sizestr):
    """
    Parse a size string into a number of bytes. For example, "16K" will
    return 16384.  If no suffix is provided, bytes are assumed.  This
    function is case-insensitive.

    :param sizestr: A string representing a size, such as "16K", "2M", "1G".
    :return: The number of bytes as a float, or None when the suffix is a
        letter other than B, K, M, G or T.
    :raises ValueError: If the numeric part cannot be parsed.
    """
    multipliers = {
        "B": 1,
        "K": 1024,
        "M": 1024**2,
        "G": 1024**3,
        "T": 1024**4,
    }
    text = str(sizestr).strip()
    unit = text[-1:].upper()
    if not unit.isalpha():
        # No unit suffix: the whole string is a byte count. Previously the
        # last digit was mistaken for the unit and None was returned.
        return float(text)

    size = float(text[:-1])
    factor = multipliers.get(unit)
    return None if factor is None else size * factor

printf(action, msg='', style=None, indent=10, verbose=True, file_=sys.stdout) staticmethod

Print string with common format.

Source code in hyfi/utils/funcs.py
@staticmethod
def printf(
    action: str,
    msg: Any = "",
    style: Optional[IntSeq] = None,
    indent: int = 10,
    verbose: Union[bool, StrictBool] = True,
    file_: TextIO = sys.stdout,
) -> Optional[str]:
    """Print string with common format.

    `action` is right-justified to `indent` columns. With a `style`, the
    styled action and the message are printed to `file_` and None is
    returned. Without one, nothing is printed and the padded action plus
    message is returned instead. Nothing happens when `verbose` is false.
    """
    if verbose:
        text = str(msg)
        padded_action = action.rjust(indent, " ")
        if style:
            pieces = style + [padded_action] + Style.RESET + [INDENT, text]  # type: ignore
            print(*pieces, sep="", file=file_)
        else:
            return padded_action + text
    return None  # HACK: Satisfy MyPy

printf_exception(e, action, msg='', indent=0, quiet=False) staticmethod

Print exception with common format.

Source code in hyfi/utils/funcs.py
@staticmethod
def printf_exception(
    e: Exception, action: str, msg: str = "", indent: int = 0, quiet: bool = False
) -> None:
    """Print exception with common format.

    Unless `quiet` is set, writes to stderr: a blank line, the action line
    styled with `Style.DANGER`, then the exception between two `HLINE` rules.
    """
    if quiet:
        return

    print("", file=sys.stderr)
    FUNCs.printf(action, msg=msg, style=Style.DANGER, indent=indent, file_=sys.stderr)
    print(HLINE, file=sys.stderr)
    print(e, file=sys.stderr)
    print(HLINE, file=sys.stderr)

readable_types_list(type_list) staticmethod

Generates a useful-for-humans label for a list of types.

Source code in hyfi/utils/funcs.py
@staticmethod
def readable_types_list(type_list: List[Type]) -> str:
    """Join the human-readable names of the given types with ", "."""
    labels = [FUNCs.human_readable_type_name(t) for t in type_list]
    return ", ".join(labels)

strptime(_date_str, _format='%Y-%m-%d') staticmethod

Return a datetime object from a string

Source code in hyfi/utils/funcs.py
@staticmethod
def strptime(
    _date_str: str,
    _format: str = "%Y-%m-%d",
):
    """Parse `_date_str` according to `_format` and return the datetime."""
    from datetime import datetime

    parsed = datetime.strptime(_date_str, _format)
    return parsed

to_dateparm(_date, _format='%Y-%m-%d') staticmethod

Return a date parameter string

Source code in hyfi/utils/funcs.py
@staticmethod
def to_dateparm(_date, _format="%Y-%m-%d"):
    """Format `_date` and wrap it as a `${to_datetime:<date>,<format>}` string."""
    from datetime import datetime

    stamp = datetime.strftime(_date, _format)
    return f"${{to_datetime:{stamp},{_format}}}"

today(_format='%Y-%m-%d') staticmethod

Return today's date

Source code in hyfi/utils/funcs.py
@staticmethod
def today(_format="%Y-%m-%d"):
    """Return today's date.

    A formatted string is returned by default; pass `_format=None` to get
    a `date` object instead.
    """
    from datetime import datetime

    current = datetime.today()
    return current.date() if _format is None else current.strftime(_format)

unescape_dict(d) staticmethod

Unescape a dictionary

Source code in hyfi/utils/funcs.py
@staticmethod
def unescape_dict(d):
    """Unescape a dictionary.

    The object's `repr` is decoded with the "unicode-escape" codec, which
    turns literally written escapes such as "\\t" into the real characters,
    and the result is parsed back with `ast.literal_eval`.
    """
    escaped_repr = repr(d).encode("utf-8")
    unescaped_repr = escaped_repr.decode("unicode-escape")
    return ast.literal_eval(unescaped_repr)

utf8len(s) staticmethod

Return the length of a string in bytes

Source code in hyfi/utils/funcs.py
@staticmethod
def utf8len(s):
    """Return the number of bytes `str(s)` occupies when encoded as UTF-8."""
    encoded = str(s).encode("utf-8")
    return len(encoded)

GPUs

Source code in hyfi/utils/gpumon.py
class GPUs:
    """Static wrappers around the GPU helper functions of this module."""

    ###############################
    # GPU Utility functions
    ###############################
    @staticmethod
    def nvidia_smi():
        """Call the module-level `nvidia_smi` helper and return its result."""
        return nvidia_smi()

    @staticmethod
    def set_cuda(device=0):
        """Call the module-level `set_cuda` helper with `device` and return its result."""
        return set_cuda(device)

    @staticmethod
    def gpu_usage(all=False, attrList=None, useOldCode=False):
        """
        Show GPU utilization in human readable format. This is a wrapper around the GPUtil library.

        Args:
            all: If True show all available GPUs ( default : False )
            attrList: List of attributes to show ( default : None )
            useOldCode: If True use old code instead of new code ( default : False )

        Returns:
            Whatever GPUtil's `showUtilization` returns, or None (after
            logging an error) when GPUtil cannot be imported.
        """
        try:
            from GPUtil import showUtilization  # type: ignore
        except ImportError:
            logger.error("GPUtil is not installed. To install, run: pip install GPUtil")
            return None
        else:
            return showUtilization(all, attrList, useOldCode)

gpu_usage(all=False, attrList=None, useOldCode=False) staticmethod

Show GPU utilization in human readable format. This is a wrapper around the GPUtil library.

Parameters:

Name Type Description Default
all

If True show all available GPUs ( default : False )

False
attrList

List of attributes to show ( default : None )

None
useOldCode

If True use old code instead of new code ( default : False )

False

Returns:

Type Description

The value returned by GPUtil's `showUtilization`, or None if GPUtil is not installed.

Source code in hyfi/utils/gpumon.py
@staticmethod
def gpu_usage(all=False, attrList=None, useOldCode=False):
    """
    Show GPU utilization in human readable format. This is a wrapper around the GPUtil library.

    Args:
        all: If True show all available GPUs ( default : False )
        attrList: List of attributes to show ( default : None )
        useOldCode: If True use old code instead of new code ( default : False )

    Returns:
        Whatever GPUtil's `showUtilization` returns, or None (after
        logging an error) when GPUtil cannot be imported.
    """
    try:
        from GPUtil import showUtilization  # type: ignore
    except ImportError:
        logger.error("GPUtil is not installed. To install, run: pip install GPUtil")
        return None
    else:
        return showUtilization(all, attrList, useOldCode)

IOLIBs

Source code in hyfi/utils/iolibs.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
class IOLIBs:
    @staticmethod
    def is_valid_regex(expr: str) -> bool:
        """Check if a string is an "r:"-prefixed, compilable regular expression.

        Strings without the "r:" prefix are never treated as regexes.
        """
        if not expr.startswith("r:"):
            return False
        try:
            re.compile(expr[2:])
        except re.error:
            return False
        return True

    @staticmethod
    def glob_re(
        pattern: str,
        base_dir: str,
        recursive: bool = False,
    ) -> list:
        """Glob files matching a regular expression or a glob pattern.

        An "r:"-prefixed `pattern` is searched as a regex against the entry
        names found in `base_dir` (walking subdirectories when `recursive`).
        Any other pattern is joined to `base_dir` and handed to `glob`.
        """
        if not IOLIBs.is_valid_regex(pattern):
            return glob(os.path.join(base_dir, pattern), recursive=recursive)

        matcher = re.compile(pattern[2:])  # type: ignore
        if not recursive:
            return [
                os.path.join(base_dir, name)
                for name in os.listdir(base_dir)
                if matcher.search(name)
            ]

        matches = []
        for dirpath, _, filenames in os.walk(base_dir):
            matches.extend(
                os.path.join(dirpath, name)
                for name in filenames
                if matcher.search(name)
            )
        return matches

    @staticmethod
    def get_filepaths(
        filename_patterns: Union[List[PathLikeType], PathLikeType],
        base_dir: Optional[Union[str, PosixPath, WindowsPath]] = None,
        recursive: bool = True,
        use_cached: bool = False,
        verbose: bool = False,
        **kwargs,
    ) -> List[str]:
        """Get a list of filepaths from a list of filename patterns.

        Args:
            filename_patterns: One pattern or a list of patterns. Each may be
                a URL (anything starting with "http"), a path relative to
                `base_dir`, a glob pattern, or an "r:"-prefixed regex.
            base_dir: Directory the patterns are resolved against.
            recursive: Passed on to `IOLIBs.glob_re`.
            use_cached: Resolve URLs through `IOLIBs.cached_path` instead of
                returning them unchanged.
            verbose: Log the number of files found.
            **kwargs: Forwarded to `IOLIBs.cached_path`.

        Returns:
            Existing files (and untouched URLs) matching the patterns.

        Raises:
            ValueError: If `filename_patterns` is None.
        """
        if filename_patterns is None:
            raise ValueError("filename_patterns must be specified")
        if isinstance(filename_patterns, (PosixPath, WindowsPath)):
            filename_patterns = str(filename_patterns)
        if isinstance(filename_patterns, str):
            filename_patterns = [filename_patterns]
        filepaths = []
        base_dir = str(base_dir) if base_dir else ""
        for f_pattern in filename_patterns:
            f_pattern = str(f_pattern)
            is_url = f_pattern.startswith("http")
            if is_url and not use_cached:
                filepaths.append(f_pattern)
                continue

            if is_url:
                filepath = IOLIBs.cached_path(f_pattern, **kwargs)
            else:
                filepath = os.path.join(base_dir, f_pattern)
            if isinstance(filepath, str) and os.path.exists(filepath):
                if Path(filepath).is_file():
                    filepaths.append(filepath)
                continue

            # Resolve the directory part per pattern. Reassigning `base_dir`
            # here (as before) made every later pattern search inside the
            # directory of an earlier one.
            search_dir = base_dir
            pattern_dir = os.path.dirname(f_pattern)
            if pattern_dir != "":
                search_dir = os.path.join(base_dir, pattern_dir)
                f_pattern = os.path.basename(f_pattern)
            filepaths += IOLIBs.glob_re(f_pattern, search_dir, recursive=recursive)
        filepaths = [
            fp for fp in filepaths if Path(fp).is_file() or fp.startswith("http")
        ]
        if verbose:
            logger.info(f"Processing [{len(filepaths)}] files from {filename_patterns}")

        return filepaths

    @staticmethod
    def get_files_from_archive(archive_path: str, filetype: str = ""):
        """List the files held by an archive.

        Supports paths containing ".tar.gz", ".tar.bz2" or ".zip"; any other
        path is reported as a single plain file.

        Returns:
            A tuple `(files, archive_handle, open_func)`. `files` holds
            `(member, name)` pairs, filtered to names containing `filetype`
            when it is given. The handle and opener are None for a plain file.
        """
        import tarfile
        from zipfile import ZipFile

        def list_tar_files(handle):
            # Keep regular files only, paired with their member names.
            return [
                (member, member.name)
                for member in handle.getmembers()
                if member.isfile()
            ]

        if ".tar.gz" in archive_path:
            logger.info(f"::Extracting files from {archive_path} with tar.gz")
            archive_handle = tarfile.open(archive_path, "r:gz")
            files = list_tar_files(archive_handle)
            open_func = archive_handle.extractfile
        elif ".tar.bz2" in archive_path:
            logger.info(f"::Extracting files from {archive_path} with tar.bz2")
            archive_handle = tarfile.open(archive_path, "r:bz2")
            files = list_tar_files(archive_handle)
            open_func = archive_handle.extractfile
        elif ".zip" in archive_path:
            logger.info(f"::Extracting files from {archive_path} with zip")
            archive_handle = ZipFile(archive_path)
            # Names are re-decoded from cp437 to euc-kr for display.
            files = [
                (name, name.encode("cp437").decode("euc-kr"))
                for name in archive_handle.namelist()
            ]
            open_func = archive_handle.open
        else:
            # Not an archive: report the path itself as the only file.
            archive_handle = None
            open_func = None
            files = [(archive_path, os.path.basename(archive_path))]

        if filetype:
            files = [entry for entry in files if filetype in entry[1]]

        return files, archive_handle, open_func

    @staticmethod
    def read(uri, mode="rb", encoding=None, head=None, **kwargs) -> bytes:
        """Read data from a file or url.

        URIs starting with "http" are fetched with `requests`; everything
        else is opened as a local file. When `mode` is "r" and `head` is an
        integer, only the first `head` units are read.
        """
        uri = str(uri)
        read_head_only = mode == "r" and head is not None and isinstance(head, int)

        if not uri.startswith("http"):
            # NOTE: s3:// URIs are not handled; they are opened as local paths.
            with open(uri, mode=mode, encoding=encoding) as f:
                return f.read(head) if read_head_only else f.read()

        import requests

        if read_head_only:
            response = requests.get(uri, stream=True)
            response.raw.decode_content = True
            return response.raw.read(head)
        return requests.get(uri, **kwargs).content

    @staticmethod
    def walk_to_root(path: str) -> Iterator[str]:
        """
        Yield absolute directories from the given path up to the filesystem root.

        A file path starts at its containing directory. Raises IOError if
        `path` does not exist.
        """
        if not os.path.exists(path):
            raise IOError("Starting path not found")

        start = os.path.dirname(path) if os.path.isfile(path) else path
        current = os.path.abspath(start)
        while True:
            yield current
            parent = os.path.abspath(os.path.join(current, os.path.pardir))
            if parent == current:
                # The root is its own parent: nothing left to climb.
                return
            current = parent

    @staticmethod
    def is_file(a, *p) -> bool:
        """Join the given path components and check whether the result is a file."""
        return Path(os.path.join(a, *p)).is_file()

    @staticmethod
    def is_dir(a, *p) -> bool:
        """Join the given path components and check whether the result is a directory."""
        return Path(os.path.join(a, *p)).is_dir()

    @staticmethod
    def check_path(_path: str, alt_path: str = "") -> str:
        """Return `_path` when it exists on disk, otherwise `alt_path`."""
        if os.path.exists(_path):
            return _path
        return alt_path

    @staticmethod
    def mkdir(_path: str) -> str:
        """Create the directory (and missing parents) and return its path.

        Returns "" without touching the filesystem when `_path` is None.
        """
        if _path is not None:
            Path(_path).mkdir(parents=True, exist_ok=True)
            return _path
        return ""

    @staticmethod
    def exists(a, *p) -> bool:
        """Join the path components and check whether the result exists.

        A None first component is reported as non-existent.
        """
        return a is not None and os.path.exists(os.path.join(a, *p))

    @staticmethod
    def join_path(a, *p) -> str:
        """Join path components intelligently.

        Returns `a` unchanged when there are no further components or the
        first of them is None. A None `a` is skipped; the remaining
        components are converted with `str` before joining.
        """
        if not p or p[0] is None:
            return a
        parts = [str(part) for part in p]
        if a is not None:
            parts.insert(0, a)
        return os.path.join(*parts)

    @staticmethod
    def copy(
        src: PathLikeType,
        dst: PathLikeType,
        follow_symlinks: bool = True,
    ):
        """
        Copy a file into a directory. This is a wrapper around shutil.copy.

        `dst` is first created as a directory via `IOLIBs.mkdir`, then `src`
        is copied with `shutil.copy` and the operation is logged.

        Args:
            src: Path to the file to be copied.
            dst: Path to the destination directory. If the destination directory does not exist it will be created.
            follow_symlinks: Whether or not symlinks should be followed
        """
        source, target_dir = str(src), str(dst)
        IOLIBs.mkdir(target_dir)
        shutil.copy(source, target_dir, follow_symlinks=follow_symlinks)
        logger.info(f"copied {source} to {target_dir}")

    @staticmethod
    def copyfile(
        src: PathLikeType,
        dst: PathLikeType,
        follow_symlinks: bool = True,
    ):
        """
        Copy a single file. This is a wrapper around shutil.copyfile.

        Both paths are converted to strings, the copy is performed with
        `shutil.copyfile`, and the operation is logged.

        Args:
            src: Path to the file to copy.
            dst: Path to the destination file. If the destination file already exists it will be overwritten.
            follow_symlinks: Whether to follow symbolic links or not
        """
        source, target = str(src), str(dst)
        shutil.copyfile(source, target, follow_symlinks=follow_symlinks)
        logger.info(f"copied {source} to {target}")

    @staticmethod
    def copy_file(
        src: PathLikeType,
        dst: PathLikeType,
        follow_symlinks: bool = True,
    ) -> None:
        """Copy one file to another place using `shutil.copy2`."""
        shutil.copy2(str(src), str(dst), follow_symlinks=follow_symlinks)

    @staticmethod
    def get_modified_time(path):
        """Return the modification time of a file as "YYYY-MM-DD HH:MM:SS".

        The time is expressed in local time; None is returned when `path`
        does not exist.
        """
        if os.path.exists(path):
            modified = time.localtime(os.path.getmtime(path))
            return time.strftime("%Y-%m-%d %H:%M:%S", modified)
        return None

    @staticmethod
    def handle_remove_readonly(
        func: Callable, path: str, exc: Tuple[BaseException, OSError, TracebackType]
    ) -> None:
        """Handle errors when trying to remove read-only files through `shutil.rmtree`.

        When the failed call was a removal function and the error is EACCES,
        the path is made fully accessible and the removal is retried. Any
        other failure is re-raised.

        Arguments:
            func: An OS-dependant function used to remove a file.
            path: The path to the file to remove.
            exc: A `sys.exc_info()` object.
        """
        error = exc[1]
        is_removal = func in (os.rmdir, os.remove, os.unlink)
        if not (is_removal and error.errno == errno.EACCES):
            # Not a permission problem we can fix: propagate the active error.
            raise
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
        func(path)

    @staticmethod
    def readlink(link: Path) -> Path:
        """A custom version of os.readlink/pathlib.Path.readlink.

        pathlib.Path.readlink is what we ideally would want to use, but it is only available on python>=3.9.
        os.readlink doesn't support Path and bytes on Windows for python<3.8
        """
        if sys.version_info >= (3, 9):
            return link.readlink()

        # Older interpreters: only Windows before 3.8 needs a plain string.
        accepts_path = sys.version_info >= (3, 8) or os.name != "nt"
        target = os.readlink(link if accepts_path else str(link))
        return Path(target)

    @staticmethod
    def extractall(
        path: str,
        dest: str = "",
        force_extract: bool = False,
    ):
        """Extract archive file.

        Parameters
        ----------
        path: str
            Path of archive file to be extracted.
        dest: str, optional
            Directory to which the archive file will be extracted.
            If empty or None, it will be set to the parent directory of the
            archive file.
        force_extract: bool, optional
            Extract even when a non-empty directory named after the archive
            already exists under `dest`.

        Returns
        -------
        tuple
            `(dest, files)` where `files` lists the extracted (or previously
            extracted) paths, or `(path, None)` when the file extension is
            not a supported archive type.
        """
        import tarfile
        from zipfile import ZipFile

        # The default is "", so test for falsiness rather than `is None`.
        # Otherwise the documented fallback never triggered and the
        # "already extracted" check below looked at "/<archive stem>".
        if not dest:
            dest = os.path.dirname(path) or "."

        if path.endswith(".zip"):
            opener, mode = ZipFile, "r"
        elif path.endswith(".tar"):
            opener, mode = tarfile.open, "r"
        elif path.endswith((".tar.gz", ".tgz")):
            opener, mode = tarfile.open, "r:gz"
        elif path.endswith((".tar.bz2", ".tbz")):
            opener, mode = tarfile.open, "r:bz2"
        else:
            logger.warning(
                f"Could not extract '{path}' as no appropriate extractor is found"
            )
            return path, None

        # Reuse an earlier extraction when its directory is already populated.
        extraction_name = Path(path).stem
        extraction_path = f"{dest}/{extraction_name}"
        if (
            os.path.isdir(extraction_path)
            and os.listdir(extraction_path)
            and not force_extract
        ):
            files = [
                os.path.join(dirpath, filename)
                for dirpath, _, filenames in os.walk(extraction_path)
                for filename in filenames
            ]

            return dest, files

        # NOTE(review): extractall() trusts the member paths stored in the
        # archive; only use this on archives from trusted sources.
        with opener(path, mode) as f:  # type: ignore
            f.extractall(path=dest)
            # Collect member names while the archive is still open.
            if isinstance(f, ZipFile):
                names = f.namelist()
            else:
                names = [m.path for m in f.getmembers()]

        return dest, [os.path.join(dest, name) for name in names]

    @staticmethod
    def save_wordlist(
        words: List[str],
        filepath: Union[str, PosixPath, WindowsPath, Path],
        sort: bool = True,
        encoding: str = "utf-8",
        verbose: bool = True,
    ):
        """Write the words to `filepath`, one per line.

        The words are sorted first unless `sort` is false; the caller's list
        is left untouched. With `verbose`, the target and word count are logged.
        """
        ordered = sorted(words) if sort else words
        if verbose:
            logger.info(
                "Save the list to the file: %s, no. of words: %s", filepath, len(ordered)
            )
        with open(filepath, "w", encoding=encoding) as fo_:
            fo_.writelines(word + "\n" for word in ordered)

    @staticmethod
    def load_wordlist(
        filepath: Union[str, PosixPath, WindowsPath, Path],
        sort: bool = True,
        lowercase: bool = False,
        unique: bool = True,
        remove_tag: bool = False,
        max_ngram_to_include: Optional[int] = None,
        ngram_delimiter: str = ";",
        remove_delimiter: bool = False,
        encoding: str = "utf-8",
        verbose: bool = True,
    ) -> List[str]:
        """Load the word list from the file.

        Blank lines are skipped and only the first whitespace-separated token
        of each line is kept. The words are then passed, together with the
        remaining options, to `IOLIBs.process_wordlist`. A missing file is
        logged as a warning and yields an empty list.
        """
        wordlist_path = Path(filepath)
        if not wordlist_path.is_file():
            logger.warning("File not found: %s", wordlist_path)
            return []

        words = []
        with open(wordlist_path, encoding=encoding) as fo_:
            for line in fo_:
                stripped = line.strip()
                if len(stripped) > 0:
                    words.append(stripped.split()[0])

        if verbose:
            logger.info(
                "Loaded the file: %s, No. of words: %s", wordlist_path, len(words)
            )

        options = dict(
            sort=sort,
            lowercase=lowercase,
            unique=unique,
            remove_tag=remove_tag,
            max_ngram_to_include=max_ngram_to_include,
            ngram_delimiter=ngram_delimiter,
            remove_delimiter=remove_delimiter,
            verbose=verbose,
        )
        return IOLIBs.process_wordlist(words, **options)

    @staticmethod
    def process_wordlist(
        words: List[str],
        sort: bool = True,
        lowercase: bool = False,
        unique: bool = True,
        remove_tag: bool = False,
        max_ngram_to_include: Optional[int] = None,
        ngram_delimiter: str = ";",
        remove_delimiter: bool = False,
        verbose: bool = True,
    ) -> List[str]:
        """Preprocess the word list.

        Args:
            words (List[str]): List of words.
            sort (bool, optional): Sort the words. Defaults to True.
            lowercase (bool, optional): Convert the words to lowercase. Defaults to False.
            unique (bool, optional): Remove duplicate words. Defaults to True.
            remove_tag (bool, optional): Remove the tag from the words. Defaults to False.
            max_ngram_to_include (Optional[int], optional): Maximum ngram to include. Defaults to None.
            ngram_delimiter (str, optional): Delimiter for ngram. Defaults to ";".
            remove_delimiter (bool, optional): Remove the delimiter. Defaults to False.
            verbose (bool, optional): Show the progress. Defaults to True.

        Returns:
            List[str]: List of words.
        """
        if remove_delimiter:
            words = [word.replace(ngram_delimiter, "") for word in words]
        if max_ngram_to_include:
            words = [
                word
                for word in words
                if len(word.split(ngram_delimiter)) <= max_ngram_to_include
            ]

        if remove_tag:
            words = [word.split("/")[0] for word in words]
        words = [
            word.lower() if lowercase else word
            for word in words
            if not word.startswith("#")
        ]
        if unique:
            words = list(set(words))
            if verbose:
                logger.info(
                    "Remove duplicate words, No. of words: %s",
                    len(words),
                )
        if sort:
            words = sorted(words)
        return words

    @staticmethod
    def append_to_jsonl(
        data: dict,
        filename: str,
        encoding: str = "utf-8",
    ) -> None:
        """Append a json payload to the end of a jsonl file."""
        json_string = json.dumps(data, ensure_ascii=False)
        with open(filename, "a", encoding=encoding) as f:
            f.write(json_string + "\n")

    @staticmethod
    def load_jsonl(
        filename: str,
        encoding: str = "utf-8",
    ) -> List[dict]:
        """Load a jsonl file into a list of json objects."""
        with open(filename, "r", encoding=encoding) as f:
            return [json.loads(line) for line in f]

    @staticmethod
    def save_jsonl(
        data: List[dict],
        filename: str,
        encoding: str = "utf-8",
    ) -> None:
        """
        Save a list of json objects to a jsonl file.
        """
        with open(filename, "w", encoding=encoding) as f:
            for line in data:
                f.write(json.dumps(line, ensure_ascii=False) + "\n")

    @staticmethod
    def remove_duplicates_from_list_of_dicts(
        data: List[dict],
        key: str,
    ) -> List[dict]:
        """Remove duplicates from a list of dicts based on a key."""
        seen = set()
        new_data = []
        for d in data:
            if d[key] not in seen:
                new_data.append(d)
                seen.add(d[key])
        return new_data

append_to_jsonl(data, filename, encoding='utf-8') staticmethod

Append a json payload to the end of a jsonl file.

Source code in hyfi/utils/iolibs.py
@staticmethod
def append_to_jsonl(
    data: dict,
    filename: str,
    encoding: str = "utf-8",
) -> None:
    """Append a json payload to the end of a jsonl file."""
    json_string = json.dumps(data, ensure_ascii=False)
    with open(filename, "a", encoding=encoding) as f:
        f.write(json_string + "\n")

check_path(_path, alt_path='') staticmethod

Check if path exists, return alt_path if not

Source code in hyfi/utils/iolibs.py
@staticmethod
def check_path(_path: str, alt_path: str = "") -> str:
    """Check if path exists, return alt_path if not"""
    return _path if os.path.exists(_path) else alt_path

copy(src, dst, follow_symlinks=True) staticmethod

Copy a file into a destination directory. This is a wrapper around shutil.copy. If you need to copy an entire directory (including all of its contents), shutil.copytree() would be a better choice.

Parameters:

Name Type Description Default
src PathLikeType

Path to the file or directory to be copied.

required
dst PathLikeType

Path to the destination directory. If the destination directory does not exist it will be created.

required
follow_symlinks bool

Whether or not symlinks should be followed

True
Source code in hyfi/utils/iolibs.py
@staticmethod
def copy(
    src: PathLikeType,
    dst: PathLikeType,
    follow_symlinks: bool = True,
):
    """
    Copy a single file into the directory ``dst`` using ``shutil.copy``.

    ``dst`` is first created as a directory via ``IOLIBs.mkdir``, then the
    file is copied into it and the copy is logged.

    Args:
            src: Path to the file to be copied.
            dst: Path to the destination directory. If the destination directory does not exist it will be created.
            follow_symlinks: Whether or not symlinks should be followed
    """
    src, dst = str(src), str(dst)
    IOLIBs.mkdir(dst)
    shutil.copy(src, dst, follow_symlinks=follow_symlinks)
    logger.info(f"copied {src} to {dst}")

copy_file(src, dst, follow_symlinks=True) staticmethod

Copy one file to another place.

Source code in hyfi/utils/iolibs.py
@staticmethod
def copy_file(
    src: PathLikeType,
    dst: PathLikeType,
    follow_symlinks: bool = True,
) -> None:
    """Copy the file ``src`` to ``dst`` with ``shutil.copy2``.

    Both paths are converted to ``str`` before the call.
    """
    shutil.copy2(str(src), str(dst), follow_symlinks=follow_symlinks)

copyfile(src, dst, follow_symlinks=True) staticmethod

Copy the contents of a single file to another file. This is a wrapper around shutil.copyfile, which is the appropriate function to use when you want to copy a single file from one location to another; it does not copy directories.

Parameters:

Name Type Description Default
src PathLikeType

Path to the file or directory to copy.

required
dst PathLikeType

Path to the destination file or directory. If the destination file already exists it will be overwritten.

required
follow_symlinks bool

Whether to follow symbolic links or not

True
Source code in hyfi/utils/iolibs.py
@staticmethod
def copyfile(
    src: PathLikeType,
    dst: PathLikeType,
    follow_symlinks: bool = True,
):
    """
    Copy the contents of one file to another path using ``shutil.copyfile``.

    Both paths are converted to ``str`` and the copy is logged afterwards.

    Args:
            src: Path to the file to copy.
            dst: Path to the destination file. If the destination file already exists it will be overwritten.
            follow_symlinks: Whether to follow symbolic links or not
    """
    src, dst = str(src), str(dst)
    shutil.copyfile(src, dst, follow_symlinks=follow_symlinks)
    logger.info(f"copied {src} to {dst}")

exists(a, *p) staticmethod

Check if path exists

Source code in hyfi/utils/iolibs.py
@staticmethod
def exists(a, *p) -> bool:
    """Check if path exists"""
    if a is None:
        return False
    _path = os.path.join(a, *p)
    return os.path.exists(_path)

extractall(path, dest='', force_extract=False) staticmethod

Extract archive file.

Parameters

path: str Path of archive file to be extracted. dest: str, optional Directory to which the archive file will be extracted. If None, it will be set to the parent directory of the archive file.

Source code in hyfi/utils/iolibs.py
@staticmethod
def extractall(
    path: str,
    dest: str = "",
    force_extract: bool = False,
):
    """Extract archive file.

    Parameters
    ----------
    path: str
        Path of archive file to be extracted.
    dest: str, optional
        Directory to which the archive file will be extracted.
        If None, it will be set to the parent directory of the archive file.
    """
    import tarfile
    from zipfile import ZipFile

    if dest is None:
        dest = os.path.dirname(path)

    if path.endswith(".zip"):
        opener, mode = ZipFile, "r"
    elif path.endswith(".tar"):
        opener, mode = tarfile.open, "r"
    elif path.endswith(".tar.gz") or path.endswith(".tgz"):
        opener, mode = tarfile.open, "r:gz"
    elif path.endswith(".tar.bz2") or path.endswith(".tbz"):
        opener, mode = tarfile.open, "r:bz2"
    else:
        logger.warning(
            f"Could not extract '{path}' as no appropriate extractor is found"
        )
        return path, None

    def namelist(f):
        return (
            f.namelist() if isinstance(f, ZipFile) else [m.path for m in f.members]
        )

    def filelist(f):
        files = []
        for fname in namelist(f):
            fname = os.path.join(dest, fname)
            files.append(fname)
        return files

    extraction_name = Path(path).stem
    extraction_path = f"{dest}/{extraction_name}"
    if extraction_path and (
        os.path.isdir(extraction_path)
        and os.listdir(extraction_path)
        and not force_extract
    ):
        files = [
            os.path.join(dirpath, filename)
            for dirpath, _, filenames in os.walk(extraction_path)
            for filename in filenames
        ]

        return dest, files

    with opener(path, mode) as f:  # type: ignore
        f.extractall(path=dest)

    return dest, filelist(f)

get_filepaths(filename_patterns, base_dir=None, recursive=True, use_cached=False, verbose=False, **kwargs) staticmethod

Get a list of filepaths from a list of filename patterns

Source code in hyfi/utils/iolibs.py
@staticmethod
def get_filepaths(
    filename_patterns: Union[List[PathLikeType], PathLikeType],
    base_dir: Optional[Union[str, PosixPath, WindowsPath]] = None,
    recursive: bool = True,
    use_cached: bool = False,
    verbose: bool = False,
    **kwargs,
) -> List[str]:
    """Get a list of filepaths from a list of filename patterns

    Each pattern is handled independently:

    * a pattern starting with "http" is returned as-is unless ``use_cached``
      is True, in which case it is resolved through ``IOLIBs.cached_path``;
    * a pattern that names an existing file under ``base_dir`` is returned
      directly;
    * anything else is expanded with ``IOLIBs.glob_re`` in ``base_dir``
      joined with the pattern's own directory part.

    Args:
        filename_patterns: A single pattern or a list of patterns.
        base_dir: Directory the patterns are relative to. Defaults to "".
        recursive: Forwarded to ``IOLIBs.glob_re``.
        use_cached: Resolve "http" patterns through ``IOLIBs.cached_path``.
        verbose: Log the number of files found.
        **kwargs: Forwarded to ``IOLIBs.cached_path``.

    Returns:
        Paths that are existing files, plus any "http" entries.

    Raises:
        ValueError: If ``filename_patterns`` is None.
    """
    if filename_patterns is None:
        raise ValueError("filename_patterns must be specified")
    if isinstance(filename_patterns, (PosixPath, WindowsPath)):
        filename_patterns = str(filename_patterns)
    if isinstance(filename_patterns, str):
        filename_patterns = [filename_patterns]
    filepaths = []
    base_dir = str(base_dir) if base_dir else ""
    for f_pattern in filename_patterns:
        f_pattern = str(f_pattern)
        is_url = f_pattern.startswith("http")
        if is_url and not use_cached:
            filepaths.append(f_pattern)
            continue

        if is_url:
            filepath = IOLIBs.cached_path(f_pattern, **kwargs)
        else:
            filepath = os.path.join(base_dir, f_pattern)
        if isinstance(filepath, str) and os.path.exists(filepath):
            if Path(filepath).is_file():
                filepaths.append(filepath)
            continue

        # Use a per-pattern search directory. Reassigning ``base_dir`` here
        # would leak this pattern's sub-directory into every later pattern.
        search_dir = base_dir
        pattern_dir = os.path.dirname(f_pattern)
        if pattern_dir != "":
            search_dir = os.path.join(base_dir, pattern_dir)
            f_pattern = os.path.basename(f_pattern)
        filepaths += IOLIBs.glob_re(f_pattern, search_dir, recursive=recursive)
    filepaths = [
        fp for fp in filepaths if Path(fp).is_file() or fp.startswith("http")
    ]
    if verbose:
        logger.info(f"Processing [{len(filepaths)}] files from {filename_patterns}")

    return filepaths

get_files_from_archive(archive_path, filetype='') staticmethod

Get a list of files from an archive

Source code in hyfi/utils/iolibs.py
@staticmethod
def get_files_from_archive(archive_path: str, filetype: str = ""):
    """Get a list of files from an archive"""
    import tarfile
    from zipfile import ZipFile

    if ".tar.gz" in archive_path:
        logger.info(f"::Extracting files from {archive_path} with tar.gz")
        archive_handle = tarfile.open(archive_path, "r:gz")
        files = [
            (file, file.name)
            for file in archive_handle.getmembers()
            if file.isfile()
        ]
        open_func = archive_handle.extractfile
    elif ".tar.bz2" in archive_path:
        logger.info(f"::Extracting files from {archive_path} with tar.bz2")
        archive_handle = tarfile.open(archive_path, "r:bz2")
        files = [
            (file, file.name)
            for file in archive_handle.getmembers()
            if file.isfile()
        ]
        open_func = archive_handle.extractfile
    elif ".zip" in archive_path:
        logger.info(f"::Extracting files from {archive_path} with zip")
        archive_handle = ZipFile(archive_path)
        files = [
            (file, file.encode("cp437").decode("euc-kr"))
            for file in archive_handle.namelist()
        ]
        open_func = archive_handle.open
    else:
        # print(f'::{archive_path} is not archive, use generic method')
        files = [(archive_path, os.path.basename(archive_path))]
        archive_handle = None
        open_func = None
    if filetype:
        files = [file for file in files if filetype in file[1]]

    return files, archive_handle, open_func

get_modified_time(path) staticmethod

Return the modification time of a file

Source code in hyfi/utils/iolibs.py
@staticmethod
def get_modified_time(path):
    """Return the modification time of a file"""
    if not os.path.exists(path):
        return None
    modTimesinceEpoc = os.path.getmtime(path)
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(modTimesinceEpoc))

glob_re(pattern, base_dir, recursive=False) staticmethod

Glob files matching a regular expression

Source code in hyfi/utils/iolibs.py
@staticmethod
def glob_re(
    pattern: str,
    base_dir: str,
    recursive: bool = False,
) -> list:
    """Find files in ``base_dir`` by regular expression or by glob pattern.

    When ``IOLIBs.is_valid_regex(pattern)`` is true, the first two characters
    of ``pattern`` are dropped and the rest is searched for in each file name,
    either directly in ``base_dir`` or, with ``recursive=True``, in every
    directory below it. Otherwise ``pattern`` is joined onto ``base_dir`` and
    expanded with ``glob``.

    Returns:
        A list of matching paths.
    """
    if not IOLIBs.is_valid_regex(pattern):
        return glob(os.path.join(base_dir, pattern), recursive=recursive)

    matcher = re.compile(pattern[2:])  # type: ignore
    if not recursive:
        return [
            os.path.join(base_dir, entry)
            for entry in os.listdir(base_dir)
            if matcher.search(entry)
        ]

    matches = []
    for dirpath, _, filenames in os.walk(base_dir):
        matches.extend(
            os.path.join(dirpath, entry)
            for entry in filenames
            if matcher.search(entry)
        )
    return matches

handle_remove_readonly(func, path, exc) staticmethod

Handle errors when trying to remove read-only files through shutil.rmtree.

This handler makes sure the given file is writable, then re-execute the given removal function.

Parameters:

Name Type Description Default
func Callable

An OS-dependant function used to remove a file.

required
path str

The path to the file to remove.

required
exc Tuple[BaseException, OSError, TracebackType]

A sys.exc_info() object.

required
Source code in hyfi/utils/iolibs.py
@staticmethod
def handle_remove_readonly(
    func: Callable, path: str, exc: Tuple[BaseException, OSError, TracebackType]
) -> None:
    """Handle errors when trying to remove read-only files through `shutil.rmtree`.

    This handler makes sure the given file is writable, then re-execute the given removal function.

    Arguments:
        func: An OS-dependant function used to remove a file.
        path: The path to the file to remove.
        exc: A `sys.exc_info()` object.
    """
    excvalue = exc[1]
    if func in (os.rmdir, os.remove, os.unlink) and excvalue.errno == errno.EACCES:
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
        func(path)
    else:
        raise

is_dir(a, *p) staticmethod

Check if path is a directory

Source code in hyfi/utils/iolibs.py
@staticmethod
def is_dir(a, *p) -> bool:
    """Check if path is a directory"""
    _path = os.path.join(a, *p)
    return Path(_path).is_dir()

is_file(a, *p) staticmethod

Check if path is a file

Source code in hyfi/utils/iolibs.py
@staticmethod
def is_file(a, *p) -> bool:
    """Check if path is a file"""
    _path = os.path.join(a, *p)
    return Path(_path).is_file()

is_valid_regex(expr) staticmethod

Check if a string is a valid regular expression

Source code in hyfi/utils/iolibs.py
@staticmethod
def is_valid_regex(expr: str) -> bool:
    """Check if a string is a valid regular expression"""
    try:
        if expr.startswith("r:"):
            expr = expr[2:]
        else:
            return False
        re.compile(expr)
        return True
    except re.error:
        return False

join_path(a, *p) staticmethod

Join path components intelligently.

Source code in hyfi/utils/iolibs.py
@staticmethod
def join_path(a, *p) -> str:
    """Join path components intelligently."""
    if not p or p[0] is None:
        return a
    p = [str(_p) for _p in p]
    return os.path.join(*p) if a is None else os.path.join(a, *p)

load_jsonl(filename, encoding='utf-8') staticmethod

Load a jsonl file into a list of json objects.

Source code in hyfi/utils/iolibs.py
@staticmethod
def load_jsonl(
    filename: str,
    encoding: str = "utf-8",
) -> List[dict]:
    """Load a jsonl file into a list of json objects."""
    with open(filename, "r", encoding=encoding) as f:
        return [json.loads(line) for line in f]

load_wordlist(filepath, sort=True, lowercase=False, unique=True, remove_tag=False, max_ngram_to_include=None, ngram_delimiter=';', remove_delimiter=False, encoding='utf-8', verbose=True) staticmethod

Load the word list from the file.

Source code in hyfi/utils/iolibs.py
@staticmethod
def load_wordlist(
    filepath: Union[str, PosixPath, WindowsPath, Path],
    sort: bool = True,
    lowercase: bool = False,
    unique: bool = True,
    remove_tag: bool = False,
    max_ngram_to_include: Optional[int] = None,
    ngram_delimiter: str = ";",
    remove_delimiter: bool = False,
    encoding: str = "utf-8",
    verbose: bool = True,
) -> List[str]:
    """Read a word list file and post-process it with ``process_wordlist``.

    Only the first whitespace-separated token of every non-blank line is
    kept. All processing options are forwarded unchanged to
    ``IOLIBs.process_wordlist``.

    Returns:
        The processed word list, or an empty list (after logging a warning)
        when ``filepath`` is not an existing file.
    """
    source = Path(filepath)
    if not source.is_file():
        logger.warning("File not found: %s", source)
        return []

    words = []
    with open(source, encoding=encoding) as stream:
        for raw_line in stream:
            tokens = raw_line.split()
            # Whitespace-only lines produce no tokens and are skipped.
            if tokens:
                words.append(tokens[0])

    if verbose:
        logger.info("Loaded the file: %s, No. of words: %s", source, len(words))

    options = dict(
        sort=sort,
        lowercase=lowercase,
        unique=unique,
        remove_tag=remove_tag,
        max_ngram_to_include=max_ngram_to_include,
        ngram_delimiter=ngram_delimiter,
        remove_delimiter=remove_delimiter,
        verbose=verbose,
    )
    return IOLIBs.process_wordlist(words, **options)

mkdir(_path) staticmethod

Create directory if it does not exist

Source code in hyfi/utils/iolibs.py
@staticmethod
def mkdir(_path: str) -> str:
    """Create directory if it does not exist"""
    if _path is None:
        return ""
    Path(_path).mkdir(parents=True, exist_ok=True)
    return _path

process_wordlist(words, sort=True, lowercase=False, unique=True, remove_tag=False, max_ngram_to_include=None, ngram_delimiter=';', remove_delimiter=False, verbose=True) staticmethod

Preprocess the word list.

Parameters:

Name Type Description Default
words List[str]

List of words.

required
sort bool

Sort the words. Defaults to True.

True
lowercase bool

Convert the words to lowercase. Defaults to False.

False
unique bool

Remove duplicate words. Defaults to True.

True
remove_tag bool

Remove the tag from the words. Defaults to False.

False
max_ngram_to_include Optional[int]

Maximum ngram to include. Defaults to None.

None
ngram_delimiter str

Delimiter for ngram. Defaults to ";".

';'
remove_delimiter bool

Remove the delimiter. Defaults to False.

False
verbose bool

Show the progress. Defaults to True.

True

Returns:

Type Description
List[str]

List[str]: List of words.

Source code in hyfi/utils/iolibs.py
@staticmethod
def process_wordlist(
    words: List[str],
    sort: bool = True,
    lowercase: bool = False,
    unique: bool = True,
    remove_tag: bool = False,
    max_ngram_to_include: Optional[int] = None,
    ngram_delimiter: str = ";",
    remove_delimiter: bool = False,
    verbose: bool = True,
) -> List[str]:
    """Preprocess the word list.

    Args:
        words (List[str]): List of words.
        sort (bool, optional): Sort the words. Defaults to True.
        lowercase (bool, optional): Convert the words to lowercase. Defaults to False.
        unique (bool, optional): Remove duplicate words. Defaults to True.
        remove_tag (bool, optional): Remove the tag from the words. Defaults to False.
        max_ngram_to_include (Optional[int], optional): Maximum ngram to include. Defaults to None.
        ngram_delimiter (str, optional): Delimiter for ngram. Defaults to ";".
        remove_delimiter (bool, optional): Remove the delimiter. Defaults to False.
        verbose (bool, optional): Show the progress. Defaults to True.

    Returns:
        List[str]: List of words.
    """
    if remove_delimiter:
        words = [word.replace(ngram_delimiter, "") for word in words]
    if max_ngram_to_include:
        words = [
            word
            for word in words
            if len(word.split(ngram_delimiter)) <= max_ngram_to_include
        ]

    if remove_tag:
        words = [word.split("/")[0] for word in words]
    words = [
        word.lower() if lowercase else word
        for word in words
        if not word.startswith("#")
    ]
    if unique:
        words = list(set(words))
        if verbose:
            logger.info(
                "Remove duplicate words, No. of words: %s",
                len(words),
            )
    if sort:
        words = sorted(words)
    return words

read(uri, mode='rb', encoding=None, head=None, **kwargs) staticmethod

Read data from a file or url

Source code in hyfi/utils/iolibs.py
@staticmethod
def read(uri, mode="rb", encoding=None, head=None, **kwargs) -> bytes:
    """Read data from a file or url"""
    uri = str(uri)
    if uri.startswith("http"):
        import requests

        if mode == "r" and head is not None and isinstance(head, int):
            r = requests.get(uri, stream=True)
            r.raw.decode_content = True
            return r.raw.read(head)
        return requests.get(uri, **kwargs).content
    # elif uri.startswith("s3://"):
    #     import boto3

    #     s3 = boto3.resource("s3")
    #     bucket, key = uri.replace("s3://", "").split("/", 1)
    #     obj = s3.Object(bucket, key)
    #     return obj.get()["Body"].read()
    else:
        with open(uri, mode=mode, encoding=encoding) as f:
            if mode == "r" and head is not None and isinstance(head, int):
                return f.read(head)
            return f.read()

A custom version of os.readlink/pathlib.Path.readlink.

pathlib.Path.readlink is what we ideally would want to use, but it is only available on python>=3.9. os.readlink doesn't support Path and bytes on Windows for python<3.8

Source code in hyfi/utils/iolibs.py
@staticmethod
def readlink(link: Path) -> Path:
    """A custom version of os.readlink/pathlib.Path.readlink.

    pathlib.Path.readlink is what we ideally would want to use, but it is only available on python>=3.9.
    os.readlink doesn't support Path and bytes on Windows for python<3.8
    """
    if sys.version_info >= (3, 9):
        return link.readlink()
    elif sys.version_info >= (3, 8) or os.name != "nt":
        return Path(os.readlink(link))
    else:
        return Path(os.readlink(str(link)))

remove_duplicates_from_list_of_dicts(data, key) staticmethod

Remove duplicates from a list of dicts based on a key.

Source code in hyfi/utils/iolibs.py
@staticmethod
def remove_duplicates_from_list_of_dicts(
    data: List[dict],
    key: str,
) -> List[dict]:
    """Remove duplicates from a list of dicts based on a key."""
    seen = set()
    new_data = []
    for d in data:
        if d[key] not in seen:
            new_data.append(d)
            seen.add(d[key])
    return new_data

save_jsonl(data, filename, encoding='utf-8') staticmethod

Save a list of json objects to a jsonl file.

Source code in hyfi/utils/iolibs.py
@staticmethod
def save_jsonl(
    data: List[dict],
    filename: str,
    encoding: str = "utf-8",
) -> None:
    """
    Save a list of json objects to a jsonl file.
    """
    with open(filename, "w", encoding=encoding) as f:
        for line in data:
            f.write(json.dumps(line, ensure_ascii=False) + "\n")

save_wordlist(words, filepath, sort=True, encoding='utf-8', verbose=True) staticmethod

Save the word list to the file.

Source code in hyfi/utils/iolibs.py
@staticmethod
def save_wordlist(
    words: List[str],
    filepath: Union[str, PosixPath, WindowsPath, Path],
    sort: bool = True,
    encoding: str = "utf-8",
    verbose: bool = True,
):
    """Save the word list to the file."""
    if sort:
        words = sorted(words)
    if verbose:
        logger.info(
            "Save the list to the file: %s, no. of words: %s", filepath, len(words)
        )
    with open(filepath, "w", encoding=encoding) as fo_:
        for word in words:
            fo_.write(word + "\n")

walk_to_root(path) staticmethod

Yield directories starting from the given directory up to the root

Source code in hyfi/utils/iolibs.py
@staticmethod
def walk_to_root(path: str) -> Iterator[str]:
    """
    Yield directories starting from the given directory up to the root
    """
    if not os.path.exists(path):
        raise IOError("Starting path not found")

    if os.path.isfile(path):
        path = os.path.dirname(path)

    last_dir = None
    current_dir = os.path.abspath(path)
    while last_dir != current_dir:
        yield current_dir
        parent_dir = os.path.abspath(os.path.join(current_dir, os.path.pardir))
        last_dir, current_dir = current_dir, parent_dir

LOGGING

Source code in hyfi/utils/logging.py
class LOGGING:
    """Static helpers for configuring logging and obtaining loggers.

    The configured level is shared between the two helpers through the
    ``HYFI_LOG_LEVEL`` environment variable.
    """

    @staticmethod
    def setLogger(
        level=None,
        force=True,
        filterwarnings_action="ignore",
        **kwargs,
    ) -> None:
        """
        Configure root logging via ``logging.basicConfig``.

        The chosen level name is also stored in ``HYFI_LOG_LEVEL`` and, unless
        ``filterwarnings_action`` is None, a global warnings filter is installed.

        Args:
            level: Level name (any case) or numeric level. Falls back to
                ``HYFI_LOG_LEVEL`` and then to "INFO". Unknown names are
                configured as INFO.
            force: Passed to ``logging.basicConfig`` on Python 3.8+, where
                True removes the root logger's existing handlers first.
            filterwarnings_action: Action passed to ``warnings.filterwarnings``;
                None leaves the warnings filters untouched.
            **kwargs: Extra keyword arguments for ``logging.basicConfig``.
        """
        level = level or os.environ.get("HYFI_LOG_LEVEL") or "INFO"
        if isinstance(level, str):
            level_name = level.upper()
            level_no = getattr(logging, level_name, logging.INFO)
        else:
            # Numeric levels such as logging.DEBUG have no .upper(); map them
            # to their name for the environment variable instead.
            level_no = level
            level_name = logging.getLevelName(level)
        os.environ["HYFI_LOG_LEVEL"] = level_name
        if filterwarnings_action is not None:
            warnings.filterwarnings(filterwarnings_action)  # type: ignore
        # ``force`` is only accepted by basicConfig from Python 3.8 on.
        if sys.version_info >= (3, 8):
            logging.basicConfig(level=level_no, force=force, **kwargs)
        else:
            logging.basicConfig(level=level_no, **kwargs)

    @staticmethod
    def getLogger(
        _name=None,
        _log_level=None,
        **kwargs,
    ) -> logging.Logger:
        """
        Get a logger with a given name and log level.

        Args:
            _name: The name of the logger to get. If not specified the name of this module is used.
            _log_level: Level name (any case) or numeric level. Falls back to
                ``HYFI_LOG_LEVEL`` and then to "INFO".
            **kwargs: Accepted for call compatibility; not used.

        Returns:
            The named logger with its level set.

        Raises:
            ValueError: If the level name is not known to ``logging``.
        """
        _name = _name or __name__
        logger = logging.getLogger(_name)
        _log_level = _log_level or os.environ.get("HYFI_LOG_LEVEL") or "INFO"
        if isinstance(_log_level, str):
            # Logger.setLevel only recognises upper-case level names.
            _log_level = _log_level.upper()
        logger.setLevel(_log_level)
        return logger

getLogger(_name=None, _log_level=None, **kwargs) staticmethod

Get a logger with a given name and log level. It is possible to pass a logger name and log level to this function.

Parameters:

Name Type Description Default
_name

The name of the logger to get. If not specified the name of the module is used.

None
_log_level

The log level to set.

None

Returns:

Type Description
Logger

The logger with the given name and log level set to the value specified in HYFI_LOG_LEVEL

Source code in hyfi/utils/logging.py
@staticmethod
def getLogger(
    _name=None,
    _log_level=None,
    **kwargs,
) -> logging.Logger:
    """
    Get a logger with a given name and log level. It is possible to pass a logger name and log level to this function.

    Args:
        _name: The name of the logger to get. If not specified the name of the module is used.
        _log_level: The log level to set.

    Returns:
        The logger with the given name and log level set to the value specified in HYFI_LOG_LEVEL
    """
    _name = _name or __name__
    logger = logging.getLogger(_name)
    _log_level = _log_level or os.environ.get("HYFI_LOG_LEVEL") or "INFO"
    logger.setLevel(_log_level)
    return logger

setLogger(level=None, force=True, filterwarnings_action='ignore', **kwargs) staticmethod

Set the logging level and format. This is a wrapper around logging.basicConfig that also stores the chosen level in the HYFI_LOG_LEVEL environment variable and installs a warnings filter using filterwarnings_action.

Parameters:

Name Type Description Default
level

The logging level to use

None
force

If True (default), the value is passed to logging.basicConfig (Python 3.8+), which then removes the root logger's existing handlers before applying the new configuration

True
filterwarnings_action

The action passed to warnings.filterwarnings (for example 'ignore'); if None, no warnings filter is installed

'ignore'

Returns:

Type Description
None

Nothing is returned; logging is configured in place.

Source code in hyfi/utils/logging.py
@staticmethod
def setLogger(
    level=None,
    force=True,
    filterwarnings_action="ignore",
    **kwargs,
) -> None:
    """
    Set the logging level and format. This is a wrapper around logging. basicConfig to allow the user to specify a different logging level for each test and to filter warnings that are not caught by the filterwarnings_action

    Args:
        level: The logging level to use
        force: If True ( default ) all warnings will be logged even if there are no warnings
        filterwarnings_action: The action to call when a warning is logged

    Returns:
        The logger that was
    """
    level = level or os.environ.get("HYFI_LOG_LEVEL") or "INFO"
    level = level.upper()
    os.environ["HYFI_LOG_LEVEL"] = level
    # Filter warnings by applying filterwarnings_action to the warnings.
    if filterwarnings_action is not None:
        warnings.filterwarnings(filterwarnings_action)  # type: ignore
    # Return the logging level.
    if isinstance(level, str):
        level = getattr(logging, level.upper(), logging.INFO)
    # Configure logging level level and force logging.
    if sys.version_info >= (3, 8):
        logging.basicConfig(level=level, force=force, **kwargs)
    else:
        logging.basicConfig(level=level, **kwargs)

NBs

Source code in hyfi/utils/notebooks.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
class NBs:
    """
    Static helpers for working inside IPython/Jupyter notebooks and Google Colab.

    Every method is a ``@staticmethod``; the class only acts as a namespace.
    IPython and ipywidgets are imported lazily inside the methods that use them.
    """

    @staticmethod
    def is_notebook():
        """
        Check if the code is running in a notebook.

        Returns:
            False when no ``get_ipython`` name is defined; otherwise True only
            if the active shell's class is named ``ZMQInteractiveShell`` or
            ``Shell``.
        """
        try:
            get_ipython  # type: ignore
        except NameError:
            # No ``get_ipython`` in scope: presumably not running under IPython.
            return False
        # pylint: disable=undefined-variable
        shell_type = get_ipython().__class__.__name__  # type: ignore # noqa
        # NOTE(review): "ZMQInteractiveShell" is presumably the Jupyter kernel
        # shell and "Shell" the Google Colab one — confirm.
        return shell_type in ["ZMQInteractiveShell", "Shell"]

    @staticmethod
    def is_colab():
        """
        Check if the code is running in Google Colab.

        Returns:
            True when the ``google.colab`` module is already present in
            ``sys.modules``. The result is also logged at INFO level.
        """
        is_colab = "google.colab" in sys.modules
        if is_colab:
            logger.info("Google Colab detected.")
        else:
            logger.info("Google Colab not detected.")
        return is_colab

    @staticmethod
    def get_display():
        """
        Get the display object for the current environment.

        Returns:
            A new ``ipywidgets.Output`` when running in a notebook, otherwise
            None. None is also returned (and logged) when ipywidgets cannot
            be imported.
        """
        try:
            from ipywidgets import Output
        except ImportError:
            logger.info("ipywidgets not installed.")
            return None

        return Output() if NBs.is_notebook() else None

    @staticmethod
    def clear_output(wait=False):
        """
        Clear the output of the current notebook.

        Does nothing outside a notebook.

        Args:
            wait: Forwarded to ``IPython.display.clear_output``.
        """
        if NBs.is_notebook():
            # Imported only when needed so this stays a harmless no-op outside
            # notebooks, even when IPython is not installed.
            from IPython import display

            display.clear_output(wait=wait)

    @staticmethod
    def display(
        *objs,
        include=None,
        exclude=None,
        metadata=None,
        transient=None,
        display_id=None,
        **kwargs,
    ):
        """
        Display an object in the current notebook.

        All arguments are forwarded unchanged to ``IPython.display.display``.

        Returns:
            Whatever ``IPython.display.display`` returns, or None outside a
            notebook.
        """
        if not NBs.is_notebook():
            return None
        # Deferred so that calling this outside a notebook never needs IPython.
        from IPython import display

        return display.display(
            *objs,
            include=include,
            exclude=exclude,
            metadata=metadata,
            transient=transient,
            display_id=display_id,
            **kwargs,
        )

    @staticmethod
    def display_image(
        data=None,
        url=None,
        filename=None,
        format=None,
        embed=None,
        width=None,
        height=None,
        retina=False,
        unconfined=False,
        metadata=None,
        **kwargs,
    ):
        """
        Display an image, which can be given as raw data or a URL.

        The arguments are forwarded to ``IPython.display.Image`` and the
        resulting image is shown with ``IPython.display.display``. Outside a
        notebook nothing happens and None is returned.

        Parameters
        ----------
        data : unicode, str or bytes
            The raw image data or a URL or filename to load the data from.
            This always results in embedded image data.
        url : unicode
            A URL to download the data from. If you specify `url=`,
            the image data will not be embedded unless you also specify `embed=True`.
        filename : unicode
            Path to a local file to load the data from.
            Images from a file are always embedded.
        format : unicode
            The format of the image data (png/jpeg/jpg/gif). If a filename or URL is given
            the format will be inferred from the filename extension.
        embed : bool
            Should the image data be embedded using a data URI (True) or be
            loaded using an <img> tag. Set this to True if you want the image
            to be viewable later with no internet connection in the notebook.

            Default is `True`, unless the keyword argument `url` is set, then
            default value is `False`.

            Note that QtConsole is not able to display images if `embed` is set to `False`
        width : int
            Width in pixels to which to constrain the image in html
        height : int
            Height in pixels to which to constrain the image in html
        retina : bool
            Automatically set the width and height to half of the measured
            width and height.
            This only works for embedded images because it reads the width/height
            from image data.
            For non-embedded images, you can just set the desired display width
            and height directly.
        unconfined: bool
            Set unconfined=True to disable max-width confinement of the image.
        metadata: dict
            Specify extra metadata to attach to the image.

        """
        if not NBs.is_notebook():
            return None
        # Deferred so that calling this outside a notebook never needs IPython.
        from IPython import display

        img = display.Image(
            data=data,
            url=url,
            filename=filename,
            format=format,
            embed=embed,
            width=width,
            height=height,
            retina=retina,
            unconfined=unconfined,
            metadata=metadata,
            **kwargs,
        )
        return display.display(img)

    @staticmethod
    def hide_code_in_slideshow():
        """
        Hide code in slideshow.

        Emits a small HTML/JavaScript snippet that looks up the enclosing
        element with class ``cell`` and adds the ``hide-in-slideshow`` class
        to its ``.input`` element. A random id ties the script to its own
        placeholder ``<div>``.
        """
        import binascii

        from IPython import display

        # Random 16-hex-digit id so the script can locate its own <div>.
        uid = binascii.hexlify(os.urandom(8)).decode()
        html = """<div id="%s"></div>
        <script type="text/javascript">
            $(function(){
                var p = $("#%s");
                if (p.length==0) return;
                while (!p.hasClass("cell")) {
                    p=p.parent();
                    if (p.prop("tagName") =="body") return;
                }
                var cell = p;
                cell.find(".input").addClass("hide-in-slideshow")
            });
        </script>""" % (
            uid,
            uid,
        )
        display.display_html(html, raw=True)

    @staticmethod
    def colored_str(s, color="black"):
        """
        Wrap a string in a colored ``<text>`` HTML element.

        Args:
            s: The text to wrap; newlines are replaced by ``<br>``.
            color: The value placed in the element's ``style=color:`` attribute.

        Returns:
            The HTML fragment as a string.
        """
        return "<text style=color:{}>{}</text>".format(color, s.replace("\n", "<br>"))

    @staticmethod
    def cprint(str_tuples):
        """
        Display colored text fragments as HTML.

        Args:
            str_tuples: Iterable of ``(text, color)`` pairs. Each pair is
                rendered with ``colored_str`` and the results are joined
                with single spaces.
        """
        from IPython.core.display import HTML as html_print
        from IPython.display import display

        display(
            html_print(
                " ".join([NBs.colored_str(ti, color=ci) for ti, ci in str_tuples])
            )
        )

    @staticmethod
    def create_dropdown(
        options, value, description, disabled=False, style=None, layout=None, **kwargs
    ):
        """
        Create a dropdown widget.

        Args:
            options, value, description, disabled: Forwarded to ``widgets.Dropdown``.
            style: Widget style; defaults to ``{"description_width": "initial"}``.
            layout: Keyword arguments for ``widgets.Layout``; when None a
                layout with ``width="auto"`` is used.
            **kwargs: Extra keyword arguments forwarded to ``widgets.Dropdown``.

        Returns:
            The ``widgets.Dropdown`` instance.
        """
        import ipywidgets as widgets

        if style is None:
            style = {"description_width": "initial"}

        layout = (
            widgets.Layout(width="auto") if layout is None else widgets.Layout(**layout)
        )
        return widgets.Dropdown(
            options=options,
            value=value,
            description=description,
            disabled=disabled,
            style=style,
            layout=layout,
            **kwargs,
        )

    @staticmethod
    def create_textarea(
        value,
        description,
        placeholder="",
        disabled=False,
        style=None,
        layout=None,
        **kwargs,
    ):
        """
        Create a textarea widget.

        Args:
            value, description, placeholder, disabled: Forwarded to ``widgets.Textarea``.
            style: Widget style; defaults to ``{"description_width": "initial"}``.
            layout: Keyword arguments for ``widgets.Layout``; when None a
                layout with ``width="auto"`` is used.
            **kwargs: Extra keyword arguments forwarded to ``widgets.Textarea``.

        Returns:
            The ``widgets.Textarea`` instance.
        """
        import ipywidgets as widgets

        if style is None:
            style = {"description_width": "initial"}

        layout = (
            widgets.Layout(width="auto") if layout is None else widgets.Layout(**layout)
        )
        return widgets.Textarea(
            value=value,
            placeholder=placeholder,
            description=description,
            disabled=disabled,
            style=style,
            layout=layout,
            **kwargs,
        )

    @staticmethod
    def create_button(
        description, button_style="", icon="check", layout=None, **kwargs
    ):
        """
        Create a button widget.

        Args:
            description, button_style, icon: Forwarded to ``widgets.Button``.
            layout: Keyword arguments for ``widgets.Layout``; when None a
                layout with ``width="auto"`` is used.
            **kwargs: Extra keyword arguments forwarded to ``widgets.Button``.

        Returns:
            The ``widgets.Button`` instance.
        """
        import ipywidgets as widgets

        layout = (
            widgets.Layout(width="auto") if layout is None else widgets.Layout(**layout)
        )
        return widgets.Button(
            description=description,
            button_style=button_style,
            icon=icon,
            layout=layout,
            **kwargs,
        )

    @staticmethod
    def create_radiobutton(
        options,
        description,
        value=None,
        disabled=False,
        style=None,
        layout=None,
        **kwargs,
    ):
        """
        Create a radiobutton widget.

        Args:
            options, description, value, disabled: Forwarded to ``widgets.RadioButtons``.
            style: Widget style; defaults to ``{"description_width": "initial"}``.
            layout: Keyword arguments for ``widgets.Layout``; when None a
                layout with ``width="auto"`` is used.
            **kwargs: Extra keyword arguments forwarded to ``widgets.RadioButtons``.

        Returns:
            The ``widgets.RadioButtons`` instance.
        """
        import ipywidgets as widgets

        if style is None:
            style = {"description_width": "initial"}

        layout = (
            widgets.Layout(width="auto") if layout is None else widgets.Layout(**layout)
        )
        return widgets.RadioButtons(
            options=options,
            value=value,
            description=description,
            disabled=disabled,
            style=style,
            layout=layout,
            **kwargs,
        )

    @staticmethod
    def create_image(
        filename=None,
        format=None,
        width=None,
        height=None,
        **kwargs,
    ):
        """
        Create an image widget.

        Args:
            filename: Source passed to ``IOLIBs.read``. When None, a
                placeholder PNG is read from a fixed URL instead.
            format: Image format; when falsy it is taken from the text after
                the last ``.`` in ``filename``. Ignored for the placeholder,
                which is always ``"png"``.
            width, height: Forwarded to ``widgets.Image``.
            **kwargs: Extra keyword arguments forwarded to ``widgets.Image``.

        Returns:
            The ``widgets.Image`` instance.
        """
        import ipywidgets as widgets

        if filename is None:
            url = "https://assets.entelecheia.cc/img/placeholder.png"
            img = IOLIBs.read(url)
            _format = "png"
        else:
            img = IOLIBs.read(filename)
            _format = format or filename.split(".")[-1]
        return widgets.Image(
            value=img,
            format=_format,
            width=width,
            height=height,
            **kwargs,
        )

    @staticmethod
    def create_floatslider(
        min=0.0,
        max=1.0,
        step=0.1,
        value=None,
        description="",
        disabled=False,
        continuous_update=False,
        orientation="horizontal",
        readout=True,
        readout_format=".1f",
        style=None,
        layout=None,
        **kwargs,
    ):
        """
        Create a float slider widget.

        Args:
            min, max, step, value, description, disabled, continuous_update,
            orientation, readout, readout_format: Forwarded to
                ``widgets.FloatSlider``.
            style: Widget style; defaults to ``{"description_width": "initial"}``.
            layout: Keyword arguments for ``widgets.Layout``; when None a
                layout with ``width="auto"`` is used.
            **kwargs: Extra keyword arguments forwarded to ``widgets.FloatSlider``.

        Returns:
            The ``widgets.FloatSlider`` instance.
        """
        import ipywidgets as widgets

        if style is None:
            style = {"description_width": "initial"}

        layout = (
            widgets.Layout(width="auto") if layout is None else widgets.Layout(**layout)
        )
        return widgets.FloatSlider(
            min=min,
            max=max,
            step=step,
            value=value,
            description=description,
            disabled=disabled,
            continuous_update=continuous_update,
            orientation=orientation,
            readout=readout,
            readout_format=readout_format,
            style=style,
            layout=layout,
            **kwargs,
        )

    @staticmethod
    def load_extentions(exts=None):
        """
        Load IPython extensions into the running shell.

        The method name keeps its historical spelling for backward
        compatibility.

        Args:
            exts: Iterable of extension module names. Defaults to
                ``["autotime"]``.

        Returns:
            None. Nothing is loaded outside a notebook, when IPython cannot
            be imported, or when no shell is active. Extensions whose module
            cannot be found are logged and skipped.
        """
        if exts is None:
            exts = ["autotime"]
        if not NBs.is_notebook():
            return
        with contextlib.suppress(ImportError):
            from IPython.core.getipython import get_ipython

            ip = get_ipython()
            if ip is None:
                return
            manager = getattr(ip, "extension_manager", None)
            loaded = getattr(manager, "loaded", None)
            for ext in exts:
                try:
                    if loaded is None:
                        # No usable extension manager on this shell: fall
                        # back to the ``%load_ext`` magic.
                        ip.magic(f"load_ext {ext}")
                    elif ext not in loaded:
                        manager.load_extension(ext)
                except ModuleNotFoundError:
                    logger.info("Extension %s not found. Install it first.", ext)

    @staticmethod
    def set_matplotlib_formats(*formats, **kwargs):
        """
        Set matplotlib formats.

        Outside a notebook this does nothing; otherwise all arguments are
        forwarded to ``IPython.core.display.set_matplotlib_formats``.
        """
        if NBs.is_notebook():
            from IPython.core.display import set_matplotlib_formats

            set_matplotlib_formats(*formats, **kwargs)

    @staticmethod
    def mount_google_drive(
        project_root: str = "",
        project_name: str = "",
        mountpoint: str = "/content/drive",
        force_remount: bool = False,
        timeout_ms: int = 120000,
    ) -> None:
        """
        Mount Google Drive to Colab.

        Args:
            project_root: When non-empty, exported as ``HYFI_PROJECT_ROOT``.
                A value that starts with neither the path separator nor
                ``..`` is first joined onto ``mountpoint``.
            project_name: When non-empty, exported as ``HYFI_PROJECT_NAME``.
            mountpoint: Directory to mount the drive at.
            force_remount: Forwarded to ``drive.mount``.
            timeout_ms: Forwarded to ``drive.mount``.

        Returns:
            None. When ``google.colab`` cannot be imported a warning is
            logged and nothing else happens.
        """
        try:
            from google.colab import drive  # type: ignore

            drive.mount(mountpoint, force_remount=force_remount, timeout_ms=timeout_ms)

            if project_root:
                if not project_root.startswith(
                    os.path.sep
                ) and not project_root.startswith(".."):
                    project_root = os.path.join(mountpoint, project_root)
                ENVs.set_osenv("HYFI_PROJECT_ROOT", project_root)
                logger.info(f"Setting HYFI_PROJECT_ROOT to {project_root}")
            if project_name:
                ENVs.set_osenv("HYFI_PROJECT_NAME", project_name)
                logger.info(f"Setting HYFI_PROJECT_NAME to {project_name}")
        except ImportError:
            logger.warning("Google Colab not detected.")

clear_output(wait=False) staticmethod

Clear the output of the current notebook.

Source code in hyfi/utils/notebooks.py
@staticmethod
def clear_output(wait=False):
    """
    Clear the output of the current notebook.

    Does nothing outside a notebook.

    Args:
        wait: Forwarded to ``IPython.display.clear_output``.
    """
    if NBs.is_notebook():
        # Imported only when needed so this stays a harmless no-op outside
        # notebooks, even when IPython is not installed.
        from IPython import display

        display.clear_output(wait=wait)

colored_str(s, color='black') staticmethod

Colored string.

Source code in hyfi/utils/notebooks.py
@staticmethod
def colored_str(s, color="black"):
    """Colored string."""
    # return "<text style=color:{}>{}</text>".format(color, s)
    return "<text style=color:{}>{}</text>".format(color, s.replace("\n", "<br>"))

create_button(description, button_style='', icon='check', layout=None, **kwargs) staticmethod

Create a button widget.

Source code in hyfi/utils/notebooks.py
@staticmethod
def create_button(
    description, button_style="", icon="check", layout=None, **kwargs
):
    """
    Build an ipywidgets button.

    Args:
        description, button_style, icon: Forwarded to ``widgets.Button``.
        layout: Keyword arguments for ``widgets.Layout``; when None a layout
            with ``width="auto"`` is used.
        **kwargs: Extra keyword arguments forwarded to ``widgets.Button``.

    Returns:
        The ``widgets.Button`` instance.
    """
    import ipywidgets as widgets

    if layout is None:
        button_layout = widgets.Layout(width="auto")
    else:
        button_layout = widgets.Layout(**layout)

    return widgets.Button(
        description=description,
        button_style=button_style,
        icon=icon,
        layout=button_layout,
        **kwargs,
    )

create_dropdown(options, value, description, disabled=False, style=None, layout=None, **kwargs) staticmethod

Create a dropdown widget.

Source code in hyfi/utils/notebooks.py
@staticmethod
def create_dropdown(
    options, value, description, disabled=False, style=None, layout=None, **kwargs
):
    """
    Build an ipywidgets dropdown.

    Args:
        options, value, description, disabled: Forwarded to ``widgets.Dropdown``.
        style: Widget style; defaults to ``{"description_width": "initial"}``.
        layout: Keyword arguments for ``widgets.Layout``; when None a layout
            with ``width="auto"`` is used.
        **kwargs: Extra keyword arguments forwarded to ``widgets.Dropdown``.

    Returns:
        The ``widgets.Dropdown`` instance.
    """
    import ipywidgets as widgets

    widget_style = {"description_width": "initial"} if style is None else style

    if layout is None:
        widget_layout = widgets.Layout(width="auto")
    else:
        widget_layout = widgets.Layout(**layout)

    return widgets.Dropdown(
        options=options,
        value=value,
        description=description,
        disabled=disabled,
        style=widget_style,
        layout=widget_layout,
        **kwargs,
    )

create_floatslider(min=0.0, max=1.0, step=0.1, value=None, description='', disabled=False, continuous_update=False, orientation='horizontal', readout=True, readout_format='.1f', style=None, layout=None, **kwargs) staticmethod

Create a float slider widget.

Source code in hyfi/utils/notebooks.py
@staticmethod
def create_floatslider(
    min=0.0,
    max=1.0,
    step=0.1,
    value=None,
    description="",
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
    readout_format=".1f",
    style=None,
    layout=None,
    **kwargs,
):
    """
    Build an ipywidgets float slider.

    Args:
        min, max, step, value, description, disabled, continuous_update,
        orientation, readout, readout_format: Forwarded to
            ``widgets.FloatSlider``.
        style: Widget style; defaults to ``{"description_width": "initial"}``.
        layout: Keyword arguments for ``widgets.Layout``; when None a layout
            with ``width="auto"`` is used.
        **kwargs: Extra keyword arguments forwarded to ``widgets.FloatSlider``.

    Returns:
        The ``widgets.FloatSlider`` instance.
    """
    widget_style = {"description_width": "initial"} if style is None else style

    import ipywidgets as widgets

    if layout is None:
        widget_layout = widgets.Layout(width="auto")
    else:
        widget_layout = widgets.Layout(**layout)

    return widgets.FloatSlider(
        min=min,
        max=max,
        step=step,
        value=value,
        description=description,
        disabled=disabled,
        continuous_update=continuous_update,
        orientation=orientation,
        readout=readout,
        readout_format=readout_format,
        style=widget_style,
        layout=widget_layout,
        **kwargs,
    )

create_image(filename=None, format=None, width=None, height=None, **kwargs) staticmethod

Create an image widget.

Source code in hyfi/utils/notebooks.py
@staticmethod
def create_image(
    filename=None,
    format=None,
    width=None,
    height=None,
    **kwargs,
):
    """
    Build an ipywidgets image.

    Args:
        filename: Source passed to ``IOLIBs.read``. When None, a placeholder
            PNG is read from a fixed URL instead.
        format: Image format; when falsy it is taken from the text after the
            last ``.`` in ``filename``. Ignored for the placeholder, which is
            always ``"png"``.
        width, height: Forwarded to ``widgets.Image``.
        **kwargs: Extra keyword arguments forwarded to ``widgets.Image``.

    Returns:
        The ``widgets.Image`` instance.
    """
    import ipywidgets as widgets

    if filename is not None:
        image_bytes = IOLIBs.read(filename)
        image_format = format or filename.split(".")[-1]
    else:
        # No file given: fall back to the hosted placeholder picture.
        placeholder_url = "https://assets.entelecheia.cc/img/placeholder.png"
        image_bytes = IOLIBs.read(placeholder_url)
        image_format = "png"

    return widgets.Image(
        value=image_bytes,
        format=image_format,
        width=width,
        height=height,
        **kwargs,
    )

create_radiobutton(options, description, value=None, disabled=False, style=None, layout=None, **kwargs) staticmethod

Create a radiobutton widget.

Source code in hyfi/utils/notebooks.py
@staticmethod
def create_radiobutton(
    options,
    description,
    value=None,
    disabled=False,
    style=None,
    layout=None,
    **kwargs,
):
    """
    Build an ipywidgets radio-button group.

    Args:
        options, description, value, disabled: Forwarded to ``widgets.RadioButtons``.
        style: Widget style; defaults to ``{"description_width": "initial"}``.
        layout: Keyword arguments for ``widgets.Layout``; when None a layout
            with ``width="auto"`` is used.
        **kwargs: Extra keyword arguments forwarded to ``widgets.RadioButtons``.

    Returns:
        The ``widgets.RadioButtons`` instance.
    """
    import ipywidgets as widgets

    widget_style = {"description_width": "initial"} if style is None else style

    if layout is None:
        widget_layout = widgets.Layout(width="auto")
    else:
        widget_layout = widgets.Layout(**layout)

    return widgets.RadioButtons(
        options=options,
        value=value,
        description=description,
        disabled=disabled,
        style=widget_style,
        layout=widget_layout,
        **kwargs,
    )

create_textarea(value, description, placeholder='', disabled=False, style=None, layout=None, **kwargs) staticmethod

Create a textarea widget.

Source code in hyfi/utils/notebooks.py
@staticmethod
def create_textarea(
    value,
    description,
    placeholder="",
    disabled=False,
    style=None,
    layout=None,
    **kwargs,
):
    """
    Build an ipywidgets textarea.

    Args:
        value, description, placeholder, disabled: Forwarded to ``widgets.Textarea``.
        style: Widget style; defaults to ``{"description_width": "initial"}``.
        layout: Keyword arguments for ``widgets.Layout``; when None a layout
            with ``width="auto"`` is used.
        **kwargs: Extra keyword arguments forwarded to ``widgets.Textarea``.

    Returns:
        The ``widgets.Textarea`` instance.
    """
    import ipywidgets as widgets

    widget_style = {"description_width": "initial"} if style is None else style

    if layout is None:
        widget_layout = widgets.Layout(width="auto")
    else:
        widget_layout = widgets.Layout(**layout)

    return widgets.Textarea(
        value=value,
        placeholder=placeholder,
        description=description,
        disabled=disabled,
        style=widget_style,
        layout=widget_layout,
        **kwargs,
    )

display(*objs, include=None, exclude=None, metadata=None, transient=None, display_id=None, **kwargs) staticmethod

Display an object in the current notebook.

Source code in hyfi/utils/notebooks.py
@staticmethod
def display(
    *objs,
    include=None,
    exclude=None,
    metadata=None,
    transient=None,
    display_id=None,
    **kwargs,
):
    """
    Display an object in the current notebook.

    All arguments are forwarded unchanged to ``IPython.display.display``.

    Returns:
        Whatever ``IPython.display.display`` returns, or None outside a
        notebook.
    """
    # ``objs`` is always a tuple, so only the notebook check is meaningful.
    if not NBs.is_notebook():
        return None
    # Deferred so that calling this outside a notebook never needs IPython.
    from IPython import display

    return display.display(
        *objs,
        include=include,
        exclude=exclude,
        metadata=metadata,
        transient=transient,
        display_id=display_id,
        **kwargs,
    )

display_image(data=None, url=None, filename=None, format=None, embed=None, width=None, height=None, retina=False, unconfined=False, metadata=None, **kwargs) staticmethod

Display an image, which can be given as raw data or a URL.

Parameters

data : unicode, str or bytes The raw image data or a URL or filename to load the data from. This always results in embedded image data. url : unicode A URL to download the data from. If you specify url=, the image data will not be embedded unless you also specify embed=True. filename : unicode Path to a local file to load the data from. Images from a file are always embedded. format : unicode The format of the image data (png/jpeg/jpg/gif). If a filename or URL is given, the format will be inferred from the filename extension. embed : bool Should the image data be embedded using a data URI (True) or be loaded using an `<img>` tag. Set this to True if you want the image to be viewable later with no internet connection in the notebook.

Default is `True`, unless the keyword argument `url` is set, then
default value is `False`.

Note that QtConsole is not able to display images if `embed` is set to `False`

width : int Width in pixels to which to constrain the image in html height : int Height in pixels to which to constrain the image in html retina : bool Automatically set the width and height to half of the measured width and height. This only works for embedded images because it reads the width/height from image data. For non-embedded images, you can just set the desired display width and height directly. unconfined: bool Set unconfined=True to disable max-width confinement of the image. metadata: dict Specify extra metadata to attach to the image.

Source code in hyfi/utils/notebooks.py
@staticmethod
def display_image(
    data=None,
    url=None,
    filename=None,
    format=None,
    embed=None,
    width=None,
    height=None,
    retina=False,
    unconfined=False,
    metadata=None,
    **kwargs,
):
    """
    Display an image, which can be given as raw data or a URL.

    The arguments are forwarded to ``IPython.display.Image`` and the
    resulting image is shown with ``IPython.display.display``. Outside a
    notebook nothing happens and None is returned.

    Parameters
    ----------
    data : unicode, str or bytes
        The raw image data or a URL or filename to load the data from.
        This always results in embedded image data.
    url : unicode
        A URL to download the data from. If you specify `url=`,
        the image data will not be embedded unless you also specify `embed=True`.
    filename : unicode
        Path to a local file to load the data from.
        Images from a file are always embedded.
    format : unicode
        The format of the image data (png/jpeg/jpg/gif). If a filename or URL is given
        the format will be inferred from the filename extension.
    embed : bool
        Should the image data be embedded using a data URI (True) or be
        loaded using an <img> tag. Set this to True if you want the image
        to be viewable later with no internet connection in the notebook.

        Default is `True`, unless the keyword argument `url` is set, then
        default value is `False`.

        Note that QtConsole is not able to display images if `embed` is set to `False`
    width : int
        Width in pixels to which to constrain the image in html
    height : int
        Height in pixels to which to constrain the image in html
    retina : bool
        Automatically set the width and height to half of the measured
        width and height.
        This only works for embedded images because it reads the width/height
        from image data.
        For non-embedded images, you can just set the desired display width
        and height directly.
    unconfined: bool
        Set unconfined=True to disable max-width confinement of the image.
    metadata: dict
        Specify extra metadata to attach to the image.

    """
    if not NBs.is_notebook():
        return None
    # Deferred so that calling this outside a notebook never needs IPython.
    from IPython import display

    img = display.Image(
        data=data,
        url=url,
        filename=filename,
        format=format,
        embed=embed,
        width=width,
        height=height,
        retina=retina,
        unconfined=unconfined,
        metadata=metadata,
        **kwargs,
    )
    return display.display(img)

get_display() staticmethod

Get the display object for the current environment.

Source code in hyfi/utils/notebooks.py
@staticmethod
def get_display():
    """
    Return an output widget for the current environment, if any.

    Returns:
        A new ``ipywidgets.Output`` when running in a notebook, otherwise
        None. None is also returned (and logged at INFO level) when
        ipywidgets cannot be imported.
    """
    try:
        from ipywidgets import Output
    except ImportError:
        logger.info("ipywidgets not installed.")
        return None

    if NBs.is_notebook():
        return Output()
    return None

hide_code_in_slideshow() staticmethod

Hide code in slideshow.

Source code in hyfi/utils/notebooks.py
@staticmethod
def hide_code_in_slideshow():
    """
    Hide the current cell's code in slideshow mode.

    Emits a small HTML/JavaScript snippet that looks up the enclosing
    element with class ``cell`` and adds the ``hide-in-slideshow`` class to
    its ``.input`` element. A random id ties the script to its own
    placeholder ``<div>``.
    """
    from IPython import display

    # 8 random bytes rendered as 16 lowercase hex digits.
    marker_id = os.urandom(8).hex()
    template = """<div id="%s"></div>
    <script type="text/javascript">
        $(function(){
            var p = $("#%s");
            if (p.length==0) return;
            while (!p.hasClass("cell")) {
                p=p.parent();
                if (p.prop("tagName") =="body") return;
            }
            var cell = p;
            cell.find(".input").addClass("hide-in-slideshow")
        });
    </script>"""
    snippet = template % (marker_id, marker_id)
    display.display_html(snippet, raw=True)

is_colab() staticmethod

Check if the code is running in Google Colab.

Source code in hyfi/utils/notebooks.py
@staticmethod
def is_colab():
    """
    Report whether the code is running in Google Colab.

    Returns:
        True when the ``google.colab`` module is already present in
        ``sys.modules``. The result is also logged at INFO level.
    """
    running_in_colab = "google.colab" in sys.modules
    message = (
        "Google Colab detected." if running_in_colab else "Google Colab not detected."
    )
    logger.info(message)
    return running_in_colab

is_notebook() staticmethod

Check if the code is running in a notebook.

Source code in hyfi/utils/notebooks.py
@staticmethod
def is_notebook():
    """Check if the code is running in a notebook."""
    try:
        get_ipython  # type: ignore
    except NameError:
        return False
    # pylint: disable=undefined-variable
    shell_type = get_ipython().__class__.__name__  # type: ignore # noqa
    # logger.info(f"shell type: {shell_type}")
    return shell_type in ["ZMQInteractiveShell", "Shell"]

load_extentions(exts=None) staticmethod

Load extensions.

Source code in hyfi/utils/notebooks.py
@staticmethod
def load_extentions(exts=None):
    """
    Load IPython extensions into the running shell.

    The function name keeps its historical spelling for backward
    compatibility.

    Args:
        exts: Iterable of extension module names. Defaults to
            ``["autotime"]``.

    Returns:
        None. Nothing is loaded outside a notebook, when IPython cannot be
        imported, or when no shell is active. Extensions whose module cannot
        be found are logged and skipped.
    """
    if exts is None:
        exts = ["autotime"]
    if not NBs.is_notebook():
        return
    with contextlib.suppress(ImportError):
        from IPython.core.getipython import get_ipython

        ip = get_ipython()
        if ip is None:
            return
        manager = getattr(ip, "extension_manager", None)
        loaded = getattr(manager, "loaded", None)
        for ext in exts:
            try:
                if loaded is None:
                    # No usable extension manager on this shell: fall back
                    # to the ``%load_ext`` magic.
                    ip.magic(f"load_ext {ext}")
                elif ext not in loaded:
                    manager.load_extension(ext)
            except ModuleNotFoundError:
                logger.info("Extension %s not found. Install it first.", ext)

mount_google_drive(project_root='', project_name='', mountpoint='/content/drive', force_remount=False, timeout_ms=120000) staticmethod

Mount Google Drive to Colab.

Source code in hyfi/utils/notebooks.py
@staticmethod
def mount_google_drive(
    project_root: str = "",
    project_name: str = "",
    mountpoint: str = "/content/drive",
    force_remount: bool = False,
    timeout_ms: int = 120000,
) -> None:
    """
    Mount Google Drive inside a Colab runtime.

    Args:
        project_root: When non-empty, exported as ``HYFI_PROJECT_ROOT``. A
            value that starts with neither the path separator nor ``..`` is
            first joined onto ``mountpoint``.
        project_name: When non-empty, exported as ``HYFI_PROJECT_NAME``.
        mountpoint: Directory to mount the drive at.
        force_remount: Forwarded to ``drive.mount``.
        timeout_ms: Forwarded to ``drive.mount``.

    Returns:
        None. When ``google.colab`` cannot be imported a warning is logged
        and nothing else happens.
    """
    try:
        from google.colab import drive  # type: ignore

        drive.mount(mountpoint, force_remount=force_remount, timeout_ms=timeout_ms)

        if project_root:
            # Absolute and parent-relative paths are used verbatim;
            # anything else is taken to live under the mount point.
            used_verbatim = project_root.startswith((os.path.sep, ".."))
            if not used_verbatim:
                project_root = os.path.join(mountpoint, project_root)
            ENVs.set_osenv("HYFI_PROJECT_ROOT", project_root)
            logger.info(f"Setting HYFI_PROJECT_ROOT to {project_root}")

        if project_name:
            ENVs.set_osenv("HYFI_PROJECT_NAME", project_name)
            logger.info(f"Setting HYFI_PROJECT_NAME to {project_name}")
    except ImportError:
        logger.warning("Google Colab not detected.")

set_matplotlib_formats(*formats, **kwargs) staticmethod

Set matplotlib formats.

Source code in hyfi/utils/notebooks.py
@staticmethod
def set_matplotlib_formats(*formats, **kwargs):
    """
    Select the figure formats used for inline matplotlib output.

    Outside a notebook this does nothing; otherwise all arguments are
    forwarded to ``IPython.core.display.set_matplotlib_formats``.
    """
    if not NBs.is_notebook():
        return

    from IPython.core.display import set_matplotlib_formats as apply_formats

    apply_formats(*formats, **kwargs)

PKGs

Source code in hyfi/utils/packages.py
class PKGs:
    """Static helpers for installing packages, loading modules and inspecting callers."""

    @staticmethod
    def _run_command(cmd: List[str], verbose: bool = False) -> None:
        """Run ``cmd`` and report its captured stdout (printed if verbose, else logged at INFO)."""
        res = subprocess.run(cmd, stdout=subprocess.PIPE).stdout.decode("utf-8")
        if verbose:
            print(res)
        else:
            logger.info(res)

    @staticmethod
    def _build_pip_command(
        name: str,
        upgrade: bool = False,
        prelease: bool = False,
        editable: bool = False,
        quiet: bool = True,
        find_links: str = "",
        requirement: bool = False,
        force_reinstall: bool = False,
        **kwargs,
    ) -> List[str]:
        """Build the ``pip install`` argument list for :meth:`pip` without running it."""
        cmd = ["pip", "install"]
        if upgrade:
            cmd.append("--upgrade")
        if prelease:
            cmd.append("--pre")
        if quiet:
            cmd.append("--quiet")
        if find_links:
            cmd += ["--find-links", find_links]
        if force_reinstall:
            cmd.append("--force-reinstall")
        for key, value in kwargs.items():
            # ``False`` switches the option off; ``True``/``None`` give a bare
            # flag; any other value is passed as the option's argument.
            if value is False:
                continue
            cmd.append("--" + key.replace("_", "-"))
            if value is not True and value is not None:
                cmd.append(str(value))
        # --editable and --requirement consume the next argument, so they must
        # sit directly in front of ``name`` rather than in front of another flag.
        if editable:
            cmd.append("--editable")
        if requirement:
            cmd.append("--requirement")
        cmd.append(name)
        return cmd

    @staticmethod
    def gitclone(
        url: str,
        targetdir: str = "",
        verbose: bool = False,
    ) -> None:
        """
        Clone a git repository from the specified URL.

        Args:
            url (str): The URL of the git repository to clone.
            targetdir (str, optional): The directory to clone the repository into. Defaults to "".
            verbose (bool, optional): Whether to print the output of the git command
                instead of logging it. Defaults to False.
        """
        cmd = ["git", "clone", url]
        if targetdir:
            cmd.append(targetdir)
        PKGs._run_command(cmd, verbose)

    @staticmethod
    def pip(
        name: str,
        upgrade: bool = False,
        prelease: bool = False,
        editable: bool = False,
        quiet: bool = True,
        find_links: str = "",
        requirement: bool = False,
        force_reinstall: bool = False,
        verbose: bool = False,
        **kwargs,
    ) -> None:
        """
        Install a package using pip.

        Args:
            name (str): The name of the package to install (or the path / requirements
                file when ``editable`` / ``requirement`` is set).
            upgrade (bool, optional): Whether to upgrade the package if it is already installed. Defaults to False.
            prelease (bool, optional): Whether to include pre-release versions. Defaults to False.
            editable (bool, optional): Whether to install the package in editable mode. Defaults to False.
            quiet (bool, optional): Whether to suppress output. Defaults to True.
            find_links (str, optional): URL to look for packages at. Defaults to "".
            requirement (bool, optional): Whether to install from the given requirements file. Defaults to False.
            force_reinstall (bool, optional): Whether to force a reinstall of the package. Defaults to False.
            verbose (bool, optional): Whether to print the output of the pip command. Defaults to False.
            **kwargs: Additional pip options. Underscores in the key become dashes.
                ``True``/``None`` emit a bare flag, ``False`` omits it, and any
                other value is passed as the option's argument.

        Returns:
            None
        """
        _cmd = PKGs._build_pip_command(
            name,
            upgrade=upgrade,
            prelease=prelease,
            editable=editable,
            quiet=quiet,
            find_links=find_links,
            requirement=requirement,
            force_reinstall=force_reinstall,
            **kwargs,
        )
        if verbose:
            logger.info(f"Installing: {' '.join(_cmd)}")
        PKGs._run_command(_cmd, verbose)

    @staticmethod
    def pipi(name: str, verbose: bool = False) -> None:
        """Install a package using pip."""
        PKGs._run_command(["pip", "install", name], verbose)

    @staticmethod
    def pipie(name: str, verbose: bool = False) -> None:
        """Install an editable package using pip."""
        # The executable must be pip: ``git install`` is not a git command.
        PKGs._run_command(["pip", "install", "-e", name], verbose)

    @staticmethod
    def apti(name: str, verbose: bool = False) -> None:
        """Install a package using apt."""
        PKGs._run_command(["apt", "install", name], verbose)

    @staticmethod
    def load_module_from_file(name: str, libpath: str, specname: str = "") -> None:
        """
        Load a module from a file and register it in ``sys.modules``.

        Args:
            name (str): Dotted module name; dots are mapped to path separators below ``libpath``.
            libpath (str): Directory the module path is resolved against.
            specname (str, optional): Key to register the module under in
                ``sys.modules``. Defaults to the spec's own name.
        """
        module_path = os.path.join(libpath, name.replace(".", os.path.sep))
        if IOLIBs.is_file(f"{module_path}.py"):
            # Plain module file.
            module_path = f"{module_path}.py"
        elif IOLIBs.is_dir(module_path):
            # Package directory.
            module_path = os.path.join(module_path, "__init__.py")
        else:
            # Fall back to the parent directory's package initialiser.
            module_path = str(Path(module_path).parent / "__init__.py")

        spec = importlib.util.spec_from_file_location(name, module_path)  # type: ignore
        module = importlib.util.module_from_spec(spec)  # type: ignore
        if not specname:
            specname = spec.name
        # Register before executing so the module is visible while its body runs.
        sys.modules[specname] = module
        spec.loader.exec_module(module)

    @staticmethod
    def ensure_import_module(
        name: str,
        libpath: str,
        liburi: str,
        specname: str = "",
        syspath: str = "",
    ) -> None:
        """
        Ensure a module is imported, if not, clone it from a git repo and load it.

        Args:
            name (str): Module name to import or load.
            libpath (str): Local checkout path; cloned from ``liburi`` when missing.
            liburi (str): Git URL used for cloning.
            specname (str, optional): Alternative name to import / register the module under.
            syspath (str, optional): Directory appended to ``sys.path`` and used as
                the load root. Defaults to ``libpath``.
        """
        try:
            if specname:
                importlib.import_module(specname)
            else:
                importlib.import_module(name)
            logger.info(f"{name} imported")
        except ImportError:
            if not os.path.exists(libpath):
                logger.info(f"{libpath} not found, cloning from {liburi}")
                PKGs.gitclone(liburi, libpath)
            if not syspath:
                syspath = libpath
            if syspath not in sys.path:
                sys.path.append(syspath)
            PKGs.load_module_from_file(name, syspath, specname)
            specname = specname or name
            logger.info(f"{name} not imported, loading from {syspath} as {specname}")

    @staticmethod
    def getsource(obj: str) -> str:
        """
        Return the source code of the object.

        Args:
            obj (str): Dotted path ``module.attribute`` of the object.

        Returns:
            str: The source code of the object, or "" if any step fails
                (the error is logged).
        """
        try:
            mod_name, object_name = obj.rsplit(".", 1)
            mod = importlib.import_module(mod_name)
            obj_ = getattr(mod, object_name)
            return inspect.getsource(obj_)
        except Exception as e:
            logger.error(f"Error getting source: {e}")
            return ""

    @staticmethod
    def viewsource(obj: str) -> None:
        """Print the source code of the object."""
        print(PKGs.getsource(obj))

    @staticmethod
    def get_module_name_stack() -> List[str]:
        """
        Get the module name of every frame on the call stack, nearest caller first.

        Frames whose module cannot be determined contribute "". Returns an
        empty list (and logs the error) if the stack cannot be inspected.
        """
        try:
            _stack = inspect.stack()
            # Index 0 is this function's own frame, so start at 1.
            return [
                getattr(inspect.getmodule(_stack[i][0]), "__name__", "")
                for i in range(1, len(_stack))
            ]
        except Exception as e:
            logger.error(f"Error getting module name stack: {e}")
            return []

    @staticmethod
    def get_caller_module_name(caller_stack_depth: int = 2) -> str:
        """
        Get the name of the module that called this function.

        Args:
            caller_stack_depth (int): Index into the list returned by
                :meth:`get_module_name_stack`. When the stack is shorter than
                that, the last (outermost) entry is returned instead.
        """
        _stack = PKGs.get_module_name_stack()
        if len(_stack) < caller_stack_depth + 1:
            logger.info("Returning top level module name (depth %d)", len(_stack) - 1)
            return _stack[-1]
        return _stack[caller_stack_depth]

    @staticmethod
    def get_next_level_caller_package_name() -> Optional[str]:
        """
        Get the name of the package that called this function.

        Returns:
            The first top-level package name on the call stack that differs from
            the one at the top of the stack, or None if there is none (including
            when the stack could not be inspected).
        """
        _stack = PKGs.get_module_name_stack()
        if not _stack:
            # get_module_name_stack() returns [] on failure; nothing to compare.
            return None
        package_name = _stack[0].split(".")[0]
        for name in _stack:
            name = name.split(".")[0]
            if name != package_name:
                return name
        return None

    @staticmethod
    def is_importable(module_name: str) -> bool:
        """Return True if an import spec can be found for ``module_name``."""
        module_spec = importlib.util.find_spec(module_name)  # type: ignore
        return module_spec is not None

    @staticmethod
    def safe_import_module(module_name: str) -> Any:
        """Safely imports a module, returning None (and logging at DEBUG) on ImportError."""
        try:
            return importlib.import_module(module_name)
        except ImportError:
            logger.debug("Failed to import module: %s", module_name)
            return None

apti(name, verbose=False) staticmethod

Install a package using apt.

Source code in hyfi/utils/packages.py
@staticmethod
def apti(name: str, verbose: bool = False) -> None:
    """
    Install a package using apt.

    Args:
        name: Package name passed to ``apt install``.
        verbose: Print the captured stdout when True; otherwise log it at INFO.
    """
    completed = subprocess.run(["apt", "install", name], stdout=subprocess.PIPE)
    output = completed.stdout.decode("utf-8")
    if not verbose:
        logger.info(output)
        return
    print(output)

ensure_import_module(name, libpath, liburi, specname='', syspath='') staticmethod

Ensure a module is imported, if not, clone it from a git repo and load it

Source code in hyfi/utils/packages.py
@staticmethod
def ensure_import_module(
    name: str,
    libpath: str,
    liburi: str,
    specname: str = "",
    syspath: str = "",
) -> None:
    """
    Import a module, falling back to cloning and loading it from a git checkout.

    Args:
        name: Module name to import or load.
        libpath: Local checkout path; cloned from ``liburi`` when it does not exist.
        liburi: Git URL used for cloning.
        specname: Alternative name to import / register the module under.
        syspath: Directory appended to ``sys.path`` and used as the load root;
            falls back to ``libpath`` when empty.
    """
    try:
        # Prefer the alias when one was supplied.
        importlib.import_module(specname or name)
        logger.info(f"{name} imported")
    except ImportError:
        checkout_missing = not os.path.exists(libpath)
        if checkout_missing:
            logger.info(f"{libpath} not found, cloning from {liburi}")
            PKGs.gitclone(liburi, libpath)

        syspath = syspath or libpath
        if syspath not in sys.path:
            sys.path.append(syspath)

        PKGs.load_module_from_file(name, syspath, specname)
        specname = specname or name
        logger.info(f"{name} not imported, loading from {syspath} as {specname}")

get_caller_module_name(caller_stack_depth=2) staticmethod

Get the name of the module that called this function.

Source code in hyfi/utils/packages.py
@staticmethod
def get_caller_module_name(caller_stack_depth: int = 2) -> str:
    """
    Return the module name found at a given depth of the call stack.

    Args:
        caller_stack_depth: Index into the list returned by
            ``PKGs.get_module_name_stack()``. If the stack has fewer entries
            than that, the last entry is returned and the fact is logged.
    """
    module_names = PKGs.get_module_name_stack()
    if caller_stack_depth + 1 <= len(module_names):
        return module_names[caller_stack_depth]

    # Stack is too shallow for the requested depth: use the outermost entry.
    logger.info("Returning top level module name (depth %d)", len(module_names) - 1)
    return module_names[-1]

get_module_name_stack() staticmethod

Get the module names of every frame on the call stack, nearest caller first.

Source code in hyfi/utils/packages.py
@staticmethod
def get_module_name_stack() -> List[str]:
    """Get the name of the module that called this function."""
    try:
        _stack = inspect.stack()
        return [
            getattr(inspect.getmodule(_stack[i][0]), "__name__", "")
            for i in range(1, len(_stack))
        ]
    except Exception as e:
        logger.error(f"Error getting module name stack: {e}")
        return []

get_next_level_caller_package_name() staticmethod

Get the name of the package that called this function.

Source code in hyfi/utils/packages.py
@staticmethod
def get_next_level_caller_package_name() -> Optional[str]:
    """
    Get the name of the package that called this function.

    Returns:
        The first top-level package name on the call stack that differs from
        the one at the top of the stack, or None when there is no such entry
        (including when the stack could not be inspected).
    """
    _stack = PKGs.get_module_name_stack()
    if not _stack:
        # get_module_name_stack() returns [] on failure; indexing it would
        # raise IndexError even though None is a valid result here.
        return None
    package_name = _stack[0].split(".")[0]
    for name in _stack:
        name = name.split(".")[0]
        if name != package_name:
            return name
    return None

getsource(obj) staticmethod

Return the source code of the object.

Parameters:

Name Type Description Default
obj str

The object to get the source code from.

required

Returns:

Name Type Description
str str

The source code of the object.

Source code in hyfi/utils/packages.py
@staticmethod
def getsource(obj: str) -> str:
    """
    Return the source code of the object.

    Args:
        obj (str): The object to get the source code from.

    Returns:
        str: The source code of the object.

    """
    try:
        mod_name, object_name = obj.rsplit(".", 1)
        mod = importlib.import_module(mod_name)
        obj_ = getattr(mod, object_name)
        return inspect.getsource(obj_)
    except Exception as e:
        logger.error(f"Error getting source: {e}")
        return ""

gitclone(url, targetdir='', verbose=False) staticmethod

Clone a git repository from the specified URL.

Parameters:

Name Type Description Default
url str

The URL of the git repository to clone.

required
targetdir str

The directory to clone the repository into. Defaults to "".

''
verbose bool

Whether to print the output of the git command. Defaults to False.

False
Source code in hyfi/utils/packages.py
@staticmethod
def gitclone(
    url: str,
    targetdir: str = "",
    verbose: bool = False,
) -> None:
    """
    Run ``git clone`` for the given repository URL.

    Args:
        url (str): The URL of the git repository to clone.
        targetdir (str, optional): Destination directory; omitted from the
            command when empty. Defaults to "".
        verbose (bool, optional): Print the captured stdout when True,
            otherwise log it at INFO. Defaults to False.
    """
    command = ["git", "clone", url]
    if targetdir:
        command.append(targetdir)

    completed = subprocess.run(command, stdout=subprocess.PIPE)
    output = completed.stdout.decode("utf-8")
    if not verbose:
        logger.info(output)
        return
    print(output)

load_module_from_file(name, libpath, specname='') staticmethod

Load a module from a file

Source code in hyfi/utils/packages.py
@staticmethod
def load_module_from_file(name: str, libpath: str, specname: str = "") -> None:
    """
    Load a module from a file and register it in ``sys.modules``.

    Args:
        name: Dotted module name; dots are mapped to path separators below ``libpath``.
        libpath: Directory the module path is resolved against.
        specname: Key to register the module under; defaults to the spec's name.
    """
    base_path = os.path.join(libpath, name.replace(".", os.path.sep))
    source_file = base_path + ".py"
    if IOLIBs.is_file(source_file):
        # Plain module file.
        target = source_file
    elif IOLIBs.is_dir(base_path):
        # Package directory.
        target = os.path.join(base_path, "__init__.py")
    else:
        # Fall back to the parent directory's package initialiser.
        target = str(Path(base_path).parent / "__init__.py")

    spec = importlib.util.spec_from_file_location(name, target)  # type: ignore
    loaded = importlib.util.module_from_spec(spec)  # type: ignore
    # Register before executing so the module is visible while its body runs.
    sys.modules[specname or spec.name] = loaded
    spec.loader.exec_module(loaded)

pip(name, upgrade=False, prelease=False, editable=False, quiet=True, find_links='', requirement=False, force_reinstall=False, verbose=False, **kwargs) staticmethod

Install a package using pip.

Parameters:

Name Type Description Default
name str

The name of the package to install.

required
upgrade bool

Whether to upgrade the package if it is already installed. Defaults to False.

False
prelease bool

Whether to include pre-release versions. Defaults to False.

False
editable bool

Whether to install the package in editable mode. Defaults to False.

False
quiet bool

Whether to suppress output. Defaults to True.

True
find_links str

URL to look for packages at. Defaults to "".

''
requirement bool

Whether to install from the given requirements file. Defaults to False.

False
force_reinstall bool

Whether to force a reinstall of the package. Defaults to False.

False
verbose bool

Whether to print the output of the pip command. Defaults to False.

False
**kwargs

Additional keyword arguments to pass to pip.

{}

Returns:

Type Description
None

None

Source code in hyfi/utils/packages.py
@staticmethod
def pip(
    name: str,
    upgrade: bool = False,
    prelease: bool = False,
    editable: bool = False,
    quiet: bool = True,
    find_links: str = "",
    requirement: bool = False,
    force_reinstall: bool = False,
    verbose: bool = False,
    **kwargs,
) -> None:
    """
    Install a package using pip.

    Args:
        name (str): The name of the package to install (or the path / requirements
            file when ``editable`` / ``requirement`` is set).
        upgrade (bool, optional): Whether to upgrade the package if it is already installed. Defaults to False.
        prelease (bool, optional): Whether to include pre-release versions. Defaults to False.
        editable (bool, optional): Whether to install the package in editable mode. Defaults to False.
        quiet (bool, optional): Whether to suppress output. Defaults to True.
        find_links (str, optional): URL to look for packages at. Defaults to "".
        requirement (bool, optional): Whether to install from the given requirements file. Defaults to False.
        force_reinstall (bool, optional): Whether to force a reinstall of the package. Defaults to False.
        verbose (bool, optional): Whether to print the output of the pip command. Defaults to False.
        **kwargs: Additional pip options. Underscores in the key become dashes.
            ``True``/``None`` emit a bare flag, ``False`` omits it, and any
            other value is passed as the option's argument.

    Returns:
        None
    """
    _cmd = ["pip", "install"]
    if upgrade:
        _cmd.append("--upgrade")
    if prelease:
        _cmd.append("--pre")
    if quiet:
        _cmd.append("--quiet")
    if find_links:
        _cmd += ["--find-links", find_links]
    if force_reinstall:
        _cmd.append("--force-reinstall")
    for key, value in kwargs.items():
        if value is False:
            continue
        _cmd.append("--" + key.replace("_", "-"))
        if value is not True and value is not None:
            _cmd.append(str(value))
    # --editable and --requirement consume the next argument, so they must
    # sit directly in front of ``name`` rather than in front of another flag.
    if editable:
        _cmd.append("--editable")
    if requirement:
        _cmd.append("--requirement")
    _cmd.append(name)
    if verbose:
        logger.info(f"Installing: {' '.join(_cmd)}")
    res = subprocess.run(_cmd, stdout=subprocess.PIPE).stdout.decode("utf-8")
    if verbose:
        print(res)
    else:
        logger.info(res)

pipi(name, verbose=False) staticmethod

Install a package using pip.

Source code in hyfi/utils/packages.py
@staticmethod
def pipi(name: str, verbose: bool = False) -> None:
    """
    Install a package using pip.

    Args:
        name: Requirement passed to ``pip install``.
        verbose: Print the captured stdout when True; otherwise log it at INFO.
    """
    completed = subprocess.run(["pip", "install", name], stdout=subprocess.PIPE)
    output = completed.stdout.decode("utf-8")
    if not verbose:
        logger.info(output)
        return
    print(output)

pipie(name, verbose=False) staticmethod

Install an editable package using pip.

Source code in hyfi/utils/packages.py
@staticmethod
def pipie(name: str, verbose: bool = False) -> None:
    """
    Install an editable package using pip.

    Args:
        name: Path or URL passed to ``pip install -e``.
        verbose: Print the captured stdout when True; otherwise log it at INFO.
    """
    # The executable must be pip: ``git install`` is not a git command.
    res = subprocess.run(
        ["pip", "install", "-e", name], stdout=subprocess.PIPE
    ).stdout.decode("utf-8")
    if verbose:
        print(res)
    else:
        logger.info(res)

safe_import_module(module_name) staticmethod

Safely imports a module.

Source code in hyfi/utils/packages.py
@staticmethod
def safe_import_module(module_name: str) -> Any:
    """Safely imports a module."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        logger.debug("Failed to import module: %s", module_name)
        return None

viewsource(obj) staticmethod

Print the source code of the object.

Source code in hyfi/utils/packages.py
@staticmethod
def viewsource(obj: str) -> None:
    """Print the source code that ``PKGs.getsource`` returns for ``obj``."""
    source_text = PKGs.getsource(obj)
    print(source_text)

SAFEEVAL

Source code in hyfi/utils/safeeval.py
class SAFEEVAL:
    """Namespace exposing a one-shot expression evaluation helper."""

    @staticmethod
    def safe_eval(
        expr: str,
        operators=None,
        functions: Optional[Dict[str, Callable]] = None,
        names: Optional[Dict[str, Any]] = None,
    ):
        """
        Simply evaluate an expression.

        A new ``SafeEval`` is built from ``operators``, ``functions`` and
        ``names`` for every call, and the result of its ``eval(expr)`` is returned.
        """
        evaluator = SafeEval(operators=operators, functions=functions, names=names)
        outcome = evaluator.eval(expr)
        return outcome

safe_eval(expr, operators=None, functions=None, names=None) staticmethod

Simply evaluate an expression

Source code in hyfi/utils/safeeval.py
@staticmethod
def safe_eval(
    expr: str,
    operators=None,
    functions: Optional[Dict[str, Callable]] = None,
    names: Optional[Dict[str, Any]] = None,
):
    """
    Simply evaluate an expression.

    A new ``SafeEval`` is built from ``operators``, ``functions`` and
    ``names`` for every call, and the result of its ``eval(expr)`` is returned.
    """
    evaluator = SafeEval(operators=operators, functions=functions, names=names)
    outcome = evaluator.eval(expr)
    return outcome

SafeEval

Bases: object

A very simple expression parser.

>>> s = SafeEval()
>>> s.eval("20 + 30 - ( 10 * 5)")
0

Source code in hyfi/utils/safeeval.py
class SafeEval(object):  # pylint: disable=too-few-public-methods
    """A very simple expression parser.

    Expressions are parsed with ``ast.parse`` and evaluated by dispatching
    each node to the handler registered for its type in ``self.nodes``; node
    types without a handler raise ``FeatureNotAvailable``.

    >>> s = SafeEval()
    >>> s.eval("20 + 30 - ( 10 * 5)")
    0
    """

    # Text of the expression currently being evaluated (used in error messages).
    expr = ""

    def __init__(
        self,
        operators: Optional[Dict[Any, Callable]] = None,
        functions: Optional[Dict[str, Callable]] = None,
        names: Optional[Dict[str, Any]] = None,
    ):
        """
        Create the evaluator instance.  Set up valid operators (+,-, etc)
        functions (add, random, get_val, whatever) and names.

        Args:
            operators: Mapping from ``ast`` operator class to callable.
                Defaults to a copy of ``DEFAULT_OPERATORS``.
            functions: Mapping from function name to callable. Defaults to a
                copy of ``DEFAULT_FUNCTIONS``.
            names: Mapping (or callable) resolving variable names. Defaults to
                a copy of ``DEFAULT_NAMES``.

        Raises:
            FeatureNotAvailable: If ``functions`` contains a callable listed
                in ``DISALLOW_FUNCTIONS``.
        """

        if operators is None:
            operators = DEFAULT_OPERATORS.copy()
        if functions is None:
            functions = DEFAULT_FUNCTIONS.copy()
        if names is None:
            names = DEFAULT_NAMES.copy()

        self.operators = operators
        self.functions = functions
        self.names = names

        self.nodes = self._build_node_table()

        # Defaults:

        self.ATTR_INDEX_FALLBACK = ATTR_INDEX_FALLBACK

        # Check for forbidden functions:

        for f in self.functions.values():
            if f in DISALLOW_FUNCTIONS:
                raise FeatureNotAvailable(f"This function {f} is a really bad idea.")

    def _build_node_table(self):
        """Return the mapping from ``ast`` node class to the bound handler method."""
        table = {
            ast.Expr: self._eval_expr,
            ast.Assign: self._eval_assign,
            ast.AugAssign: self._eval_aug_assign,
            ast.Import: self._eval_import,
            ast.Name: self._eval_name,
            ast.UnaryOp: self._eval_unaryop,
            ast.BinOp: self._eval_binop,
            ast.BoolOp: self._eval_boolop,
            ast.Compare: self._eval_compare,
            ast.IfExp: self._eval_ifexp,
            ast.Call: self._eval_call,
            ast.keyword: self._eval_keyword,
            ast.Subscript: self._eval_subscript,
            ast.Attribute: self._eval_attribute,
            ast.Slice: self._eval_slice,
        }

        # Legacy node classes. ast.Num / ast.Str / ast.NameConstant are
        # deprecated aliases of ast.Constant that warn on access in newer
        # Pythons and are removed in 3.14, so they are looked up in the module
        # dict (no AttributeError, no DeprecationWarning) and registered only
        # where they still exist as real classes.
        legacy_handlers = (
            ("Num", self._eval_num),
            ("Str", self._eval_str),
            ("Index", self._eval_index),
            ("NameConstant", self._eval_constant),
        )
        for class_name, handler in legacy_handlers:
            node_class = vars(ast).get(class_name)
            if node_class is not None:
                table[node_class] = handler

        # py3.6, f-strings
        if hasattr(ast, "JoinedStr"):
            table[ast.JoinedStr] = self._eval_joinedstr  # f-string
            table[ast.FormattedValue] = (
                self._eval_formattedvalue
            )  # formatted value in f-string

        # py3.8 uses ast.Constant instead of ast.Num, ast.Str, ast.NameConstant
        if hasattr(ast, "Constant"):
            table[ast.Constant] = self._eval_constant

        return table

    def __del__(self):
        # Drop the handler table, which holds bound methods referring back to self.
        self.nodes = None

    @staticmethod
    def parse(expr):
        """Parse an expression into a node tree.

        Returns the first top-level statement of ``expr``. Raises
        ``InvalidExpression`` for empty input and warns with
        ``MultipleExpressions`` when more than one statement is present.
        """

        parsed = ast.parse(expr.strip())

        if not parsed.body:
            raise InvalidExpression("Sorry, cannot evaluate empty string")
        if len(parsed.body) > 1:
            warnings.warn(
                f"'{expr}' contains multiple expressions. Only the first will be used.",
                MultipleExpressions,
            )
        return parsed.body[0]

    def eval(self, expr, previously_parsed=None):
        """Evaluate an expression, using the operators, functions and
        names previously set up.

        Args:
            expr: Expression text; also kept on ``self.expr`` for error messages.
            previously_parsed: Optional already-parsed node to evaluate
                instead of parsing ``expr`` again.
        """

        # set a copy of the expression aside, so we can give nice errors...
        self.expr = expr

        return self._eval(previously_parsed or self.parse(expr))

    def _eval(self, node):
        """The internal evaluator used on each node in the parsed tree."""

        try:
            handler = self.nodes[type(node)]
        except KeyError as e:
            raise FeatureNotAvailable(
                "Sorry, {0} is not available in this "
                "evaluator".format(type(node).__name__)
            ) from e

        return handler(node)

    def _eval_expr(self, node):
        """Evaluate the value wrapped by an expression statement."""
        return self._eval(node.value)

    def _eval_assign(self, node):
        """Warn that assignment is ignored, then evaluate the right-hand side."""
        warnings.warn(
            f"Assignment ({self.expr}) attempted, but this is ignored",
            AssignmentAttempted,
        )
        return self._eval(node.value)

    def _eval_aug_assign(self, node):
        """Warn that augmented assignment is ignored, then evaluate the right-hand side."""
        warnings.warn(
            f"Assignment ({self.expr}) attempted, but this is ignored",
            AssignmentAttempted,
        )
        return self._eval(node.value)

    @staticmethod
    def _eval_import(node):
        """Reject ``import`` statements."""
        raise FeatureNotAvailable("Sorry, 'import' is not allowed.")

    @staticmethod
    def _eval_num(node):
        """Return the value of a legacy numeric literal node."""
        return node.n

    @staticmethod
    def _eval_str(node):
        """Return the value of a legacy string literal node, enforcing the length limit."""
        if len(node.s) > MAX_STRING_LENGTH:
            raise IterableTooLong(
                "String Literal in statement is too long!"
                " ({0}, when {1} is max)".format(len(node.s), MAX_STRING_LENGTH)
            )
        return node.s

    @staticmethod
    def _eval_constant(node):
        """Return a constant's value, enforcing the length limit on sized values."""
        if hasattr(node.value, "__len__") and len(node.value) > MAX_STRING_LENGTH:
            raise IterableTooLong(
                "Literal in statement is too long!"
                " ({0}, when {1} is max)".format(len(node.value), MAX_STRING_LENGTH)
            )
        return node.value

    def _eval_unaryop(self, node):
        """Apply the registered unary operator to the evaluated operand."""
        try:
            operator = self.operators[type(node.op)]
        except KeyError as e:
            raise OperatorNotDefined(node.op, self.expr) from e
        return operator(self._eval(node.operand))

    def _eval_binop(self, node):
        """Apply the registered binary operator to the evaluated operands."""
        try:
            operator = self.operators[type(node.op)]
        except KeyError as e:
            raise OperatorNotDefined(node.op, self.expr) from e
        return operator(self._eval(node.left), self._eval(node.right))

    def _eval_boolop(self, node):
        """Evaluate ``and`` / ``or`` with short-circuiting, returning the deciding operand."""
        to_return = False
        for value in node.values:
            if isinstance(node.op, ast.And):
                to_return = self._eval(value)
                if not to_return:
                    break
            elif isinstance(node.op, ast.Or):
                to_return = self._eval(value)
                if to_return:
                    break
        return to_return

    def _eval_compare(self, node):
        """Evaluate a (possibly chained) comparison, stopping at the first falsy link."""
        right = self._eval(node.left)
        to_return = True
        for operation, comp in zip(node.ops, node.comparators):
            if not to_return:
                break
            # The right operand of one link is the left operand of the next.
            left = right
            right = self._eval(comp)
            to_return = self.operators[type(operation)](left, right)
        return to_return

    def _eval_ifexp(self, node):
        """Evaluate a conditional expression; only the selected branch is evaluated."""
        return (
            self._eval(node.body) if self._eval(node.test) else self._eval(node.orelse)
        )

    def _eval_call(self, node):
        """Call a registered function (by name) or an attribute of an evaluated object."""
        if isinstance(node.func, ast.Attribute):
            func = self._eval(node.func)
        else:
            try:
                func = self.functions[node.func.id]
            except KeyError as e:
                raise FunctionNotDefined(node.func.id, self.expr) from e
            except AttributeError as e:
                # node.func has no ``id``: it is not a plain name.
                raise FeatureNotAvailable("Lambda Functions not implemented") from e

            if func in DISALLOW_FUNCTIONS:
                raise FeatureNotAvailable("This function is forbidden")

        return func(
            *(self._eval(a) for a in node.args),
            **dict(self._eval(k) for k in node.keywords),
        )

    def _eval_keyword(self, node):
        """Return a ``(name, value)`` pair for a keyword argument."""
        return node.arg, self._eval(node.value)

    def _eval_name(self, node):
        """Resolve a name via ``self.names``, falling back to ``self.functions``."""
        try:
            # This happens at least for slicing
            # This is a safe thing to do because it is impossible
            # that there is a true expression assigning to none
            # (the compiler rejects it, so you can't even
            # pass that to ast.parse)
            if hasattr(self.names, "__getitem__"):
                return self.names[node.id]
            if callable(self.names):
                return self.names(node)
            raise InvalidExpression(
                'Trying to use name (variable) "{0}"'
                ' when no "names" defined for'
                " evaluator".format(node.id)
            )

        except KeyError as e:
            if node.id in self.functions:
                return self.functions[node.id]

            raise NameNotDefined(node.id, self.expr) from e

    def _eval_subscript(self, node):
        """Evaluate ``container[key]``, mapping ``KeyError`` to ``InvalidExpression``."""
        container = self._eval(node.value)
        key = self._eval(node.slice)
        try:
            return container[key]
        except KeyError as e:
            raise InvalidExpression(f"Invalid key: {key}") from e

    def _eval_attribute(self, node):
        """Evaluate attribute access, rejecting disallowed prefixes and methods."""
        attr = node.attr
        if any(attr.startswith(prefix) for prefix in DISALLOW_PREFIXES):
            raise FeatureNotAvailable(
                f"Sorry, access to __attributes or func_ attributes is not available. ({attr})"
            )
        if attr in DISALLOW_METHODS:
            raise FeatureNotAvailable(f"Sorry, this method is not available. ({attr})")

        node_evaluated = self._eval(node.value)

        try:
            return getattr(node_evaluated, attr)
        except (AttributeError, TypeError) as e:
            # Optionally treat ``obj.key`` as ``obj["key"]`` before giving up.
            if self.ATTR_INDEX_FALLBACK:
                with contextlib.suppress(KeyError, TypeError):
                    return node_evaluated[attr]
            raise AttributeDoesNotExist(attr, self.expr) from e

    def _eval_index(self, node):
        """Evaluate the value wrapped by a legacy ``ast.Index`` node."""
        return self._eval(node.value)

    def _eval_slice(self, node):
        """Build a ``slice`` from the evaluated lower / upper / step parts."""
        lower = upper = step = None
        if node.lower is not None:
            lower = self._eval(node.lower)
        if node.upper is not None:
            upper = self._eval(node.upper)
        if node.step is not None:
            step = self._eval(node.step)
        return slice(lower, upper, step)

    def _eval_joinedstr(self, node):
        """Evaluate an f-string, enforcing the length limit on the combined result."""
        length = 0
        evaluated_values = []
        for n in node.values:
            val = str(self._eval(n))
            # Track the running total so the limit applies to the whole
            # string, not just to each individual piece.
            length += len(val)
            if length > MAX_STRING_LENGTH:
                raise IterableTooLong("Sorry, I will not evaluate something this long.")
            evaluated_values.append(val)
        return "".join(evaluated_values)

    def _eval_formattedvalue(self, node):
        """Evaluate one ``{...}`` field of an f-string, applying its format spec if any."""
        if node.format_spec:
            fmt = "{:" + self._eval(node.format_spec) + "}"
            return fmt.format(self._eval(node.value))
        return self._eval(node.value)

__init__(operators=None, functions=None, names=None)

Create the evaluator instance. Set up valid operators (+,-, etc) functions (add, random, get_val, whatever) and names.

Source code in hyfi/utils/safeeval.py
def __init__(
    self,
    operators: Optional[Dict[Any, Callable]] = None,
    functions: Optional[Dict[str, Callable]] = None,
    names: Optional[Dict[str, Any]] = None,
):
    """
    Create the evaluator instance.  Set up valid operators (+,-, etc)
    functions (add, random, get_val, whatever) and names.

    Args:
        operators: Mapping from ``ast`` operator class to callable. Defaults
            to a copy of ``DEFAULT_OPERATORS``.
        functions: Mapping from function name to callable. Defaults to a copy
            of ``DEFAULT_FUNCTIONS``.
        names: Mapping (or callable) resolving variable names. Defaults to a
            copy of ``DEFAULT_NAMES``.

    Raises:
        FeatureNotAvailable: If ``functions`` contains a callable listed in
            ``DISALLOW_FUNCTIONS``.
    """

    if operators is None:
        operators = DEFAULT_OPERATORS.copy()
    if functions is None:
        functions = DEFAULT_FUNCTIONS.copy()
    if names is None:
        names = DEFAULT_NAMES.copy()

    self.operators = operators
    self.functions = functions
    self.names = names

    self.nodes = {
        ast.Expr: self._eval_expr,
        ast.Assign: self._eval_assign,
        ast.AugAssign: self._eval_aug_assign,
        ast.Import: self._eval_import,
        ast.Name: self._eval_name,
        ast.UnaryOp: self._eval_unaryop,
        ast.BinOp: self._eval_binop,
        ast.BoolOp: self._eval_boolop,
        ast.Compare: self._eval_compare,
        ast.IfExp: self._eval_ifexp,
        ast.Call: self._eval_call,
        ast.keyword: self._eval_keyword,
        ast.Subscript: self._eval_subscript,
        ast.Attribute: self._eval_attribute,
        ast.Slice: self._eval_slice,
    }

    # Legacy node classes. ast.Num / ast.Str / ast.NameConstant are deprecated
    # aliases of ast.Constant that warn on access in newer Pythons and are
    # removed in 3.14, so they are looked up in the module dict (no
    # AttributeError, no DeprecationWarning) and registered only where they
    # still exist as real classes.
    legacy_handlers = (
        ("Num", self._eval_num),
        ("Str", self._eval_str),
        ("Index", self._eval_index),
        ("NameConstant", self._eval_constant),
    )
    for class_name, handler in legacy_handlers:
        node_class = vars(ast).get(class_name)
        if node_class is not None:
            self.nodes[node_class] = handler

    # py3.6, f-strings
    if hasattr(ast, "JoinedStr"):
        self.nodes[ast.JoinedStr] = self._eval_joinedstr  # f-string
        self.nodes[ast.FormattedValue] = (
            self._eval_formattedvalue
        )  # formatted value in f-string

    # py3.8 uses ast.Constant instead of ast.Num, ast.Str, ast.NameConstant
    if hasattr(ast, "Constant"):
        self.nodes[ast.Constant] = self._eval_constant

    # Defaults:

    self.ATTR_INDEX_FALLBACK = ATTR_INDEX_FALLBACK

    # Check for forbidden functions:

    for f in self.functions.values():
        if f in DISALLOW_FUNCTIONS:
            raise FeatureNotAvailable(f"This function {f} is a really bad idea.")

eval(expr, previously_parsed=None)

Evaluate an expression, using the operators, functions and names previously set up.

Source code in hyfi/utils/safeeval.py
def eval(self, expr, previously_parsed=None):
    """Evaluate an expression with the configured operators, functions and names.

    Args:
        expr: Expression text; stored on ``self.expr`` for error messages.
        previously_parsed: Optional already-parsed node; when truthy it is
            evaluated instead of parsing ``expr``.
    """

    # Remember the raw text so later errors can quote it.
    self.expr = expr

    tree = previously_parsed or self.parse(expr)
    return self._eval(tree)

parse(expr) staticmethod

parse an expression into a node tree

Source code in hyfi/utils/safeeval.py
@staticmethod
def parse(expr):
    """parse an expression into a node tree"""

    parsed = ast.parse(expr.strip())

    if not parsed.body:
        raise InvalidExpression("Sorry, cannot evaluate empty string")
    if len(parsed.body) > 1:
        warnings.warn(
            f"'{expr}' contains multiple expressions. Only the first will be used.",
            MultipleExpressions,
        )
    return parsed.body[0]