Module robofish.io.entity

Tracks of entities are stored in Entity objects. They are created by the File object. They require a category and can have a name. If no name is given, it will be created, using the category and an id.

f = robofish.io.File(world_size_cm=[100, 100], frequency_hz=25.0)
nemo = f.create_entity(category="fish", name="nemo")

The function create_entity() returns an entity object. It can be stored but it's not necessary. The entity is automatically stored in the file.

Pose options

The poses of an entity can be passed in multiple ways. Poses are divided into positions (x, y) and orientations. Orientations are internally represented in unit vectors but can also be passed in rads. The shape of the passed orientation array defines the meaning.

# Create dummy poses for 100 timesteps
pos = np.zeros((100, 2))
ori_vec = np.ones((100,2)) * [0, 1]
ori_rad = np.zeros((100,1))

# Creating an entity without orientations. Here we keep the entity object, to use it later.
f.create_entity(category="fish", positions=pos)
# Creating an entity using orientation vectors. Keeping the entity object is not necessary, it is saved in the file.
f.create_entity(category="fish", positions=pos, orientations=ori_vec)
# Creating an entity using radian orientations.
f.create_entity(category="fish", positions=pos, orientations=ori_rad)

The poses can also be passed in a combined array.

# Create an entity using orientation vectors.
f.create_entity(category="fish", poses=np.ones((100,4)) * np.sqrt(2))
# Create an entity using radian orientations
f.create_entity(category="fish", poses=np.zeros((100,3)))

Creating multiple entities at once

Multiple entities can be created at once.

# Here we create 4 fishes from poses with radian orientation
f.create_multiple_entities(category="fish", poses=np.zeros((4,100,3)))

Attributes

Entities can have attributes to describe them.

The attributes can be set like this:

nemo.attrs["species"] = "Clownfish"
nemo.attrs["fish_standard_length_cm"] = 10

Any attribute is allowed, but some canonical attributes are prepared:
species:str, sex:str, fish_standard_length_cm:float

Properties

As described in robofish.io, Files and Entities have useful properties.

Entity function Description
Entity.positions Positions as a (timesteps, 2 (x, y)) array.
Entity.orientations Orientations as a (timesteps, 2 (ori_x, ori_y)) array.
Entity.orientations_rad Orientations as a (timesteps, 1 (ori_rad)) array.
Entity.poses Poses as a (timesteps, 4 (x, y, x_ori, y_ori)) array.
Entity.poses_rad Poses as a (timesteps, 3 (x, y, ori_rad)) array.
Entity.actions_speeds_turns Speed and turn as a (timesteps - 1, 2 (speed in cm/frame, turn in rad/frame)) array.


⚠️ Try this out by extending the example of the main doc, so that a new teleporting fish with random positions [-50, 50] and orientations (0, 2 * pi) is generated. How does that change the output and speed histogram robofish-io-evaluate speed example.hdf5? Try writing some attributes.

Expand source code
"""
.. include:: ../../../docs/entity.md
"""

import robofish.io
import robofish.io.utils as utils

import h5py
import numpy as np
from typing import Iterable, Union
import datetime
import logging
import deprecation


