diff --git a/scripts/environments/export_IODescriptors.py b/scripts/environments/export_IODescriptors.py new file mode 100644 index 00000000000..0dea97b33b8 --- /dev/null +++ b/scripts/environments/export_IODescriptors.py @@ -0,0 +1,80 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Script to an environment with random action agent.""" + +"""Launch Isaac Sim Simulator first.""" + +import argparse + +from isaaclab.app import AppLauncher + +# add argparse arguments +parser = argparse.ArgumentParser(description="Random agent for Isaac Lab environments.") +parser.add_argument("--task", type=str, default=None, help="Name of the task.") +# append AppLauncher cli args +AppLauncher.add_app_launcher_args(parser) +# parse the arguments +args_cli = parser.parse_args() +args_cli.headless = True + +# launch omniverse app +app_launcher = AppLauncher(args_cli) +simulation_app = app_launcher.app + +"""Rest everything follows.""" + +import gymnasium as gym +import torch + +import isaaclab_tasks # noqa: F401 +from isaaclab_tasks.utils import parse_env_cfg + +# PLACEHOLDER: Extension template (do not remove this comment) + + +def main(): + """Random actions agent with Isaac Lab environment.""" + # create environment configuration + env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=1, use_fabric=True) + # create environment + env = gym.make(args_cli.task, cfg=env_cfg) + + # print info (this is vectorized environment) + print(f"[INFO]: Gym observation space: {env.observation_space}") + print(f"[INFO]: Gym action space: {env.action_space}") + # reset environment + env.reset() + + outs = env.unwrapped.get_IO_descriptors + out = outs["observations"] + out_actions = outs["actions"] + # Make a yaml file with the output + import yaml + + name = args_cli.task.lower().replace("-", "_") + name = name.replace(" ", "_") + + with open(f"{name}_IO_descriptors.yaml", "w") as f: + yaml.safe_dump(outs, f) + + for k, v in out_actions.items(): + print(f"--- Action term: {k} ---") + for k1, v1 in v.items(): + print(f"{k1}: {v1}") + + for k, v in out.items(): + print(f"--- Obs term: {k} ---") + for k1, v1 in v.items(): + print(f"{k1}: {v1}") + env.step(torch.zeros(env.action_space.shape, device=env.unwrapped.device)) + env.close() + + +if __name__ == "__main__": + # run the main function + main() + # close sim app + simulation_app.close() diff --git a/source/isaaclab/isaaclab/envs/manager_based_env.py b/source/isaaclab/isaaclab/envs/manager_based_env.py index 1febf07d70a..50ce4ad186f 100644 --- a/source/isaaclab/isaaclab/envs/manager_based_env.py +++ b/source/isaaclab/isaaclab/envs/manager_based_env.py @@ -210,6 +210,18 @@ def device(self): """The device on which the environment is running.""" return self.sim.device + @property + def get_IO_descriptors(self): + """Get the IO descriptors for the environment. + + Returns: + A dictionary with keys as the group names and values as the IO descriptors. + """ + return { + "observations": self.observation_manager.get_IO_descriptors, + "actions": self.action_manager.get_IO_descriptors, + } + """ Operations - Setup. """ diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/binary_joint_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/binary_joint_actions.py index f3a4fa37af1..9b8666e4464 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/binary_joint_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/binary_joint_actions.py @@ -17,6 +17,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import actions_cfg @@ -111,6 +112,15 @@ def raw_actions(self) -> torch.Tensor: def processed_actions(self) -> torch.Tensor: return self._processed_actions + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "JointAction" + self._IO_descriptor.joint_names = self._joint_names + return self._IO_descriptor + """ Operations. """ diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py index 0e2689fd254..8341c59184f 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py @@ -17,6 +17,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import actions_cfg @@ -123,6 +124,25 @@ def raw_actions(self) -> torch.Tensor: def processed_actions(self) -> torch.Tensor: return self._processed_actions + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "JointAction" + self._IO_descriptor.joint_names = self._joint_names + self._IO_descriptor.scale = self._scale + # This seems to be always [4xNum_joints] IDK why. Need to check. + if isinstance(self._offset, torch.Tensor): + self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist() + else: + self._IO_descriptor.offset = self._offset + if self.cfg.clip is not None: + self._IO_descriptor.clip = self._clip + else: + self._IO_descriptor.clip = None + return self._IO_descriptor + """ Operations. """ diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions_to_limits.py b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions_to_limits.py index 5398241e15e..d056d2e6ac6 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions_to_limits.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions_to_limits.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import actions_cfg @@ -105,6 +106,25 @@ def raw_actions(self) -> torch.Tensor: def processed_actions(self) -> torch.Tensor: return self._processed_actions + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "JointAction" + self._IO_descriptor.joint_names = self._joint_names + self._IO_descriptor.scale = self._scale + # This seems to be always [4xNum_joints] IDK why. Need to check. + if isinstance(self._offset, torch.Tensor): + self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist() + else: + self._IO_descriptor.offset = self._offset + if self.cfg.clip is not None: + self._IO_descriptor.clip = self._clip + else: + self._IO_descriptor.clip = None + return self._IO_descriptor + """ Operations. """ @@ -195,6 +215,20 @@ def __init__(self, cfg: actions_cfg.EMAJointPositionToLimitsActionCfg, env: Mana # initialize the previous targets self._prev_applied_actions = torch.zeros_like(self.processed_actions) + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + if isinstance(self._alpha, float): + self._IO_descriptor.alpha = self._alpha + elif isinstance(self._alpha, torch.Tensor): + self._IO_descriptor.alpha = self._alpha[0].detach().cpu().numpy().tolist() + else: + raise ValueError( + f"Unsupported moving average weight type: {type(self._alpha)}. Supported types are float and" + " torch.Tensor." + ) + return self._IO_descriptor + def reset(self, env_ids: Sequence[int] | None = None) -> None: # check if specific environment ids are provided if env_ids is None: diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/non_holonomic_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/non_holonomic_actions.py index a3e5cebdbf5..5d168da4151 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/non_holonomic_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/non_holonomic_actions.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import actions_cfg @@ -134,6 +135,21 @@ def raw_actions(self) -> torch.Tensor: def processed_actions(self) -> torch.Tensor: return self._processed_actions + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "non holonomic actions" + self._IO_descriptor.scale = self._scale + self._IO_descriptor.offset = self._offset + self._IO_descriptor.clip = self._clip + self._IO_descriptor.body_name = self._body_name + self._IO_descriptor.x_joint_name = self._joint_names[0] + self._IO_descriptor.y_joint_name = self._joint_names[1] + self._IO_descriptor.yaw_joint_name = self._joint_names[2] + return self._IO_descriptor + """ Operations. """ diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/pink_task_space_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/pink_task_space_actions.py index 11c3ff6cedf..30980fa1fb4 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/pink_task_space_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/pink_task_space_actions.py @@ -22,6 +22,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import pink_actions_cfg @@ -130,6 +131,17 @@ def processed_actions(self) -> torch.Tensor: """Get the processed actions tensor.""" return self._processed_actions + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "PinkInverseKinematicsAction" + self._IO_descriptor.pink_controller_joint_names = self._pink_controlled_joint_names + self._IO_descriptor.hand_joint_names = self._hand_joint_names + self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller.__dict__ + return self._IO_descriptor + # """ # Operations. # """ diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py index 89f51817179..bde914ccae9 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py @@ -23,6 +23,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import actions_cfg @@ -148,6 +149,23 @@ def jacobian_b(self) -> torch.Tensor: jacobian[:, 3:, :] = torch.bmm(base_rot_matrix, jacobian[:, 3:, :]) return jacobian + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "TaskSpaceAction" + self._IO_descriptor.body_name = self._body_name + self._IO_descriptor.joint_names = self._joint_names + self._IO_descriptor.scale = self._scale + if self.cfg.clip is not None: + self._IO_descriptor.clip = self.cfg.clip + else: + self._IO_descriptor.clip = None + self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller.__dict__ + self._IO_descriptor.extras["body_offset"] = self.cfg.body_offset.__dict__ + return self._IO_descriptor + """ Operations. """ @@ -409,6 +427,28 @@ def jacobian_b(self) -> torch.Tensor: jacobian[:, 3:, :] = torch.bmm(base_rot_matrix, jacobian[:, 3:, :]) return jacobian + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "TaskSpaceAction" + self._IO_descriptor.body_name = self._ee_body_name + self._IO_descriptor.joint_names = self._joint_names + self._IO_descriptor.position_scale = self.cfg.position_scale + self._IO_descriptor.orientation_scale = self.cfg.orientation_scale + self._IO_descriptor.wrench_scale = self.cfg.wrench_scale + self._IO_descriptor.stiffness_scale = self.cfg.stiffness_scale + self._IO_descriptor.damping_ratio_scale = self.cfg.damping_ratio_scale + self._IO_descriptor.nullspace_joint_pos_target = self.cfg.nullspace_joint_pos_target + if self.cfg.clip is not None: + self._IO_descriptor.clip = self.cfg.clip + else: + self._IO_descriptor.clip = None + self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller_cfg.__dict__ + self._IO_descriptor.extras["body_offset"] = self.cfg.body_offset.__dict__ + return self._IO_descriptor + """ Operations. """ diff --git a/source/isaaclab/isaaclab/envs/mdp/observations.py b/source/isaaclab/isaaclab/envs/mdp/observations.py index 745482f8c7e..5e1ffbdbec0 100644 --- a/source/isaaclab/isaaclab/envs/mdp/observations.py +++ b/source/isaaclab/isaaclab/envs/mdp/observations.py @@ -24,12 +24,20 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv, ManagerBasedRLEnv +from isaaclab.envs.utils.io_descriptors import ( + generic_io_descriptor, + record_body_names, + record_dtype, + record_joint_names, + record_shape, +) """ Root state. """ +@generic_io_descriptor(units="m", axes=["Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root height in the simulation world frame.""" # extract the used quantities (to enable type-hinting) @@ -37,6 +45,9 @@ def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg( return asset.data.root_pos_w[:, 2].unsqueeze(-1) +@generic_io_descriptor( + units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root linear velocity in the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -44,6 +55,9 @@ def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCf return asset.data.root_lin_vel_b +@generic_io_descriptor( + units="rad/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root angular velocity in the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -51,6 +65,9 @@ def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCf return asset.data.root_ang_vel_b +@generic_io_descriptor( + units="m/s^2", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Gravity projection on the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -58,6 +75,9 @@ def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEnt return asset.data.projected_gravity_b +@generic_io_descriptor( + units="m", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root position in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -65,6 +85,9 @@ def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg( return asset.data.root_pos_w - env.scene.env_origins +@generic_io_descriptor( + units="unit", axes=["W", "X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def root_quat_w( env: ManagerBasedEnv, make_quat_unique: bool = False, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot") ) -> torch.Tensor: @@ -82,6 +105,9 @@ def root_quat_w( return math_utils.quat_unique(quat) if make_quat_unique else quat +@generic_io_descriptor( + units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root linear velocity in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -89,6 +115,9 @@ def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntity return asset.data.root_lin_vel_w +@generic_io_descriptor( + units="rad/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root angular velocity in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -101,6 +130,7 @@ def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntity """ +@generic_io_descriptor(observation_type="BodyState", on_inspect=[record_shape, record_dtype, record_body_names]) def body_pose_w( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), @@ -124,6 +154,7 @@ def body_pose_w( return pose.reshape(env.num_envs, -1) +@generic_io_descriptor(observation_type="BodyState", on_inspect=[record_shape, record_dtype, record_body_names]) def body_projected_gravity_b( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), @@ -153,6 +184,9 @@ def body_projected_gravity_b( """ +@generic_io_descriptor( + observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad" +) def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset. @@ -163,6 +197,9 @@ def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_pos[:, asset_cfg.joint_ids] +@generic_io_descriptor( + observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad" +) def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset w.r.t. the default joint positions. @@ -173,6 +210,7 @@ def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC return asset.data.joint_pos[:, asset_cfg.joint_ids] - asset.data.default_joint_pos[:, asset_cfg.joint_ids] +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_pos_limit_normalized( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot") ) -> torch.Tensor: @@ -189,6 +227,9 @@ def joint_pos_limit_normalized( ) +@generic_io_descriptor( + observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad/s" +) def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset. @@ -199,6 +240,9 @@ def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_vel[:, asset_cfg.joint_ids] +@generic_io_descriptor( + observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad/s" +) def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset w.r.t. the default joint velocities. @@ -209,6 +253,9 @@ def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC return asset.data.joint_vel[:, asset_cfg.joint_ids] - asset.data.default_joint_vel[:, asset_cfg.joint_ids] +@generic_io_descriptor( + observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="N.m" +) def joint_effort(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint applied effort of the robot. @@ -578,6 +625,7 @@ def _inference(model, images: torch.Tensor) -> torch.Tensor: """ +@generic_io_descriptor(dtype=torch.float32, observation_type="Action", on_inspect=[record_shape]) def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.Tensor: """The last input action to the environment. @@ -595,7 +643,8 @@ def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.T """ -def generated_commands(env: ManagerBasedRLEnv, command_name: str) -> torch.Tensor: +@generic_io_descriptor(dtype=torch.float32, observation_type="Command", on_inspect=[record_shape]) +def generated_commands(env: ManagerBasedRLEnv, command_name: str | None = None) -> torch.Tensor: """The generated command from command term in the command manager with the given name.""" return env.command_manager.get_command(command_name) diff --git a/source/isaaclab/isaaclab/envs/utils/io_descriptors.py b/source/isaaclab/isaaclab/envs/utils/io_descriptors.py new file mode 100644 index 00000000000..4746acc4da8 --- /dev/null +++ b/source/isaaclab/isaaclab/envs/utils/io_descriptors.py @@ -0,0 +1,201 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +from __future__ import annotations + +from collections.abc import Callable +from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar + +from isaaclab.utils import configclass + +if TYPE_CHECKING: + from isaaclab.envs import ManagerBasedEnv + from isaaclab.assets.articulation import Articulation + import torch + +import dataclasses +import functools +import inspect + + +@configclass +class GenericActionIODescriptor: + mdp_type: str = "Action" + name: str = None + full_path: str = None + description: str = None + shape: tuple[int, ...] = None + dtype: str = None + action_type: str = None + extras: dict[str, Any] = {} + + +@configclass +class GenericIODescriptor: + mdp_type: str = "Observation" + name: str = None + full_path: str = None + description: str = None + shape: tuple[int, ...] = None + dtype: str = None + observation_type: str = None + extras: dict[str, Any] = {} + + +# These are defined to help with type hinting +P = ParamSpec("P") +R = TypeVar("R") + + +# Automatically builds a descriptor from the kwargs +def _make_descriptor(**kwargs: Any) -> GenericIODescriptor: + """Split *kwargs* into (known dataclass fields) and (extras).""" + field_names = {f.name for f in dataclasses.fields(GenericIODescriptor)} + known = {k: v for k, v in kwargs.items() if k in field_names} + extras = {k: v for k, v in kwargs.items() if k not in field_names} + + desc = GenericIODescriptor(**known) + # User defined extras are stored in the descriptor under the `extras` field + desc.extras = extras + return desc + + +# Decorator factory for generic IO descriptors. +def generic_io_descriptor( + _func: Callable[Concatenate[ManagerBasedEnv, P], R] | None = None, + *, + on_inspect: Callable[..., Any] | list[Callable[..., Any]] | None = None, + **descriptor_kwargs: Any, +) -> Callable[[Callable[Concatenate[ManagerBasedEnv, P], R]], Callable[Concatenate[ManagerBasedEnv, P], R]]: + """ + Decorator factory for generic IO descriptors. + + This decorator can be used in different ways: + 1. The default decorator has all the information I need for my use case: + ..code-block:: python + @generic_io_descriptor(GenericIODescriptor(description="..", dtype="..")) + def my_func(env: ManagerBasedEnv, *args, **kwargs): + ... + ..note:: If description is not set, the function's docstring is used to populate it. + + 2. I need to add more information to the descriptor: + ..code-block:: python + @generic_io_descriptor(description="..", new_var_1="a", new_var_2="b") + def my_func(env: ManagerBasedEnv, *args, **kwargs): + ... + 3. I need to add a hook to the descriptor: + ..code-block:: python + def record_shape(tensor: torch.Tensor, desc: GenericIODescriptor, **kwargs): + desc.shape = (tensor.shape[-1],) + + @generic_io_descriptor(description="..", new_var_1="a", new_var_2="b", on_inspect=[record_shape, record_dtype]) + def my_func(env: ManagerBasedEnv, *args, **kwargs): + ..note:: The hook is called after the function is called, if and only if the `inspect` flag is set when calling the function. + + For example: + ..code-block:: python + my_func(env, inspect=True) + + 4. I need to add a hook to the descriptor and this hook will write to a variable that is not part of the base descriptor. + ..code-block:: python + def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] + joint_ids = kwargs["asset_cfg"].joint_ids + if joint_ids == slice(None, None, None): + joint_ids = list(range(len(asset.joint_names))) + descriptor.joint_names = [asset.joint_names[i] for i in joint_ids] + + @generic_io_descriptor(new_var_1="a", new_var_2="b", on_inspect=[record_shape, record_dtype, record_joint_names]) + def my_func(env: ManagerBasedEnv, *args, **kwargs): + + ..note:: The hook can access all the variables in the wrapped function's signature. While it is useful, the user should be careful to + access only existing variables. + + Args: + _func: The function to decorate. + **descriptor_kwargs: Keyword arguments to pass to the descriptor. + + Returns: + A decorator that can be used to decorate a function. + """ + + if _func is not None and isinstance(_func, GenericIODescriptor): + descriptor = _func + _func = None + else: + descriptor = _make_descriptor(**descriptor_kwargs) + + # Ensures the hook is a list + if callable(on_inspect): + inspect_hooks: list[Callable[..., Any]] = [on_inspect] + else: + inspect_hooks: list[Callable[..., Any]] = list(on_inspect or []) # handles None + + def _apply(func: Callable[Concatenate[ManagerBasedEnv, P], R]) -> Callable[Concatenate[ManagerBasedEnv, P], R]: + + # Capture the signature of the function + sig = inspect.signature(func) + + @functools.wraps(func) + def wrapper(env: ManagerBasedEnv, *args: P.args, **kwargs: P.kwargs) -> R: + inspect_flag: bool = kwargs.pop("inspect", False) + out = func(env, *args, **kwargs) + if inspect_flag: + # Injects the function's arguments into the hooks and applies the defaults + bound = sig.bind(env, *args, **kwargs) + bound.apply_defaults() + call_kwargs = { + "output": out, + "descriptor": descriptor, + **bound.arguments, + } + for hook in inspect_hooks: + hook(**call_kwargs) + return out + + # --- Descriptor bookkeeping --- + descriptor.name = func.__name__ + descriptor.full_path = f"{func.__module__}.{func.__name__}" + descriptor.dtype = str(descriptor.dtype) + # Check if description is set in the descriptor + if descriptor.description is None: + descriptor.description = " ".join(func.__doc__.split()) + + # Adds the descriptor to the wrapped function as an attribute + wrapper._descriptor = descriptor + wrapper._has_descriptor = True + # Alters the signature of the wrapped function to make it match the original function. + # This allows the wrapped functions to pass the checks in the managers. + wrapper.__signature__ = sig + return wrapper + + # If the decorator is used without parentheses, _func will be the function itself. + if callable(_func): + return _apply(_func) + return _apply + + +def record_shape(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + descriptor.shape = (output.shape[-1],) + + +def record_dtype(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + descriptor.dtype = str(output.dtype) + + +def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] + joint_ids = kwargs["asset_cfg"].joint_ids + if joint_ids == slice(None, None, None): + joint_ids = list(range(len(asset.joint_names))) + descriptor.joint_names = [asset.joint_names[i] for i in joint_ids] + + +def record_body_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] + body_ids = kwargs["asset_cfg"].body_ids + if body_ids == slice(None, None, None): + body_ids = list(range(len(asset.body_names))) + descriptor.body_names = [asset.body_names[i] for i in body_ids] diff --git a/source/isaaclab/isaaclab/managers/action_manager.py b/source/isaaclab/isaaclab/managers/action_manager.py index 72e78e68cb9..7d2f6f1bd03 100644 --- a/source/isaaclab/isaaclab/managers/action_manager.py +++ b/source/isaaclab/isaaclab/managers/action_manager.py @@ -8,16 +8,18 @@ from __future__ import annotations import inspect +import re import torch import weakref from abc import abstractmethod from collections.abc import Sequence from prettytable import PrettyTable -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import omni.kit.app from isaaclab.assets import AssetBase +from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from .manager_base import ManagerBase, ManagerTermBase from .manager_term_cfg import ActionTermCfg @@ -50,6 +52,7 @@ def __init__(self, cfg: ActionTermCfg, env: ManagerBasedEnv): super().__init__(cfg, env) # parse config to obtain asset to which the term is applied self._asset: AssetBase = self._env.scene[self.cfg.asset_name] + self._IO_descriptor = GenericActionIODescriptor() # add handle for debug visualization (this is set to a valid handle inside set_debug_vis) self._debug_vis_handle = None @@ -91,6 +94,14 @@ def has_debug_vis_implementation(self) -> bool: source_code = inspect.getsource(self._set_debug_vis_impl) return "NotImplementedError" not in source_code + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + """The IO descriptor for the action term.""" + self._IO_descriptor.name = re.sub(r"([a-z])([A-Z])", r"\1_\2", self.__class__.__name__).lower() + self._IO_descriptor.full_path = f"{self.__class__.__module__}.{self.__class__.__name__}" + self._IO_descriptor.description = " ".join(self.__class__.__doc__.split()) + return self._IO_descriptor + """ Operations. """ @@ -259,6 +270,38 @@ def has_debug_vis_implementation(self) -> bool: has_debug_vis |= term.has_debug_vis_implementation return has_debug_vis + @property + def get_IO_descriptors(self) -> dict[str, dict[str, Any]]: + """Get the IO descriptors for the action manager. + + Returns: + A dictionary with keys as the term names and values as the IO descriptors. + """ + + data = [] + + for term_name, term in self._terms.items(): + try: + data.append(term.IO_descriptor.__dict__.copy()) + except Exception as e: + print(f"Error getting IO descriptor for term '{term_name}': {e}") + + formatted_data = {} + for item in data: + name = item.pop("name") + formatted_item = {"extras": item.pop("extras")} + for k, v in item.items(): + # Check if v is a tuple and convert to list + if isinstance(v, tuple): + v = list(v) + if k in ["description", "units"]: + formatted_item["extras"][k] = v + else: + formatted_item[k] = v + formatted_data[name] = formatted_item + + return formatted_data + """ Operations. """ diff --git a/source/isaaclab/isaaclab/managers/observation_manager.py b/source/isaaclab/isaaclab/managers/observation_manager.py index 3524260b241..c1e0c09b4d6 100644 --- a/source/isaaclab/isaaclab/managers/observation_manager.py +++ b/source/isaaclab/isaaclab/managers/observation_manager.py @@ -225,6 +225,64 @@ def group_obs_concatenate(self) -> dict[str, bool]: """ return self._group_obs_concatenate + @property + def get_IO_descriptors(self): + """Get the IO descriptors for the observation manager. + + Returns: + A dictionary with keys as the group names and values as the IO descriptors. + """ + + data = [] + + for group_name in self._group_obs_term_names: + # check ig group name is valid + if group_name not in self._group_obs_term_names: + raise ValueError( + f"Unable to find the group '{group_name}' in the observation manager." + f" Available groups are: {list(self._group_obs_term_names.keys())}" + ) + # iterate over all the terms in each group + group_term_names = self._group_obs_term_names[group_name] + # read attributes for each term + obs_terms = zip(group_term_names, self._group_obs_term_cfgs[group_name]) + + for term_name, term_cfg in obs_terms: + # Call to the observation function to get the IO descriptor with the inspect flag set to True + try: + term_cfg.func(self._env, **term_cfg.params, inspect=True) + # Copy the descriptor and update with the term's own extra parameters + desc = term_cfg.func._descriptor.__dict__.copy() + # Create a dictionary to store the overloads + overloads = {} + # Iterate over the term's own parameters and add them to the overloads dictionary + for k, v in term_cfg.__dict__.items(): + # For now we do not add the noise modifier + if k in ["modifiers", "clip", "scale", "history_length", "flatten_history_dim"]: + overloads[k] = v + desc.update(overloads) + data.append(desc) + except Exception as e: + print(f"Error getting IO descriptor for term '{term_name}' in group '{group_name}': {e}") + # Format the data for YAML export + formatted_data = {} + for item in data: + name = item.pop("name") + formatted_item = {"overloads": {}, "extras": item.pop("extras")} + for k, v in item.items(): + # Check if v is a tuple and convert to list + if isinstance(v, tuple): + v = list(v) + if k in ["scale", "clip", "history_length", "flatten_history_dim"]: + formatted_item["overloads"][k] = v + elif k in ["modifiers", "description", "units"]: + formatted_item["extras"][k] = v + else: + formatted_item[k] = v + formatted_data[name] = formatted_item + + return formatted_data + """ Operations. """