Module robofish.io.utils
Expand source code
import robofish.io
import numpy as np
from typing import Union, Iterable, Tuple
from pathlib import Path
from tqdm import tqdm
def np_array(*arrays):
result = tuple(np.array(a) if a is not None else None for a in arrays)
if len(result) == 1:
result = result[0]
return result
def full_path(current_file, path):
return (Path(current_file).parent / path).resolve()
def limit_angle_range(angle: Union[float, Iterable], _range=(-np.pi, np.pi)):
"""Limit the range of an angle or array of angles between min and max
Any given angle in rad will be moved to the given range.
e.g. with min = -pi and max pi, 4.5*pi will be moved to be 0.5*pi.
Args:
angle: An angle or an array of angles (1D)
_range: optional tuple of range (min, max)
Returns:
limited angle(s) in the same form as the input (float or ndarray)
"""
assert np.isclose(_range[1] - _range[0], 2 * np.pi)
def limit_simple(a):
return (a - _range[0]) % (2 * np.pi) + _range[0]
if isinstance(angle, Iterable):
nan = np.isnan(angle)
angle[~nan] = limit_simple(angle[~nan])
else:
angle = limit_simple(angle)
return angle
def get_all_files_from_paths(paths: Iterable[Union[str, Path]], max_files=None):
# Find all files with correct ending
files = []
for path in [Path(p) for p in paths]:
if path.is_dir():
files_path = []
for ext in ("hdf", "hdf5", "h5", "he5"):
files_path += list(path.rglob(f"*.{ext}"))
files.append(files_path[:max_files])
else:
files.append([path])
return files
def get_all_poses_from_paths(
paths: Iterable[Union[str, Path]], predicate=None, max_files=None
) -> Tuple[Iterable[Iterable[np.ndarray]], float]:
"""Read all poses from given paths.
The function shall be used by the evaluation functions.
Args:
paths(Iterable[Union[str, Path]]): An array of strings, with files or folders. The files are checked to have the same frequency.
Returns:
Tuple[Iterable[Iterable[np.ndarray]], float]: An array, containing poses with the shape [paths][files][entities, timesteps, 4], the common frequency of the files
"""
return get_all_data_from_paths(paths, "poses_4d", predicate, max_files=max_files)
def get_all_data_from_paths(
paths: Iterable[Union[str, Path]],
request_type="poses_4d",
predicate=None,
max_files=None,
):
expected_settings = None
all_data = []
files_per_path = get_all_files_from_paths(paths, max_files)
pbar = tqdm(
total=sum([len(files_in_path) for files_in_path in files_per_path]),
desc=f"Loading {request_type} from files.",
)
# for each given path
for files_in_path in files_per_path:
data_from_files = []
# Open all files, gather the poses and check if all of them have the same world size and frequency
for i_path, file_path in enumerate(files_in_path):
with robofish.io.File(file_path, "r") as file:
pbar.update(1)
pbar.refresh()
file_settings = {
"world_size_cm_x": file.attrs["world_size_cm"][0],
"world_size_cm_y": file.attrs["world_size_cm"][1],
"frequency_hz": file.frequency,
}
if expected_settings is None:
expected_settings = file_settings
assert file_settings == expected_settings
properties = {
"poses_4d": robofish.io.Entity.poses,
"speeds_turns": robofish.io.Entity.actions_speeds_turns,
}
pred = None if predicate is None else predicate[i_path]
data = file.select_entity_property(
pred, entity_property=properties[request_type]
)
# Exclude timesteps where there is any nan in the row
data = data[:, ~np.isnan(data).any(axis=2).any(axis=0)]
data_from_files.append(data)
all_data.append(data_from_files)
pbar.close()
return all_data, expected_settings
Functions
def full_path(current_file, path)
-
Expand source code
def full_path(current_file, path): return (Path(current_file).parent / path).resolve()
def get_all_data_from_paths(paths: Iterable[Union[str, pathlib.Path]], request_type='poses_4d', predicate=None, max_files=None)
-
Expand source code
def get_all_data_from_paths( paths: Iterable[Union[str, Path]], request_type="poses_4d", predicate=None, max_files=None, ): expected_settings = None all_data = [] files_per_path = get_all_files_from_paths(paths, max_files) pbar = tqdm( total=sum([len(files_in_path) for files_in_path in files_per_path]), desc=f"Loading {request_type} from files.", ) # for each given path for files_in_path in files_per_path: data_from_files = [] # Open all files, gather the poses and check if all of them have the same world size and frequency for i_path, file_path in enumerate(files_in_path): with robofish.io.File(file_path, "r") as file: pbar.update(1) pbar.refresh() file_settings = { "world_size_cm_x": file.attrs["world_size_cm"][0], "world_size_cm_y": file.attrs["world_size_cm"][1], "frequency_hz": file.frequency, } if expected_settings is None: expected_settings = file_settings assert file_settings == expected_settings properties = { "poses_4d": robofish.io.Entity.poses, "speeds_turns": robofish.io.Entity.actions_speeds_turns, } pred = None if predicate is None else predicate[i_path] data = file.select_entity_property( pred, entity_property=properties[request_type] ) # Exclude timesteps where there is any nan in the row data = data[:, ~np.isnan(data).any(axis=2).any(axis=0)] data_from_files.append(data) all_data.append(data_from_files) pbar.close() return all_data, expected_settings
def get_all_files_from_paths(paths: Iterable[Union[str, pathlib.Path]], max_files=None)
-
Expand source code
def get_all_files_from_paths(paths: Iterable[Union[str, Path]], max_files=None): # Find all files with correct ending files = [] for path in [Path(p) for p in paths]: if path.is_dir(): files_path = [] for ext in ("hdf", "hdf5", "h5", "he5"): files_path += list(path.rglob(f"*.{ext}")) files.append(files_path[:max_files]) else: files.append([path]) return files
def get_all_poses_from_paths(paths: Iterable[Union[str, pathlib.Path]], predicate=None, max_files=None) ‑> Tuple[Iterable[Iterable[numpy.ndarray]], float]
-
Read all poses from given paths.
The function shall be used by the evaluation functions.
Args
paths(Iterable[Union[str, Path]]): An array of strings, with files or folders. The files are checked to have the same frequency.
Returns
Tuple[Iterable[Iterable[np.ndarray]], float]
- An array, containing poses with the shape [paths][files][entities, timesteps, 4], the common frequency of the files
Expand source code
def get_all_poses_from_paths( paths: Iterable[Union[str, Path]], predicate=None, max_files=None ) -> Tuple[Iterable[Iterable[np.ndarray]], float]: """Read all poses from given paths. The function shall be used by the evaluation functions. Args: paths(Iterable[Union[str, Path]]): An array of strings, with files or folders. The files are checked to have the same frequency. Returns: Tuple[Iterable[Iterable[np.ndarray]], float]: An array, containing poses with the shape [paths][files][entities, timesteps, 4], the common frequency of the files """ return get_all_data_from_paths(paths, "poses_4d", predicate, max_files=max_files)
def limit_angle_range(angle: Union[float, Iterable])
-
Limit the range of an angle or array of angles between min and max
Any given angle in rad will be moved to the given range. e.g. with min = -pi and max pi, 4.5pi will be moved to be 0.5pi.
Args
angle
- An angle or an array of angles (1D)
_range
- optional tuple of range (min, max)
Returns
limited angle(s) in the same form as the input (float or ndarray)
Expand source code
def limit_angle_range(angle: Union[float, Iterable], _range=(-np.pi, np.pi)): """Limit the range of an angle or array of angles between min and max Any given angle in rad will be moved to the given range. e.g. with min = -pi and max pi, 4.5*pi will be moved to be 0.5*pi. Args: angle: An angle or an array of angles (1D) _range: optional tuple of range (min, max) Returns: limited angle(s) in the same form as the input (float or ndarray) """ assert np.isclose(_range[1] - _range[0], 2 * np.pi) def limit_simple(a): return (a - _range[0]) % (2 * np.pi) + _range[0] if isinstance(angle, Iterable): nan = np.isnan(angle) angle[~nan] = limit_simple(angle[~nan]) else: angle = limit_simple(angle) return angle
def np_array(*arrays)
-
Expand source code
def np_array(*arrays): result = tuple(np.array(a) if a is not None else None for a in arrays) if len(result) == 1: result = result[0] return result