class Entity(h5py.Group):
    @classmethod
    def from_h5py_group(cls, group):
        """Turn an existing group object into an instance of this class.

        The ``__class__`` of ``group`` is reassigned in place, so the very
        same object is returned and nothing is copied.
        """
        group.__class__ = cls
        return group

    @classmethod
    def create_entity(
        cls,
        entities_group,
        category: str,
        poses: Iterable = None,
        name: str = None,
        positions: Iterable = None,
        orientations: Iterable = None,
        outlines: Iterable = None,
        sampling: str = None,
    ):
        """Create a new entity group inside ``entities_group`` and fill it.

        Args:
            entities_group: Group in which the entity's group is created.
            category: Written to the ``category`` attribute of the entity.
            poses: Optional 2-D array with 3 or 4 columns.
            name: Group name. When omitted, ``"<category>_<n>"`` is used with
                the first ``n`` (counting from 1) that is not taken yet.
            positions: Optional 2-D array with 2 columns.
            orientations: Optional 2-D array with 1 or 2 columns.
            outlines: Optional outlines; stored only when given.
            sampling: Optional value passed on to ``create_poses`` and
                ``create_outlines``.

        Returns:
            The newly created entity.
        """
        poses, positions, orientations, outlines = utils.np_array(
            poses, positions, orientations, outlines
        )

        # Every array that was passed must be two-dimensional and have the
        # expected number of columns.
        if poses is not None:
            assert poses.ndim == 2 and poses.shape[1] in [3, 4]
        if positions is not None:
            assert positions.ndim == 2 and positions.shape[1] == 2
        if orientations is not None:
            assert orientations.ndim == 2 and orientations.shape[1] in [1, 2]

        if name is None:
            # Try "<category>_1", "<category>_2", ... and keep the first name
            # that is still free. The search gives up at index 9999.
            name = "%s_%d" % (category, 1)
            for idx in range(2, 10000):
                if name not in entities_group:
                    break
                name = "%s_%d" % (category, idx)

        # Create the group and re-type it as an entity of this class.
        new_entity = cls.from_h5py_group(entities_group.create_group(name))
        new_entity.attrs["category"] = category

        new_entity.create_poses(poses, positions, orientations, sampling)
        if outlines is not None:
            new_entity.create_outlines(outlines, sampling)

        new_entity.update_calculated_data()
        return new_entity

    @classmethod
    def convert_rad_to_vector(cls, orientations_rad):
        """Convert orientations given in radians to (cos, sin) vectors.

        Args:
            orientations_rad: Array-like with exactly one column.

        Returns:
            Array with one row per input row and two columns: the cosine of
            the angle in column 0 and the sine in column 1.

        A warning is logged when any angle lies outside [0, 2 * pi].
        """
        # Convert first, so the range check runs vectorised on an array.
        ori_rad = utils.np_array(orientations_rad)
        assert ori_rad.shape[1] == 1

        # np.any is False for an empty array and for NaN comparisons, so
        # empty tracks and missing values do not raise here.
        if np.any((ori_rad < 0) | (ori_rad > 2 * np.pi)):
            logging.warning(
                "Converting orientations, from a bigger range than [0, 2 * pi]. When passing the orientations, they are assumed to be in radians."
            )

        ori_vec = np.empty((ori_rad.shape[0], 2))
        ori_vec[:, 0] = np.cos(ori_rad[:, 0])
        ori_vec[:, 1] = np.sin(ori_rad[:, 0])
        return ori_vec

    @property
    def group_name(self):
        """Name of this group as reported by the parent class (``super().name``)."""
        return super().name

    @property
    def name(self):
        """Last ``/``-separated component of ``group_name``."""
        _, _, last_part = self.group_name.rpartition("/")
        return last_part

    @property
    def category(self):
        """Value of the ``category`` attribute of this entity."""
        return self.attrs["category"]

    def create_outlines(self, outlines: Iterable, sampling=None):
        """Store ``outlines`` as a float32 dataset named ``"outlines"``.

        When ``sampling`` is given, it is written to the ``sampling``
        attribute of the new dataset.
        """
        dataset = self.create_dataset("outlines", data=outlines, dtype=np.float32)
        if sampling is None:
            return
        dataset.attrs["sampling"] = sampling

    def create_poses(
        self,
        poses: Iterable = None,
        positions: Iterable = None,
        orientations: Iterable = None,
        sampling: str = None,
    ):
        """Create the ``positions`` and, if available, ``orientations`` datasets.

        Args:
            poses: Optional array with 3 or 4 columns. The first two columns
                are used as positions, the remaining ones as orientations.
                Must not be combined with ``positions``.
            positions: Optional positions array.
            orientations: Optional orientations. A single column is treated
                as radians and converted with ``convert_rad_to_vector``.
            sampling: Optional value for the ``sampling`` attribute of every
                dataset created here.

        If neither ``poses`` nor ``positions`` is given, only a warning is
        logged and no dataset is created.
        """
        poses, positions, orientations = utils.np_array(poses, positions, orientations)

        # Either poses or positions, not both
        assert (
            poses is None or positions is None
        ), "Only either poses or positions can be given, not both."

        if poses is None and positions is None:
            logging.warning(
                "An entity without poses was created. If this was unwanted, add 'poses' or 'positions' to the constructor"
            )
            return

        if poses is not None:
            assert poses.shape[1] in [3, 4]
            positions = poses[:, :2]
            orientations = poses[:, 2:]
        if orientations is not None and orientations.shape[1] == 1:
            orientations = Entity.convert_rad_to_vector(orientations)

        positions = self.create_dataset("positions", data=positions, dtype=np.float32)
        if sampling is not None:
            positions.attrs["sampling"] = sampling

        # Orientations are optional. The sampling attribute is only written
        # when the dataset exists, so positions-only entities with a sampling
        # value no longer fail on ``None.attrs``.
        if orientations is not None:
            orientations = self.create_dataset(
                "orientations", data=orientations, dtype=np.float32
            )
            if sampling is not None:
                orientations.attrs["sampling"] = sampling

    @property
    def positions(self):
        """The stored ``positions`` dataset of this entity."""
        return self["positions"]

    @property
    def orientations(self):
        """Orientation vectors of this entity.

        Returns the stored ``orientations`` dataset when it exists. Otherwise
        a constant ``[0, 1]`` row is repeated once per stored position.
        """
        if "orientations" in self:
            return self["orientations"]
        # NOTE(review): this default used to be described as "to the right",
        # but convert_rad_to_vector stores (cos, sin), for which [0, 1]
        # corresponds to pi / 2 -- confirm the intended default direction.
        return np.tile([0, 1], (self.positions.shape[0], 1))

    @property
    @deprecation.deprecated(
        deprecated_in="0.2",
        removed_in="0.2.4",
        details="We found that our calculation of 'poses_calc_ori' is flawed. "
        "Use the original poses ('poses') with tracked orientations instead. "
        "If you see this message and you don't know what to do, update all packages, "
        "merge to the master branch of fish_models if nothing helps, contact Andi.\n"
        "Don't ignore this warning, it's a serious issue.",
    )
    def poses_calc_ori(self):
        """Deprecated: poses whose orientation vectors are derived from movement.

        Returns:
            Array with the columns (x, y, cos(angle), sin(angle)), built from
            the rows of ``poses_calc_ori_rad``.
        """
        poses_cor = self.poses_calc_ori_rad
        return np.concatenate(
            [
                poses_cor[:, :2],
                np.cos(poses_cor[:, 2, np.newaxis]),
                np.sin(poses_cor[:, 2, np.newaxis]),
            ],
            axis=1,
        )

    @property
    @deprecation.deprecated(
        deprecated_in="0.2",
        removed_in="0.2.4",
        details="We found that our calculation of 'poses_calc_ori_rad' is flawed. "
        "Use the original poses ('poses_rad') with tracked orientations instead. "
        "If you see this message and you don't know what to do, update all packages, "
        "merge to the master branch of fish_models if nothing helps, contact Andi.\n"
        "Don't ignore this warning, it's a serious issue.",
    )
    def poses_calc_ori_rad(self):
        """Deprecated: poses whose orientation angle is derived from movement.

        Returns:
            Array with the columns (x, y, angle). The angle is the direction
            of the step leading to the position, passed through
            ``utils.limit_angle_range`` with ``_range=(0, 2 * pi)``. The first
            position is dropped because no step leads to it.
        """
        # Differences between consecutive positions, one row fewer than positions
        diff = np.diff(self.positions, axis=0)

        # One angle per step
        angles = utils.limit_angle_range(
            np.arctan2(diff[:, 1], diff[:, 0]), _range=(0, 2 * np.pi)
        )

        # Positions with angles. The first position is cut off, as it does not have an orientation.
        poses_with_calculated_orientation = np.concatenate(
            [self.positions[1:], angles[:, np.newaxis]], axis=1
        )

        return poses_with_calculated_orientation

    @property
    def poses_hash(self):
        """Hash used to detect changes of the stored positions and orientations.

        The hash is built from the NaN-ignoring sum of each dataset, so it is
        a cheap change indicator rather than a strong fingerprint: data with
        the same sums yields the same hash.

        Returns:
            0 when no positions are stored, the hash of the position sum when
            only positions are stored, otherwise the floored mean of the
            position and orientation sum hashes.
        """
        # The hash of h5py datasets changes each time the file is reopened.
        # Also the hash of casting the array to bytes and calculating the hash changes.
        def npsumhash(a):
            return hash(np.nansum(a))

        if "orientations" in self:
            return (npsumhash(self["positions"]) + npsumhash(self["orientations"])) // 2
        if "positions" in self:
            # No debug output here: this property is read on every access of
            # the cached data and must stay silent.
            return npsumhash(self["positions"])
        return 0

    @property
    def poses(self):
        """Positions and orientation vectors concatenated column-wise."""
        return np.concatenate([self.positions, self.orientations], axis=1)

    @property
    def poses_rad(self):
        """Positions and ``orientations_rad`` concatenated column-wise."""
        return np.concatenate([self.positions, self.orientations_rad], axis=1)

    @property
    @deprecation.deprecated(
        deprecated_in="0.2",
        removed_in="0.2.4",
        details="We found that our calculation of 'speed_turn' is flawed and replaced it "
        "with 'actions_speeds_turns'. The difference in calculation is, that the tracked "
        "orientation is used now which gives the fish the ability to swim backwards. "
        "If you see this message and you don't know what to do, update all packages, "
        "merge to the master branch of fish_models if nothing helps, contact Andi.\n"
        "Don't ignore this warning, it's a serious issue.",
    )
    def speed_turn(self):
        """Deprecated: speed and turn derived from the positions only.

        The calculation works on ``poses_calc_ori_rad`` and takes the
        difference between each of its rows and the next one.

        Returns:
            An array with two columns and one row fewer than
            ``poses_calc_ori_rad``. Column 0 is the Euclidean length of the
            position difference. Column 1 is the angle difference, passed
            through ``utils.limit_angle_range`` with ``_range=(-pi, pi)``.
        """
        # Rows are (x, y, angle); the first position is already cut off there.
        calc_poses = self.poses_calc_ori_rad

        # Row-to-row differences (dx, dy, d angle), which drops one more row.
        step = np.diff(calc_poses, axis=0)
        distances = np.linalg.norm(step[:, :2], axis=1)
        turns = utils.limit_angle_range(step[:, 2], _range=(-np.pi, np.pi))
        return np.stack([distances, turns], axis=-1)

    def update_calculated_data(self, verbose=False, force_update=False):
        """Bring the cached calculated data in line with the stored poses.

        An update is performed when the stored ``poses_hash`` attribute is
        missing or differs from the current ``poses_hash``, when one of the
        calculated datasets is missing, when an earlier update was left
        unfinished, or when ``force_update`` is set.

        Args:
            verbose: Print what was done.
            force_update: Recalculate even if nothing seems to have changed.

        Returns:
            True if the calculated datasets were rewritten, False otherwise.
            The result no longer depends on ``verbose``.

        Raises:
            RuntimeError: Re-raised after printing a hint that the file may
                be read-only.
        """
        changed = False
        needs_update = (
            "poses_hash" not in self.attrs
            or self.attrs["poses_hash"] != self.poses_hash
            or "calculated_orientations_rad" not in self
            or "calculated_actions_speeds_turns" not in self
            or "unfinished_calculations" in self.attrs
            or force_update
        )

        if needs_update:
            try:
                self.attrs["poses_hash"] = self.poses_hash
                if "orientations" in self:
                    # Marker that stays behind if the update is interrupted,
                    # which forces a recalculation on the next call.
                    self.attrs["unfinished_calculations"] = True

                    ori_rad = self.calculate_orientations_rad()
                    if "calculated_orientations_rad" in self:
                        del self["calculated_orientations_rad"]
                    self["calculated_orientations_rad"] = ori_rad.astype(np.float64)

                    speeds_turns = self.calculate_actions_speeds_turns()
                    if "calculated_actions_speeds_turns" in self:
                        del self["calculated_actions_speeds_turns"]
                    self["calculated_actions_speeds_turns"] = speeds_turns.astype(
                        np.float64
                    )
                    del self.attrs["unfinished_calculations"]

                    changed = True
                    if verbose:
                        print(
                            f"Updated calculated data for entity {self.name} with poses_hash {self.poses_hash}"
                        )
                elif verbose:
                    print(
                        "Since there were no orientations in the data, nothing was calculated."
                    )
            except RuntimeError:
                print("Trying to update calculated data in a read-only file")
                raise
        elif verbose:
            print(
                f"Nothing to be updated in entity {self.name}. Poses_hash was {self.attrs['poses_hash']}"
            )

        assert self.attrs["poses_hash"] == self.poses_hash
        return changed

    def calculate_orientations_rad(self):
        """Calculate orientation angles from the orientation vectors.

        Returns:
            A single-column array with ``arctan2(column 1, column 0)`` of
            ``orientations``, passed through ``utils.limit_angle_range`` with
            ``_range=(0, 2 * pi)``.
        """
        ori = self.orientations
        angles = np.arctan2(ori[:, 1], ori[:, 0])
        wrapped = utils.limit_angle_range(angles, _range=(0, 2 * np.pi))
        return wrapped[:, np.newaxis]

    def calculate_actions_speeds_turns(self):
        """Calculate speed and turn from the recorded positions and orientations.

        The turn is calculated by the change of orientation between frames.
        The speed is calculated by the distance between the points, projected on the new orientation vector.
        The sideway change of position cannot be represented with this method.

        Returns:
            An array with shape (number_of_positions - 1, 2 (speed in cm/frame, turn in rad/frame)).
        """
        # Load the orientations into memory once, instead of indexing the
        # stored dataset separately for every frame.
        ori = np.asarray(self.orientations)
        ori_rad = self.orientations_rad
        pos = self.positions

        turn = utils.limit_angle_range(np.diff(ori_rad, axis=0)[:, 0])
        pos_diff = np.diff(pos, axis=0)

        # Row-wise dot product of each step with the orientation of the frame
        # that the step arrives at.
        speed = np.sum(pos_diff * ori[1:], axis=1)
        return np.stack([speed, turn], axis=-1)

    @property
    def actions_speeds_turns(self):
        """Speed and turn per step, read from the cache when it exists.

        Without a ``calculated_actions_speeds_turns`` dataset the values are
        calculated on the fly. With one, the stored ``poses_hash`` must match
        the current one, otherwise an ``AssertionError`` is raised.
        """
        if "calculated_actions_speeds_turns" not in self:
            return self.calculate_actions_speeds_turns()

        assert (
            self.attrs["poses_hash"] == self.poses_hash
        ), f"The calculated poses_hash was not identical to the stored poses_hash. Please update the calculated data after changing positions or orientations with entity.update_calculated_data(). stored hash: {self.attrs['poses_hash']}, calculated hash: {self.poses_hash}."
        return self["calculated_actions_speeds_turns"]

    @property
    def orientations_rad(self):
        """Orientation angles, read from the cache when it exists.

        Without a ``calculated_orientations_rad`` dataset the values are
        calculated on the fly. With one, the stored ``poses_hash`` must match
        the current one, otherwise an ``AssertionError`` is raised.
        """
        if "calculated_orientations_rad" not in self:
            return self.calculate_orientations_rad()

        assert (
            self.attrs["poses_hash"] == self.poses_hash
        ), f"The calculated poses_hash was not identical to the stored poses_hash. Please update the calculated data after changing positions or orientations with entity.update_calculated_data(). stored hash: {self.attrs['poses_hash']}, calculated hash: {self.poses_hash}."
        return self["calculated_orientations_rad"]

