Module robofish.io.utils

Expand source code
import robofish.io
import numpy as np
from typing import Union, Iterable, Tuple
from pathlib import Path
from tqdm import tqdm


def np_array(*arrays):
    result = tuple(np.array(a) if a is not None else None for a in arrays)
    if len(result) == 1:
        result = result[0]
    return result


def full_path(current_file, path):
    return (Path(current_file).parent / path).resolve()


def limit_angle_range(angle: Union[float, Iterable], _range=(-np.pi, np.pi)):
    """Limit the range of an angle or array of angles between min and max

    Any given angle in rad will be moved to the given range.
    e.g. with min = -pi and max pi, 4.5*pi will be moved to be 0.5*pi.
    Args:
        angle: An angle or an array of angles (1D)
        _range: optional tuple of range (min, max)
    Returns:
        limited angle(s) in the same form as the input (float or ndarray)
    """
    assert np.isclose(_range[1] - _range[0], 2 * np.pi)

    def limit_simple(a):
        return (a - _range[0]) % (2 * np.pi) + _range[0]

    if isinstance(angle, Iterable):
        nan = np.isnan(angle)
        angle[~nan] = limit_simple(angle[~nan])
    else:
        angle = limit_simple(angle)
    return angle


def get_all_files_from_paths(paths: Iterable[Union[str, Path]], max_files=None):
    # Find all files with correct ending
    files = []
    for path in [Path(p) for p in paths]:
        if path.is_dir():
            files_path = []
            for ext in ("hdf", "hdf5", "h5", "he5"):
                files_path += list(path.rglob(f"*.{ext}"))
            files.append(files_path[:max_files])
        else:
            files.append([path])
    return files


def get_all_poses_from_paths(
    paths: Iterable[Union[str, Path]], predicate=None, max_files=None
) -> Tuple[Iterable[Iterable[np.ndarray]], float]:
    """Read all poses from given paths.

    The function shall be used by the evaluation functions.

    Args:
        paths(Iterable[Union[str, Path]]): An array of strings, with files or folders. The files are checked to have the same frequency.
    Returns:
        Tuple[Iterable[Iterable[np.ndarray]], float]: An array, containing poses with the shape [paths][files][entities, timesteps, 4], the common frequency of the files
    """
    return get_all_data_from_paths(paths, "poses_4d", predicate, max_files=max_files)


def get_all_data_from_paths(
    paths: Iterable[Union[str, Path]],
    request_type="poses_4d",
    predicate=None,
    max_files=None,
):
    expected_settings = None
    all_data = []

    files_per_path = get_all_files_from_paths(paths, max_files)

    pbar = tqdm(
        total=sum([len(files_in_path) for files_in_path in files_per_path]),
        desc=f"Loading {request_type} from files.",
    )

    # for each given path
    for files_in_path in files_per_path:
        data_from_files = []

        # Open all files, gather the poses and check if all of them have the same world size and frequency
        for i_path, file_path in enumerate(files_in_path):
            with robofish.io.File(file_path, "r") as file:
                pbar.update(1)
                pbar.refresh()
                file_settings = {
                    "world_size_cm_x": file.attrs["world_size_cm"][0],
                    "world_size_cm_y": file.attrs["world_size_cm"][1],
                    "frequency_hz": file.frequency,
                }
                if expected_settings is None:
                    expected_settings = file_settings

                assert file_settings == expected_settings

                properties = {
                    "poses_4d": robofish.io.Entity.poses,
                    "speeds_turns": robofish.io.Entity.actions_speeds_turns,
                }

                pred = None if predicate is None else predicate[i_path]
                data = file.select_entity_property(
                    pred, entity_property=properties[request_type]
                )

                # Exclude timesteps where there is any nan in the row
                data = data[:, ~np.isnan(data).any(axis=2).any(axis=0)]

                data_from_files.append(data)

        all_data.append(data_from_files)
    pbar.close()
    return all_data, expected_settings

Functions

def full_path(current_file, path)
Expand source code
def full_path(current_file, path):
    return (Path(current_file).parent / path).resolve()