Classes

class Entity (bind)

Represents an HDF5 group.

Create a new Group object by binding to a low-level GroupID.

Expand source code
class Entity(h5py.Group):
    @classmethod
    def from_h5py_group(cls, group):
        """Turn an existing group object into an instance of this class.

        The ``__class__`` of ``group`` is reassigned in place, so the very
        same object is returned and nothing is copied.
        """
        group.__class__ = cls
        return group

    @classmethod
    def create_entity(
        cls,
        entities_group,
        category: str,
        poses: Iterable = None,
        name: str = None,
        positions: Iterable = None,
        orientations: Iterable = None,
        outlines: Iterable = None,
        sampling: str = None,
    ):
        """Create a new entity group inside ``entities_group`` and fill it.

        Args:
            entities_group: Group in which the entity's group is created.
            category: Written to the ``category`` attribute of the entity.
            poses: Optional 2-D array with 3 or 4 columns.
            name: Group name. When omitted, ``"<category>_<n>"`` is used with
                the first ``n`` (counting from 1) that is not taken yet.
            positions: Optional 2-D array with 2 columns.
            orientations: Optional 2-D array with 1 or 2 columns.
            outlines: Optional outlines; stored only when given.
            sampling: Optional value passed on to ``create_poses`` and
                ``create_outlines``.

        Returns:
            The newly created entity.
        """
        poses, positions, orientations, outlines = utils.np_array(
            poses, positions, orientations, outlines
        )

        # Every array that was passed must be two-dimensional and have the
        # expected number of columns.
        if poses is not None:
            assert poses.ndim == 2 and poses.shape[1] in [3, 4]
        if positions is not None:
            assert positions.ndim == 2 and positions.shape[1] == 2
        if orientations is not None:
            assert orientations.ndim == 2 and orientations.shape[1] in [1, 2]

        if name is None:
            # Try "<category>_1", "<category>_2", ... and keep the first name
            # that is still free. The search gives up at index 9999.
            name = "%s_%d" % (category, 1)
            for idx in range(2, 10000):
                if name not in entities_group:
                    break
                name = "%s_%d" % (category, idx)

        # Create the group and re-type it as an entity of this class.
        new_entity = cls.from_h5py_group(entities_group.create_group(name))
        new_entity.attrs["category"] = category

        new_entity.create_poses(poses, positions, orientations, sampling)
        if outlines is not None:
            new_entity.create_outlines(outlines, sampling)

        new_entity.update_calculated_data()
        return new_entity

    @classmethod
    def convert_rad_to_vector(cls, orientations_rad):
        """Convert orientations given in radians to (cos, sin) vectors.

        Args:
            orientations_rad: Array-like with exactly one column.

        Returns:
            Array with one row per input row and two columns: the cosine of
            the angle in column 0 and the sine in column 1.

        A warning is logged when any angle lies outside [0, 2 * pi].
        """
        # Convert first, so the range check runs vectorised on an array.
        ori_rad = utils.np_array(orientations_rad)
        assert ori_rad.shape[1] == 1

        # np.any is False for an empty array and for NaN comparisons, so
        # empty tracks and missing values do not raise here.
        if np.any((ori_rad < 0) | (ori_rad > 2 * np.pi)):
            logging.warning(
                "Converting orientations, from a bigger range than [0, 2 * pi]. When passing the orientations, they are assumed to be in radians."
            )

        ori_vec = np.empty((ori_rad.shape[0], 2))
        ori_vec[:, 0] = np.cos(ori_rad[:, 0])
        ori_vec[:, 1] = np.sin(ori_rad[:, 0])
        return ori_vec

    @property
    def group_name(self):
        """Name of this group as reported by the parent class (``super().name``)."""
        return super().name

    @property
    def name(self):
        """Last ``/``-separated component of ``group_name``."""
        _, _, last_part = self.group_name.rpartition("/")
        return last_part

    @property
    def category(self):
        """Value of the ``category`` attribute of this entity."""
        return self.attrs["category"]

    def create_outlines(self, outlines: Iterable, sampling=None):
        """Store ``outlines`` as a float32 dataset named ``"outlines"``.

        When ``sampling`` is given, it is written to the ``sampling``
        attribute of the new dataset.
        """
        dataset = self.create_dataset("outlines", data=outlines, dtype=np.float32)
        if sampling is None:
            return
        dataset.attrs["sampling"] = sampling

    def create_poses(
        self,
        poses: Iterable = None,
        positions: Iterable = None,
        orientations: Iterable = None,
        sampling: str = None,
    ):
        """Create the ``positions`` and, if available, ``orientations`` datasets.

        Args:
            poses: Optional array with 3 or 4 columns. The first two columns
                are used as positions, the remaining ones as orientations.
                Must not be combined with ``positions``.
            positions: Optional positions array.
            orientations: Optional orientations. A single column is treated
                as radians and converted with ``convert_rad_to_vector``.
            sampling: Optional value for the ``sampling`` attribute of every
                dataset created here.

        If neither ``poses`` nor ``positions`` is given, only a warning is
        logged and no dataset is created.
        """
        poses, positions, orientations = utils.np_array(poses, positions, orientations)

        # Either poses or positions, not both
        assert (
            poses is None or positions is None
        ), "Only either poses or positions can be given, not both."

        if poses is None and positions is None:
            logging.warning(
                "An entity without poses was created. If this was unwanted, add 'poses' or 'positions' to the constructor"
            )
            return

        if poses is not None:
            assert poses.shape[1] in [3, 4]
            positions = poses[:, :2]
            orientations = poses[:, 2:]
        if orientations is not None and orientations.shape[1] == 1:
            orientations = Entity.convert_rad_to_vector(orientations)

        positions = self.create_dataset("positions", data=positions, dtype=np.float32)
        if sampling is not None:
            positions.attrs["sampling"] = sampling

        # Orientations are optional. The sampling attribute is only written
        # when the dataset exists, so positions-only entities with a sampling
        # value no longer fail on ``None.attrs``.
        if orientations is not None:
            orientations = self.create_dataset(
                "orientations", data=orientations, dtype=np.float32
            )
            if sampling is not None:
                orientations.attrs["sampling"] = sampling

    @property
    def positions(self):
        """The stored ``positions`` dataset of this entity."""
        return self["positions"]

    @property
    def orientations(self):
        """Orientation vectors of this entity.

        Returns the stored ``orientations`` dataset when it exists. Otherwise
        a constant ``[0, 1]`` row is repeated once per stored position.
        """
        if "orientations" in self:
            return self["orientations"]
        # NOTE(review): this default used to be described as "to the right",
        # but convert_rad_to_vector stores (cos, sin), for which [0, 1]
        # corresponds to pi / 2 -- confirm the intended default direction.
        return np.tile([0, 1], (self.positions.shape[0], 1))

    @property
    @deprecation.deprecated(
        deprecated_in="0.2",
        removed_in="0.2.4",
        details="We found that our calculation of 'poses_calc_ori' is flawed. "
        "Use the original poses ('poses') with tracked orientations instead. "
        "If you see this message and you don't know what to do, update all packages, "
        "merge to the master branch of fish_models if nothing helps, contact Andi.\n"
        "Don't ignore this warning, it's a serious issue.",
    )
    def poses_calc_ori(self):
        """Deprecated: poses whose orientation vectors are derived from movement.

        Returns:
            Array with the columns (x, y, cos(angle), sin(angle)), built from
            the rows of ``poses_calc_ori_rad``.
        """
        poses_cor = self.poses_calc_ori_rad
        return np.concatenate(
            [
                poses_cor[:, :2],
                np.cos(poses_cor[:, 2, np.newaxis]),
                np.sin(poses_cor[:, 2, np.newaxis]),
            ],
            axis=1,
        )

    @property
    @deprecation.deprecated(
        deprecated_in="0.2",
        removed_in="0.2.4",
        details="We found that our calculation of 'poses_calc_ori_rad' is flawed. "
        "Use the original poses ('poses_rad') with tracked orientations instead. "
        "If you see this message and you don't know what to do, update all packages, "
        "merge to the master branch of fish_models if nothing helps, contact Andi.\n"
        "Don't ignore this warning, it's a serious issue.",
    )
    def poses_calc_ori_rad(self):
        """Deprecated: poses whose orientation angle is derived from movement.

        Returns:
            Array with the columns (x, y, angle). The angle is the direction
            of the step leading to the position, passed through
            ``utils.limit_angle_range`` with ``_range=(0, 2 * pi)``. The first
            position is dropped because no step leads to it.
        """
        # Differences between consecutive positions, one row fewer than positions
        diff = np.diff(self.positions, axis=0)

        # One angle per step
        angles = utils.limit_angle_range(
            np.arctan2(diff[:, 1], diff[:, 0]), _range=(0, 2 * np.pi)
        )

        # Positions with angles. The first position is cut off, as it does not have an orientation.
        poses_with_calculated_orientation = np.concatenate(
            [self.positions[1:], angles[:, np.newaxis]], axis=1
        )

        return poses_with_calculated_orientation

    @property
    def poses_hash(self):
        """Hash used to detect changes of the stored positions and orientations.

        The hash is built from the NaN-ignoring sum of each dataset, so it is
        a cheap change indicator rather than a strong fingerprint: data with
        the same sums yields the same hash.

        Returns:
            0 when no positions are stored, the hash of the position sum when
            only positions are stored, otherwise the floored mean of the
            position and orientation sum hashes.
        """
        # The hash of h5py datasets changes each time the file is reopened.
        # Also the hash of casting the array to bytes and calculating the hash changes.
        def npsumhash(a):
            return hash(np.nansum(a))

        if "orientations" in self:
            return (npsumhash(self["positions"]) + npsumhash(self["orientations"])) // 2
        if "positions" in self:
            # No debug output here: this property is read on every access of
            # the cached data and must stay silent.
            return npsumhash(self["positions"])
        return 0

    @property
    def poses(self):
        """Positions and orientation vectors concatenated column-wise."""
        return np.concatenate([self.positions, self.orientations], axis=1)

    @property
    def poses_rad(self):
        """Positions and ``orientations_rad`` concatenated column-wise."""
        return np.concatenate([self.positions, self.orientations_rad], axis=1)

    @property
    @deprecation.deprecated(
        deprecated_in="0.2",
        removed_in="0.2.4",
        details="We found that our calculation of 'speed_turn' is flawed and replaced it "
        "with 'actions_speeds_turns'. The difference in calculation is, that the tracked "
        "orientation is used now which gives the fish the ability to swim backwards. "
        "If you see this message and you don't know what to do, update all packages, "
        "merge to the master branch of fish_models if nothing helps, contact Andi.\n"
        "Don't ignore this warning, it's a serious issue.",
    )
    def speed_turn(self):
        """Deprecated: speed and turn derived from the positions only.

        The calculation works on ``poses_calc_ori_rad`` and takes the
        difference between each of its rows and the next one.

        Returns:
            An array with two columns and one row fewer than
            ``poses_calc_ori_rad``. Column 0 is the Euclidean length of the
            position difference. Column 1 is the angle difference, passed
            through ``utils.limit_angle_range`` with ``_range=(-pi, pi)``.
        """
        # Rows are (x, y, angle); the first position is already cut off there.
        calc_poses = self.poses_calc_ori_rad

        # Row-to-row differences (dx, dy, d angle), which drops one more row.
        step = np.diff(calc_poses, axis=0)
        distances = np.linalg.norm(step[:, :2], axis=1)
        turns = utils.limit_angle_range(step[:, 2], _range=(-np.pi, np.pi))
        return np.stack([distances, turns], axis=-1)

    def update_calculated_data(self, verbose=False, force_update=False):
        """Bring the cached calculated data in line with the stored poses.

        An update is performed when the stored ``poses_hash`` attribute is
        missing or differs from the current ``poses_hash``, when one of the
        calculated datasets is missing, when an earlier update was left
        unfinished, or when ``force_update`` is set.

        Args:
            verbose: Print what was done.
            force_update: Recalculate even if nothing seems to have changed.

        Returns:
            True if the calculated datasets were rewritten, False otherwise.
            The result no longer depends on ``verbose``.

        Raises:
            RuntimeError: Re-raised after printing a hint that the file may
                be read-only.
        """
        changed = False
        needs_update = (
            "poses_hash" not in self.attrs
            or self.attrs["poses_hash"] != self.poses_hash
            or "calculated_orientations_rad" not in self
            or "calculated_actions_speeds_turns" not in self
            or "unfinished_calculations" in self.attrs
            or force_update
        )

        if needs_update:
            try:
                self.attrs["poses_hash"] = self.poses_hash
                if "orientations" in self:
                    # Marker that stays behind if the update is interrupted,
                    # which forces a recalculation on the next call.
                    self.attrs["unfinished_calculations"] = True

                    ori_rad = self.calculate_orientations_rad()
                    if "calculated_orientations_rad" in self:
                        del self["calculated_orientations_rad"]
                    self["calculated_orientations_rad"] = ori_rad.astype(np.float64)

                    speeds_turns = self.calculate_actions_speeds_turns()
                    if "calculated_actions_speeds_turns" in self:
                        del self["calculated_actions_speeds_turns"]
                    self["calculated_actions_speeds_turns"] = speeds_turns.astype(
                        np.float64
                    )
                    del self.attrs["unfinished_calculations"]

                    changed = True
                    if verbose:
                        print(
                            f"Updated calculated data for entity {self.name} with poses_hash {self.poses_hash}"
                        )
                elif verbose:
                    print(
                        "Since there were no orientations in the data, nothing was calculated."
                    )
            except RuntimeError:
                print("Trying to update calculated data in a read-only file")
                raise
        elif verbose:
            print(
                f"Nothing to be updated in entity {self.name}. Poses_hash was {self.attrs['poses_hash']}"
            )

        assert self.attrs["poses_hash"] == self.poses_hash
        return changed

    def calculate_orientations_rad(self):
        """Calculate orientation angles from the orientation vectors.

        Returns:
            A single-column array with ``arctan2(column 1, column 0)`` of
            ``orientations``, passed through ``utils.limit_angle_range`` with
            ``_range=(0, 2 * pi)``.
        """
        ori = self.orientations
        angles = np.arctan2(ori[:, 1], ori[:, 0])
        wrapped = utils.limit_angle_range(angles, _range=(0, 2 * np.pi))
        return wrapped[:, np.newaxis]

    def calculate_actions_speeds_turns(self):
        """Calculate speed and turn from the recorded positions and orientations.

        The turn is calculated by the change of orientation between frames.
        The speed is calculated by the distance between the points, projected on the new orientation vector.
        The sideway change of position cannot be represented with this method.

        Returns:
            An array with shape (number_of_positions - 1, 2 (speed in cm/frame, turn in rad/frame)).
        """
        # Load the orientations into memory once, instead of indexing the
        # stored dataset separately for every frame.
        ori = np.asarray(self.orientations)
        ori_rad = self.orientations_rad
        pos = self.positions

        turn = utils.limit_angle_range(np.diff(ori_rad, axis=0)[:, 0])
        pos_diff = np.diff(pos, axis=0)

        # Row-wise dot product of each step with the orientation of the frame
        # that the step arrives at.
        speed = np.sum(pos_diff * ori[1:], axis=1)
        return np.stack([speed, turn], axis=-1)

    @property
    def actions_speeds_turns(self):
        """Speed and turn per step, read from the cache when it exists.

        Without a ``calculated_actions_speeds_turns`` dataset the values are
        calculated on the fly. With one, the stored ``poses_hash`` must match
        the current one, otherwise an ``AssertionError`` is raised.
        """
        if "calculated_actions_speeds_turns" not in self:
            return self.calculate_actions_speeds_turns()

        assert (
            self.attrs["poses_hash"] == self.poses_hash
        ), f"The calculated poses_hash was not identical to the stored poses_hash. Please update the calculated data after changing positions or orientations with entity.update_calculated_data(). stored hash: {self.attrs['poses_hash']}, calculated hash: {self.poses_hash}."
        return self["calculated_actions_speeds_turns"]

    @property
    def orientations_rad(self):
        """Orientation angles, read from the cache when it exists.

        Without a ``calculated_orientations_rad`` dataset the values are
        calculated on the fly. With one, the stored ``poses_hash`` must match
        the current one, otherwise an ``AssertionError`` is raised.
        """
        if "calculated_orientations_rad" not in self:
            return self.calculate_orientations_rad()

        assert (
            self.attrs["poses_hash"] == self.poses_hash
        ), f"The calculated poses_hash was not identical to the stored poses_hash. Please update the calculated data after changing positions or orientations with entity.update_calculated_data(). stored hash: {self.attrs['poses_hash']}, calculated hash: {self.poses_hash}."
        return self["calculated_orientations_rad"]

Ancestors

  • h5py._hl.group.Group
  • h5py._hl.base.HLObject
  • h5py._hl.base.CommonStateObject
  • h5py._hl.base.MutableMappingHDF5
  • h5py._hl.base.MappingHDF5
  • collections.abc.MutableMapping
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Static methods

def convert_rad_to_vector(orientations_rad)
Expand source code
@classmethod
def convert_rad_to_vector(cls, orientations_rad):
    """Convert orientations given in radians to (cos, sin) vectors.

    Args:
        orientations_rad: Array-like with exactly one column.

    Returns:
        Array with one row per input row and two columns: the cosine of
        the angle in column 0 and the sine in column 1.

    A warning is logged when any angle lies outside [0, 2 * pi].
    """
    # Convert first, so the range check runs vectorised on an array.
    ori_rad = utils.np_array(orientations_rad)
    assert ori_rad.shape[1] == 1

    # np.any is False for an empty array and for NaN comparisons, so
    # empty tracks and missing values do not raise here.
    if np.any((ori_rad < 0) | (ori_rad > 2 * np.pi)):
        logging.warning(
            "Converting orientations, from a bigger range than [0, 2 * pi]. When passing the orientations, they are assumed to be in radians."
        )

    ori_vec = np.empty((ori_rad.shape[0], 2))
    ori_vec[:, 0] = np.cos(ori_rad[:, 0])
    ori_vec[:, 1] = np.sin(ori_rad[:, 0])
    return ori_vec