def get_all_data_from_paths(paths: Iterable[Union[str, pathlib.Path]], request_type='poses_4d', predicate=None, max_files=None)
Expand source code
def get_all_data_from_paths(
    paths: Iterable[Union[str, Path]],
    request_type="poses_4d",
    predicate=None,
    max_files=None,
):
    expected_settings = None
    all_data = []

    files_per_path = get_all_files_from_paths(paths, max_files)

    pbar = tqdm(
        total=sum([len(files_in_path) for files_in_path in files_per_path]),
        desc=f"Loading {request_type} from files.",
    )

    # for each given path
    for files_in_path in files_per_path:
        data_from_files = []

        # Open all files, gather the poses and check if all of them have the same world size and frequency
        for i_path, file_path in enumerate(files_in_path):
            with robofish.io.File(file_path, "r") as file:
                pbar.update(1)
                pbar.refresh()
                file_settings = {
                    "world_size_cm_x": file.attrs["world_size_cm"][0],
                    "world_size_cm_y": file.attrs["world_size_cm"][1],
                    "frequency_hz": file.frequency,
                }
                if expected_settings is None:
                    expected_settings = file_settings

                assert file_settings == expected_settings

                properties = {
                    "poses_4d": robofish.io.Entity.poses,
                    "speeds_turns": robofish.io.Entity.actions_speeds_turns,
                }

                pred = None if predicate is None else predicate[i_path]
                data = file.select_entity_property(
                    pred, entity_property=properties[request_type]
                )

                # Exclude timesteps where there is any nan in the row
                data = data[:, ~np.isnan(data).any(axis=2).any(axis=0)]

                data_from_files.append(data)

        all_data.append(data_from_files)
    pbar.close()
    return all_data, expected_settings
def get_all_files_from_paths(paths: Iterable[Union[str, pathlib.Path]], max_files=None)
Expand source code
def get_all_files_from_paths(paths: Iterable[Union[str, Path]], max_files=None):
    # Find all files with correct ending
    files = []
    for path in [Path(p) for p in paths]:
        if path.is_dir():
            files_path = []
            for ext in ("hdf", "hdf5", "h5", "he5"):
                files_path += list(path.rglob(f"*.{ext}"))
            files.append(files_path[:max_files])
        else:
            files.append([path])
    return files
def get_all_poses_from_paths(paths: Iterable[Union[str, pathlib.Path]], predicate=None, max_files=None) ‑> Tuple[Iterable[Iterable[numpy.ndarray]], float]

Read all poses from given paths.

The function shall be used by the evaluation functions.

Args

paths(Iterable[Union[str, Path]]): An array of strings, with files or folders. The files are checked to have the same frequency.

Returns

Tuple[Iterable[Iterable[np.ndarray]], float]
An array, containing poses with the shape [paths][files][entities, timesteps, 4], the common frequency of the files
Expand source code
def get_all_poses_from_paths(
    paths: Iterable[Union[str, Path]], predicate=None, max_files=None
) -> Tuple[Iterable[Iterable[np.ndarray]], float]:
    """Read all poses from given paths.

    The function shall be used by the evaluation functions.

    Args:
        paths(Iterable[Union[str, Path]]): An array of strings, with files or folders. The files are checked to have the same frequency.
    Returns:
        Tuple[Iterable[Iterable[np.ndarray]], float]: An array, containing poses with the shape [paths][files][entities, timesteps, 4], the common frequency of the files
    """
    return get_all_data_from_paths(paths, "poses_4d", predicate, max_files=max_files)
def limit_angle_range(angle: Union[float, Iterable])

Limit the range of an angle or array of angles between min and max

Any given angle in rad will be moved to the given range. e.g. with min = -pi and max pi, 4.5pi will be moved to be 0.5pi.

Args

angle
An angle or an array of angles (1D)
_range
optional tuple of range (min, max)

Returns

limited angle(s) in the same form as the input (float or ndarray)

Expand source code
def limit_angle_range(angle: Union[float, Iterable], _range=(-np.pi, np.pi)):
    """Limit the range of an angle or array of angles between min and max

    Any given angle in rad will be moved to the given range.
    e.g. with min = -pi and max pi, 4.5*pi will be moved to be 0.5*pi.
    Args:
        angle: An angle or an array of angles (1D)
        _range: optional tuple of range (min, max)
    Returns:
        limited angle(s) in the same form as the input (float or ndarray)
    """
    assert np.isclose(_range[1] - _range[0], 2 * np.pi)

    def limit_simple(a):
        return (a - _range[0]) % (2 * np.pi) + _range[0]

    if isinstance(angle, Iterable):
        nan = np.isnan(angle)
        angle[~nan] = limit_simple(angle[~nan])
    else:
        angle = limit_simple(angle)
    return angle
def np_array(*arrays)
Expand source code
def np_array(*arrays):
    result = tuple(np.array(a) if a is not None else None for a in arrays)
    if len(result) == 1:
        result = result[0]
    return result