def create_entity(entities_group, category: str, poses: Iterable = None, name: str = None, positions: Iterable = None, orientations: Iterable = None, outlines: Iterable = None, sampling: str = None)
Expand source code
@classmethod
def create_entity(
    cls,
    entities_group,
    category: str,
    poses: Iterable = None,
    name: str = None,
    positions: Iterable = None,
    orientations: Iterable = None,
    outlines: Iterable = None,
    sampling: str = None,
):
    """Create a new entity group inside ``entities_group`` and fill it.

    Args:
        entities_group: Group in which the entity's group is created.
        category: Written to the ``category`` attribute of the entity.
        poses: Optional 2-D array with 3 or 4 columns.
        name: Group name. When omitted, ``"<category>_<n>"`` is used with
            the first ``n`` (counting from 1) that is not taken yet.
        positions: Optional 2-D array with 2 columns.
        orientations: Optional 2-D array with 1 or 2 columns.
        outlines: Optional outlines; stored only when given.
        sampling: Optional value passed on to ``create_poses`` and
            ``create_outlines``.

    Returns:
        The newly created entity.
    """
    poses, positions, orientations, outlines = utils.np_array(
        poses, positions, orientations, outlines
    )

    # Every array that was passed must be two-dimensional and have the
    # expected number of columns.
    if poses is not None:
        assert poses.ndim == 2 and poses.shape[1] in [3, 4]
    if positions is not None:
        assert positions.ndim == 2 and positions.shape[1] == 2
    if orientations is not None:
        assert orientations.ndim == 2 and orientations.shape[1] in [1, 2]

    if name is None:
        # Try "<category>_1", "<category>_2", ... and keep the first name
        # that is still free. The search gives up at index 9999.
        name = "%s_%d" % (category, 1)
        for idx in range(2, 10000):
            if name not in entities_group:
                break
            name = "%s_%d" % (category, idx)

    # Create the group and re-type it as an entity of this class.
    new_entity = cls.from_h5py_group(entities_group.create_group(name))
    new_entity.attrs["category"] = category

    new_entity.create_poses(poses, positions, orientations, sampling)
    if outlines is not None:
        new_entity.create_outlines(outlines, sampling)

    new_entity.update_calculated_data()
    return new_entity
def from_h5py_group(group)
Expand source code
@classmethod
def from_h5py_group(cla, group):
    """Turn the given group object into an instance of this class.

    The object is converted in place by reassigning its ``__class__``; no
    copy is made and the same object is returned.

    Args:
        group: The group object to convert (presumably an h5py group, as
            the method name suggests; confirm against callers).

    Returns:
        The same ``group`` object, now with this class as its type.
    """
    group.__class__ = cla
    return group

Instance variables

var actions_speeds_turns
Expand source code
@property
def actions_speeds_turns(self):
    """Return the speeds and turns of this entity.

    If the entity holds a "calculated_actions_speeds_turns" entry, that
    stored entry is returned after checking that the stored poses hash
    still matches the current one. Otherwise the values are calculated
    on the fly.
    """
    # Nothing stored: calculate directly from the poses.
    if "calculated_actions_speeds_turns" not in self:
        return self.calculate_actions_speeds_turns()

    assert (
        self.attrs["poses_hash"] == self.poses_hash
    ), f"The calculated poses_hash was not identical to the stored poses_hash. Please update the calculated data after changing positions or orientations with entity.update_calculated_data(). stored hash: {self.attrs['poses_hash']}, calculated hash: {self.poses_hash}."
    return self["calculated_actions_speeds_turns"]
var category
Expand source code
@property
def category(self):
    """Return the value stored under the "category" key of the attributes."""
    return self.attrs["category"]
var group_name
Expand source code
@property
def group_name(self):
    """Return the ``name`` attribute as provided by the parent class.

    NOTE(review): presumably this is the full path of the underlying group
    inside the file; confirm against the parent class documentation.
    """
    return super().name
var name

Return the name of this entity, i.e. the last component of its group name (the part after the final "/").

Expand source code
@property
def name(self):
    """Return the part of ``group_name`` after its last "/"."""
    _, _, last_component = self.group_name.rpartition("/")
    return last_component
var orientations
Expand source code
@property
def orientations(self):
    """Return the orientation vectors of this entity.

    Returns:
        The stored "orientations" entry if there is one. Otherwise an array
        with one default vector per position is returned.
    """
    if "orientations" not in self:
        # If no orientation is given, the default direction is to the right.
        # Orientation vectors are (cos, sin) pairs, so "right" (angle 0) is
        # [1, 0]; the previous default [0, 1] corresponds to an angle of pi / 2.
        return np.tile([1, 0], (self.positions.shape[0], 1))
    return self["orientations"]
var orientations_rad
Expand source code
@property
def orientations_rad(self):
    """Return the orientations of this entity in radians.

    If the entity holds a "calculated_orientations_rad" entry, that stored
    entry is returned after checking that the stored poses hash still
    matches the current one. Otherwise the values are calculated on the fly.
    """
    # Nothing stored: calculate directly from the orientation vectors.
    if "calculated_orientations_rad" not in self:
        return self.calculate_orientations_rad()

    assert (
        self.attrs["poses_hash"] == self.poses_hash
    ), f"The calculated poses_hash was not identical to the stored poses_hash. Please update the calculated data after changing positions or orientations with entity.update_calculated_data(). stored hash: {self.attrs['poses_hash']}, calculated hash: {self.poses_hash}."
    return self["calculated_orientations_rad"]
var poses
Expand source code
@property
def poses(self):
    """Return positions and orientation vectors joined column-wise."""
    xy = self.positions
    direction = self.orientations
    return np.concatenate((xy, direction), axis=1)
var poses_calc_ori

Deprecated since version: 0.2

This will be removed in 0.2.4. We found that our calculation of 'poses_calc_ori_rad' is flawed and replaced it. Use the original poses ('poses_rad') with tracked orientations instead. If you see this message and you don't know what to do, update all packages and merge to the master branch of fish_models. If nothing helps, contact Andi.

Don't ignore this warning, it's a serious issue.

Expand source code
@property
@deprecation.deprecated(
    deprecated_in="0.2",
    removed_in="0.2.4",
    details="We found that our calculation of 'poses_calc_ori_rad' is flawed and replaced it "
    "Use the original poses ('poses_rad') with tracked orientations instead. "
    "If you see this message and you don't know what to do, update all packages, "
    "merge to the master branch of fish_models if nothing helps, contact Andi.\n"
    "Don't ignore this warning, it's a serious issue.",
)
def poses_calc_ori(self):
    """Return poses whose orientation vectors are derived from the movement.

    Deprecated. Takes ``poses_calc_ori_rad`` and replaces its angle column
    by the cosine and the sine of that angle.

    Returns:
        An array with the columns (x, y, cos(angle), sin(angle)).
    """
    rad_poses = self.poses_calc_ori_rad
    xy = rad_poses[:, :2]
    # Keep the angle as a column so it can be concatenated along axis 1.
    angle = rad_poses[:, 2, np.newaxis]
    return np.concatenate([xy, np.cos(angle), np.sin(angle)], axis=1)
var poses_calc_ori_rad

Deprecated since version: 0.2

This will be removed in 0.2.4. We found that our calculation of 'poses_calc_ori' is flawed and replaced it. Use the original poses ('poses') with tracked orientations instead. If you see this message and you don't know what to do, update all packages and merge to the master branch of fish_models. If nothing helps, contact Andi.

Don't ignore this warning, it's a serious issue.

Expand source code
@property
@deprecation.deprecated(
    deprecated_in="0.2",
    removed_in="0.2.4",
    details="We found that our calculation of 'poses_calc_ori' is flawed and replaced it "
    "Use the original poses ('poses') with tracked orientations instead. "
    "If you see this message and you don't know what to do, update all packages, "
    "merge to the master branch of fish_models if nothing helps, contact Andi.\n"
    "Don't ignore this warning, it's a serious issue.",
)
def poses_calc_ori_rad(self):
    """Return positions with an orientation derived from the movement.

    Deprecated. The angle of each row is the direction of the step that led
    to that position, passed through ``utils.limit_angle_range`` with the
    range (0, 2 * pi).

    Returns:
        An array with the columns (x, y, angle). The first position is
        dropped because no step leads to it.
    """
    # Steps between consecutive positions, shape (t - 1, 2).
    steps = np.diff(self.positions, axis=0)

    # Direction of every step, shape (t - 1,).
    step_direction = np.arctan2(steps[:, 1], steps[:, 0])
    angles = utils.limit_angle_range(step_direction, _range=(0, 2 * np.pi))

    # The first position is cut off, as it does not have an orientation.
    later_positions = self.positions[1:]
    return np.concatenate([later_positions, angles[:, np.newaxis]], axis=1)
var poses_hash
Expand source code
@property
def poses_hash(self):
    """Return a hash value derived from the stored positions and orientations.

    The value is based on the NaN-ignoring sum of the stored arrays.

    Returns:
        0 if neither "orientations" nor "positions" is stored, the hash of
        the positions sum if only positions are stored, and the floored mean
        of the positions and orientations hashes if orientations are stored.
    """
    # The hash of h5py datasets changes each time the file is reopened.
    # Also the hash of casting the array to bytes and calculating the hash changes.
    def npsumhash(a):
        return hash(np.nansum(a))

    if "orientations" in self:
        return (npsumhash(self["positions"]) + npsumhash(self["orientations"])) // 2
    if "positions" in self:
        # A leftover debug print was removed here; it wrote to stdout on
        # every access of this property.
        return npsumhash(self["positions"])
    return 0
var poses_rad
Expand source code
@property
def poses_rad(self):
    """Return positions and radian orientations joined column-wise."""
    xy = self.positions
    angle = self.orientations_rad
    return np.concatenate((xy, angle), axis=1)
var positions
Expand source code
@property
def positions(self):
    """Return the object stored under the "positions" key of this entity."""
    return self["positions"]
var speed_turn

Get the speed and turn from the positions.

The vectors pointing from each position to the next are computed. The output of the function describes these vectors.

Returns

An array with shape (number_of_positions - 2, 2). It is two timesteps shorter than the number of positions: the first position has no calculated orientation, and the last pose has no following timestep. The first column is the length of the vectors. The second column is the turning angle required to get from one vector to the next. No zero turn is inserted for the first pose; the first row already describes the change between the first two movement vectors.

Deprecated since version: 0.2

This will be removed in 0.2.4. We found that our calculation of 'speed_turn' is flawed and replaced it with 'actions_speeds_turns'. The difference in calculation is that the tracked orientation is used now, which gives the fish the ability to swim backwards. If you see this message and you don't know what to do, update all packages and merge to the master branch of fish_models. If nothing helps, contact Andi.

Don't ignore this warning, it's a serious issue.

Expand source code
@property
@deprecation.deprecated(
    deprecated_in="0.2",
    removed_in="0.2.4",
    details="We found that our calculation of 'speed_turn' is flawed and replaced it "
    "with 'actions_speeds_turns'. The difference in calculation is, that the tracked "
    "orientation is used now which gives the fish the ability to swim backwards. "
    "If you see this message and you don't know what to do, update all packages, "
    "merge to the master branch of fish_models if nothing helps, contact Andi.\n"
    "Don't ignore this warning, it's a serious issue.",
)
def speed_turn(self):
    """Get the speed and turn from the positions.

    Deprecated. Works on ``poses_calc_ori_rad``, i.e. on positions whose
    orientation is the direction of the step leading to them.

    Returns:
        An array with shape (number_of_positions - 2, 2).
        The first column is the distance between consecutive positions.
        The second column is the change of the calculated orientation
        between them, passed through ``utils.limit_angle_range`` with the
        range (-pi, pi).
    """
    # The first position is already cut off here, as it does not have an
    # orientation: (t - 1, (x, y, ori)).
    calculated_poses = self.poses_calc_ori_rad

    # Differencing drops one more row: (t - 2, (dx, dy, d ori)).
    deltas = np.diff(calculated_poses, axis=0)

    step_length = np.linalg.norm(deltas[:, :2], axis=1)
    direction_change = utils.limit_angle_range(
        deltas[:, 2], _range=(-np.pi, np.pi)
    )
    return np.stack([step_length, direction_change], axis=-1)

Methods

def calculate_actions_speeds_turns(self)

Calculate the speed and turn from the recorded positions and orientations.

The turn is calculated by the change of orientation between frames. The speed is calculated by the distance between the points, projected on the new orientation vector. The sideway change of position cannot be represented with this method.

Returns

An array with shape (number_of_positions - 1, 2) (speed in cm/frame, turn in rad/frame).

Expand source code
def calculate_actions_speeds_turns(self):
    """Calculate the speed and turn from the recorded positions and orientations.

    The turn is calculated by the change of orientation between frames.
    The speed is calculated by the distance between the points, projected on the new orientation vector.
    The sideway change of position cannot be represented with this method.

    Returns:
        An array with shape (number_of_positions - 1, 2) (speed in cm/frame, turn in rad/frame).
    """
    # Load the orientation vectors once instead of indexing them per frame.
    ori = np.asarray(self.orientations)
    ori_rad = self.orientations_rad
    pos = self.positions

    turn = utils.limit_angle_range(np.diff(ori_rad, axis=0)[:, 0])
    pos_diff = np.diff(pos, axis=0)

    # Row-wise dot product of every step with the orientation after the step
    # (rows 1..n of the orientations), vectorized over all frames.
    new_ori = ori[1 : pos_diff.shape[0] + 1]
    speed = np.sum(pos_diff * new_ori, axis=1)

    return np.stack([speed, turn], axis=-1)
def calculate_orientations_rad(self)
Expand source code
def calculate_orientations_rad(self):
    """Calculate the orientations in radians from the orientation vectors.

    Returns:
        A column array holding the angle of every orientation vector, passed
        through ``utils.limit_angle_range`` with the range (0, 2 * pi).
    """
    vectors = self.orientations
    angles = np.arctan2(vectors[:, 1], vectors[:, 0])
    limited = utils.limit_angle_range(angles, _range=(0, 2 * np.pi))
    # Return the angles as a column, one row per orientation.
    return limited[:, np.newaxis]
def create_outlines(self, outlines: Iterable, sampling=None)
Expand source code
def create_outlines(self, outlines: Iterable, sampling=None):
    """Store the given outlines in a float32 dataset named "outlines".

    Args:
        outlines: The outline data to store.
        sampling: Optional value written to the "sampling" attribute of the
            created dataset.
    """
    dataset = self.create_dataset("outlines", data=outlines, dtype=np.float32)
    if sampling is None:
        return
    dataset.attrs["sampling"] = sampling
def create_poses(self, poses: Iterable = None, positions: Iterable = None, orientations: Iterable = None, sampling: str = None)
Expand source code
def create_poses(
    self,
    poses: Iterable = None,
    positions: Iterable = None,
    orientations: Iterable = None,
    sampling: str = None,
):
    """Store positions and, if available, orientations as float32 datasets.

    Args:
        poses: Optional 2D array with 3 or 4 columns. The first two columns
            are used as positions, the remaining ones as orientations.
        positions: Optional positions. Must not be combined with ``poses``.
        orientations: Optional orientations. An array with a single column
            is converted to vectors with ``Entity.convert_rad_to_vector``.
        sampling: Optional value written to the "sampling" attribute of
            every dataset that is created.

    Raises:
        AssertionError: If both ``poses`` and ``positions`` are given, or if
            ``poses`` does not have 3 or 4 columns.
    """
    poses, positions, orientations = utils.np_array(poses, positions, orientations)

    # Either poses or positions not both
    assert (
        poses is None or positions is None
    ), "Only either poses or positions can be given, not both."

    if poses is None and positions is None:
        logging.warning(
            "An entity without poses was created. If this was unwanted, add 'poses' or 'positions' to the constructor"
        )
        return

    if poses is not None:
        assert poses.shape[1] in [3, 4]
        positions = poses[:, :2]
        orientations = poses[:, 2:]
    if orientations is not None and orientations.shape[1] == 1:
        orientations = Entity.convert_rad_to_vector(orientations)

    positions = self.create_dataset("positions", data=positions, dtype=np.float32)
    if sampling is not None:
        positions.attrs["sampling"] = sampling

    # Orientations are optional. The sampling attribute is only written when
    # an orientations dataset exists; previously a given sampling without
    # orientations raised an AttributeError on None.
    if orientations is not None:
        orientations = self.create_dataset(
            "orientations", data=orientations, dtype=np.float32
        )
        if sampling is not None:
            orientations.attrs["sampling"] = sampling
def update_calculated_data(self, verbose=False, force_update=False)
Expand source code
def update_calculated_data(self, verbose=False, force_update=False):
    """Recalculate the stored calculated data if it is missing or outdated.

    An update happens when the stored poses hash is missing or differs from
    the current one, when one of the calculated entries is missing, when a
    previous update was left unfinished, or when ``force_update`` is set.

    Args:
        verbose: If True, print what was (or was not) updated.
        force_update: If True, recalculate even if the data looks current.

    Returns:
        True if the calculated entries were rewritten, False otherwise. The
        result no longer depends on ``verbose``.

    Raises:
        RuntimeError: Re-raised if writing fails with a RuntimeError.
        AssertionError: If the stored poses hash differs from the current
            one at the end.
    """
    changed = False
    if (
        "poses_hash" not in self.attrs
        or self.attrs["poses_hash"] != self.poses_hash
        or "calculated_orientations_rad" not in self
        or "calculated_actions_speeds_turns" not in self
        or "unfinished_calculations" in self.attrs
        or force_update
    ):
        try:
            self.attrs["poses_hash"] = self.poses_hash
            if "orientations" in self:
                # Marker that stays behind if the update is interrupted, so
                # that the next call recalculates.
                self.attrs["unfinished_calculations"] = True

                ori_rad = self.calculate_orientations_rad()
                if "calculated_orientations_rad" in self:
                    del self["calculated_orientations_rad"]
                self["calculated_orientations_rad"] = ori_rad.astype(np.float64)

                speeds_turns = self.calculate_actions_speeds_turns()
                if "calculated_actions_speeds_turns" in self:
                    del self["calculated_actions_speeds_turns"]
                self["calculated_actions_speeds_turns"] = speeds_turns.astype(
                    np.float64
                )
                del self.attrs["unfinished_calculations"]

                # Report the change independently of the verbosity.
                changed = True
                if verbose:
                    print(
                        f"Updated calculated data for entity {self.name} with poses_hash {self.poses_hash}"
                    )
            elif verbose:
                print(
                    "Since there were no orientations in the data, nothing was calculated."
                )
        except RuntimeError:
            print("Trying to update calculated data in a read-only file")
            raise
    elif verbose:
        print(
            f"Nothing to be updated in entity {self.name}. Poses_hash was {self.attrs['poses_hash']}"
        )

    assert self.attrs["poses_hash"] == self.poses_hash
    return changed