From dd542eea5ab408833353dfde6107de84549b82a9 Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Tue, 20 May 2025 20:10:33 +0200 Subject: [PATCH 01/16] very quick prototyping of the Obs outputs --- scripts/environments/random_agent.py | 8 ++ .../isaaclab/envs/manager_based_env.py | 9 ++ .../isaaclab/envs/mdp/observations.py | 116 +++++++++++++++++- 3 files changed, 129 insertions(+), 4 deletions(-) diff --git a/scripts/environments/random_agent.py b/scripts/environments/random_agent.py index b3187c3b372..9422680e992 100644 --- a/scripts/environments/random_agent.py +++ b/scripts/environments/random_agent.py @@ -47,11 +47,19 @@ def main(): # create environment env = gym.make(args_cli.task, cfg=env_cfg) + + # print info (this is vectorized environment) print(f"[INFO]: Gym observation space: {env.observation_space}") print(f"[INFO]: Gym action space: {env.action_space}") # reset environment env.reset() + + out = env.unwrapped.get_IO_descriptors + for o in out: + print(f"--- Obs term: {o.name} ---") + for k, v in o.__dict__.items(): + print(f"{k}: {v}") # simulate environment while simulation_app.is_running(): # run everything in inference mode diff --git a/source/isaaclab/isaaclab/envs/manager_based_env.py b/source/isaaclab/isaaclab/envs/manager_based_env.py index 1febf07d70a..6e7d4e7a19b 100644 --- a/source/isaaclab/isaaclab/envs/manager_based_env.py +++ b/source/isaaclab/isaaclab/envs/manager_based_env.py @@ -209,6 +209,15 @@ def step_dt(self) -> float: def device(self): """The device on which the environment is running.""" return self.sim.device + + @property + def get_IO_descriptors(self): + """Get the IO descriptors for the environment. + + Returns: + A dictionary with keys as the group names and values as the IO descriptors. + """ + return self.observation_manager.get_IO_descriptors """ Operations - Setup. diff --git a/source/isaaclab/isaaclab/envs/mdp/observations.py b/source/isaaclab/isaaclab/envs/mdp/observations.py index 745482f8c7e..33a80d36398 100644 --- a/source/isaaclab/isaaclab/envs/mdp/observations.py +++ b/source/isaaclab/isaaclab/envs/mdp/observations.py @@ -24,12 +24,41 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv, ManagerBasedRLEnv +from isaaclab.utils import configclass +from dataclasses import MISSING + +@configclass +class IODescriptor: + mdp_type: str = "Observation" + name: str = None + description: str = None + shape: tuple[int, ...] = None + dtype: torch.dtype = None + observation_type: str = None + """ Root state. """ +@configclass +class RootStateIODescriptor(IODescriptor): + observation_type: str = "RootState" + axes: list[str] = None + units: str = None + + +def root_state_io_descriptor(descriptor: RootStateIODescriptor): + def decorator(func): + def wrapper(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), inspect: bool = False): + return func(env, asset_cfg) + descriptor.name = func.__name__ + wrapper._has_descriptor = True + wrapper._descriptor = descriptor + return wrapper + return decorator +@root_state_io_descriptor(RootStateIODescriptor(description="Root height in the world frame.", units="m", axes=["Z"], shape=(1,), dtype=torch.float32)) def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root height in the simulation world frame.""" # extract the used quantities (to enable type-hinting) @@ -37,6 +66,7 @@ def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg( return asset.data.root_pos_w[:, 2].unsqueeze(-1) +@root_state_io_descriptor(RootStateIODescriptor(description="Root linear velocity in the robot's frame.", units="m/s", axes=["X", "Y", "Z"], shape=(3,), dtype=torch.float32)) def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root linear velocity in the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -44,6 +74,7 @@ def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCf return asset.data.root_lin_vel_b +@root_state_io_descriptor(RootStateIODescriptor(description="Root angular velocity in the robot's frame.", units="rad/s", axes=["X", "Y", "Z"], shape=(3,), dtype=torch.float32)) def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root angular velocity in the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -51,6 +82,7 @@ def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCf return asset.data.root_ang_vel_b +@root_state_io_descriptor(RootStateIODescriptor(description="Projection of gravity in the robot's root frame.", units="m/s^2", axes=["X", "Y", "Z"] , shape=(3,), dtype=torch.float32)) def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Gravity projection on the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -58,6 +90,7 @@ def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEnt return asset.data.projected_gravity_b +@root_state_io_descriptor(RootStateIODescriptor(description="Root body position in the world frame.", units="m", axes=["X", "Y", "Z"], shape=(3,), dtype=torch.float32)) def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root position in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -65,6 +98,7 @@ def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg( return asset.data.root_pos_w - env.scene.env_origins +@root_state_io_descriptor(RootStateIODescriptor(description="Root body orientation in the world frame.", units="unit", axes=["W", "X", "Y", "Z"], shape=(4,), dtype=torch.float32)) def root_quat_w( env: ManagerBasedEnv, make_quat_unique: bool = False, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot") ) -> torch.Tensor: @@ -82,6 +116,7 @@ def root_quat_w( return math_utils.quat_unique(quat) if make_quat_unique else quat +@root_state_io_descriptor(RootStateIODescriptor(description="Root body linear velocity in the world frame.", units="m/s", axes=["X", "Y", "Z"], shape=(3,), dtype=torch.float32)) def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root linear velocity in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -89,6 +124,7 @@ def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntity return asset.data.root_lin_vel_w +@root_state_io_descriptor(RootStateIODescriptor(description="Root body angular velocity in the world frame.", units="rad/s", axes=["X", "Y", "Z"], shape=(3,), dtype=torch.float32)) def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root angular velocity in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -100,7 +136,30 @@ def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntity Body state """ +@configclass +class BodyStateIODescriptor(IODescriptor): + observation_type: str = "BodyState" + body_ids: list[int] | int = [] + body_names: list[str] | str = [] + +def body_state_io_descriptor(descriptor: BodyStateIODescriptor, inspect: bool = False): + def decorator(func): + descriptor.name = func.__name__ + def wrapper(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): + if inspect: + out = func(env, asset_cfg) + descriptor.shape = (out.shape[-1],) + descriptor.body_ids = asset_cfg.body_ids + descriptor.body_names = asset_cfg.body_names + return out + else: + return func(env, asset_cfg) + wrapper._has_descriptor = True + wrapper._descriptor = descriptor + return wrapper + return decorator +@body_state_io_descriptor(BodyStateIODescriptor(description="The flattened body poses of the robot in the world frame. The output shape is 7 * num_bodies", dtype=torch.float32)) def body_pose_w( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), @@ -124,6 +183,7 @@ def body_pose_w( return pose.reshape(env.num_envs, -1) +@body_state_io_descriptor(BodyStateIODescriptor(description="The direction of gravity projected on to bodies own frames. The output shape is 3 * num_bodies", dtype=torch.float32)) def body_projected_gravity_b( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), @@ -151,8 +211,35 @@ def body_projected_gravity_b( """ Joint state. """ +@configclass +class JointStateIODescriptor(IODescriptor): + observation_type: str = "JointState" + joint_ids: list[int] | int = [] + joint_names: list[str] | str = [] + +def joint_state_io_descriptor(descriptor: JointStateIODescriptor): + def decorator(func): + descriptor.name = func.__name__ + def wrapper(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), inspect: bool = False): + if inspect: + out = func(env, asset_cfg) + descriptor.shape = (out.shape[-1],) + descriptor.joint_ids = asset_cfg.joint_ids + descriptor.joint_names = asset_cfg.joint_names + + if descriptor.joint_names is None: + asset: Articulation = env.scene[asset_cfg.name] + descriptor.joint_names = asset.joint_names + descriptor.joint_ids = list(range(len(asset.joint_names))) + return out + else: + return func(env, asset_cfg) + wrapper._has_descriptor = True + wrapper._descriptor = descriptor + return wrapper + return decorator - +@joint_state_io_descriptor(JointStateIODescriptor(description="The joint positions of the asset. The output shape is num_joints.", dtype=torch.float32)) def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset. @@ -163,6 +250,7 @@ def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_pos[:, asset_cfg.joint_ids] +@joint_state_io_descriptor(JointStateIODescriptor(description="The joint positions of the asset w.r.t. the default joint positions. The output shape is num_joints", dtype=torch.float32)) def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset w.r.t. the default joint positions. @@ -173,6 +261,7 @@ def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC return asset.data.joint_pos[:, asset_cfg.joint_ids] - asset.data.default_joint_pos[:, asset_cfg.joint_ids] +@joint_state_io_descriptor(JointStateIODescriptor(description="The joint positions of the asset normalized with the asset's joint limits. The output shape is num_joints", dtype=torch.float32)) def joint_pos_limit_normalized( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot") ) -> torch.Tensor: @@ -188,7 +277,7 @@ def joint_pos_limit_normalized( asset.data.soft_joint_pos_limits[:, asset_cfg.joint_ids, 1], ) - +@joint_state_io_descriptor(JointStateIODescriptor(description="The joint velocities of the asset. The output shape is num_joints", dtype=torch.float32)) def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset. @@ -198,7 +287,7 @@ def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" asset: Articulation = env.scene[asset_cfg.name] return asset.data.joint_vel[:, asset_cfg.joint_ids] - +@joint_state_io_descriptor(JointStateIODescriptor(description="The joint velocities of the asset w.r.t. the default joint velocities. The output shape is num_joints", dtype=torch.float32)) def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset w.r.t. the default joint velocities. @@ -208,7 +297,7 @@ def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC asset: Articulation = env.scene[asset_cfg.name] return asset.data.joint_vel[:, asset_cfg.joint_ids] - asset.data.default_joint_vel[:, asset_cfg.joint_ids] - +@joint_state_io_descriptor(JointStateIODescriptor(description="The joint applied effort of the robot. The output shape is num_joints", dtype=torch.float32)) def joint_effort(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint applied effort of the robot. @@ -578,6 +667,25 @@ def _inference(model, images: torch.Tensor) -> torch.Tensor: """ +@configclass +class ActionIODescriptor(IODescriptor): + observation_type: str = "Action" + +def root_state_io_descriptor(descriptor: RootStateIODescriptor): + def decorator(func): + def wrapper(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), inspect: bool = False): + if inspect: + out = func(env, asset_cfg) + descriptor.shape = (out.shape[-1],) + return out + else: + return func(env, asset_cfg) + descriptor.name = func.__name__ + wrapper._has_descriptor = True + wrapper._descriptor = descriptor + return wrapper + return decorator + def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.Tensor: """The last input action to the environment. From 0a9455c6db9d750e39ce1a076515804c8c4258af Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Wed, 21 May 2025 12:01:29 +0200 Subject: [PATCH 02/16] It sort of works, but it's messy. Maybe we should use latch on to the resolve function to get more data out of the term. --- .../isaaclab/envs/mdp/observations.py | 18 ++++++---- .../isaaclab/managers/manager_base.py | 19 +++++++++-- .../isaaclab/managers/observation_manager.py | 34 +++++++++++++++++++ 3 files changed, 61 insertions(+), 10 deletions(-) diff --git a/source/isaaclab/isaaclab/envs/mdp/observations.py b/source/isaaclab/isaaclab/envs/mdp/observations.py index 33a80d36398..b4b99fb1538 100644 --- a/source/isaaclab/isaaclab/envs/mdp/observations.py +++ b/source/isaaclab/isaaclab/envs/mdp/observations.py @@ -668,24 +668,27 @@ def _inference(model, images: torch.Tensor) -> torch.Tensor: @configclass -class ActionIODescriptor(IODescriptor): - observation_type: str = "Action" +class GenericIODescriptor(IODescriptor): + observation_type: str = None + -def root_state_io_descriptor(descriptor: RootStateIODescriptor): +def generic_io_descriptor(descriptor: GenericIODescriptor): def decorator(func): - def wrapper(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), inspect: bool = False): + def wrapper(env: ManagerBasedEnv, *args, inspect: bool = False, **kwargs): if inspect: - out = func(env, asset_cfg) + out = func(env, *args, **kwargs) descriptor.shape = (out.shape[-1],) return out else: - return func(env, asset_cfg) + return func(env, *args, **kwargs) descriptor.name = func.__name__ wrapper._has_descriptor = True wrapper._descriptor = descriptor return wrapper return decorator + +@generic_io_descriptor(GenericIODescriptor(description="The last input action to the environment.", dtype=torch.float32, observation_type="Action")) def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.Tensor: """The last input action to the environment. @@ -703,7 +706,8 @@ def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.T """ -def generated_commands(env: ManagerBasedRLEnv, command_name: str) -> torch.Tensor: +@generic_io_descriptor(GenericIODescriptor(description="The generated command term in the command manager.", dtype=torch.float32, observation_type="Command")) +def generated_commands(env: ManagerBasedRLEnv, command_name: str | None = None) -> torch.Tensor: """The generated command from command term in the command manager with the given name.""" return env.command_manager.get_command(command_name) diff --git a/source/isaaclab/isaaclab/managers/manager_base.py b/source/isaaclab/isaaclab/managers/manager_base.py index 081c3e271ec..14d198eb659 100644 --- a/source/isaaclab/isaaclab/managers/manager_base.py +++ b/source/isaaclab/isaaclab/managers/manager_base.py @@ -359,11 +359,24 @@ def _resolve_common_term_cfg(self, term_name: str, term_cfg: ManagerTermBaseCfg, args = inspect.signature(func_static).parameters args_with_defaults = [arg for arg in args if args[arg].default is not inspect.Parameter.empty] args_without_defaults = [arg for arg in args if args[arg].default is inspect.Parameter.empty] - args = args_without_defaults + args_with_defaults # ignore first two arguments for env and env_ids - # Think: Check for cases when kwargs are set inside the function? + # Think: Check for cases when kwargs are set inside the function? + if "kwargs" in args_without_defaults: + args_without_defaults.remove("kwargs") + args_with_defaults.append("kwargs") + args = args_without_defaults + args_with_defaults + + print("args", args) + print("args_without_defaults", args_without_defaults) + print("args_with_defaults", args_with_defaults) + print("min_argc", min_argc) + print("term_params", term_params) + print("term_cfg", term_cfg) + if len(args) > min_argc: - if set(args[min_argc:]) != set(term_params + args_with_defaults): + if "kwargs" in args: + pass + elif set(args[min_argc:]) != set(term_params + args_with_defaults): raise ValueError( f"The term '{term_name}' expects mandatory parameters: {args_without_defaults[min_argc:]}" f" and optional parameters: {args_with_defaults}, but received: {term_params}." diff --git a/source/isaaclab/isaaclab/managers/observation_manager.py b/source/isaaclab/isaaclab/managers/observation_manager.py index 3524260b241..7686ab53999 100644 --- a/source/isaaclab/isaaclab/managers/observation_manager.py +++ b/source/isaaclab/isaaclab/managers/observation_manager.py @@ -225,6 +225,40 @@ def group_obs_concatenate(self) -> dict[str, bool]: """ return self._group_obs_concatenate + @property + def get_IO_descriptors(self): + """Get the IO descriptors for the observation manager. + + Returns: + A dictionary with keys as the group names and values as the IO descriptors. + """ + + data = [] + + for group_name in self._group_obs_term_names: + # check ig group name is valid + if group_name not in self._group_obs_term_names: + raise ValueError( + f"Unable to find the group '{group_name}' in the observation manager." + f" Available groups are: {list(self._group_obs_term_names.keys())}" + ) + # iterate over all the terms in each group + group_term_names = self._group_obs_term_names[group_name] + # buffer to store obs per group + group_obs = dict.fromkeys(group_term_names, None) + # read attributes for each term + obs_terms = zip(group_term_names, self._group_obs_term_cfgs[group_name]) + + for term_name, term_cfg in obs_terms: + # dummy call to cache some values + try: + term_cfg.func(self._env, **term_cfg.params, inspect=True) + data.append(term_cfg.func._descriptor) + except Exception as e: + print(f"Error getting IO descriptor for term '{term_name}' in group '{group_name}': {e}") + + return data + """ Operations. """ From a4d8c60cfb9ce76eb922badc707b324c3639b5b0 Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Tue, 10 Jun 2025 14:55:56 +0200 Subject: [PATCH 03/16] Added a new auto-magik decorator that should help streamline the process. There are some caveats so we'll need good doc for it --- .../isaaclab/envs/mdp/observations.py | 278 +++++++++++------- 1 file changed, 172 insertions(+), 106 deletions(-) diff --git a/source/isaaclab/isaaclab/envs/mdp/observations.py b/source/isaaclab/isaaclab/envs/mdp/observations.py index b4b99fb1538..82909ca9588 100644 --- a/source/isaaclab/isaaclab/envs/mdp/observations.py +++ b/source/isaaclab/isaaclab/envs/mdp/observations.py @@ -11,6 +11,8 @@ from __future__ import annotations +import functools +import inspect import torch from typing import TYPE_CHECKING @@ -27,38 +29,176 @@ from isaaclab.utils import configclass from dataclasses import MISSING + +import dataclasses, functools, inspect +from typing import Any, Callable, ParamSpec, TypeVar, Concatenate + @configclass class IODescriptor: mdp_type: str = "Observation" name: str = None + full_path: str = None description: str = None shape: tuple[int, ...] = None dtype: torch.dtype = None observation_type: str = None - -""" -Root state. -""" @configclass -class RootStateIODescriptor(IODescriptor): - observation_type: str = "RootState" - axes: list[str] = None - units: str = None +class GenericIODescriptor: + mdp_type: str = "Observation" + name: str = None + full_path: str = None + description: str = None + shape: tuple[int, ...] = None + dtype: torch.dtype = None + observation_type: str = None + extras: dict[str, Any] = None + +# These are defined to help with type hinting +P = ParamSpec("P") +R = TypeVar("R") + +# Automatically builds a descriptor from the kwargs +def _make_descriptor(**kwargs: Any) -> "GenericIODescriptor": + """Split *kwargs* into (known dataclass fields) and (extras).""" + field_names = {f.name for f in dataclasses.fields(GenericIODescriptor)} + known = {k: v for k, v in kwargs.items() if k in field_names} + extras = {k: v for k, v in kwargs.items() if k not in field_names} + + desc = GenericIODescriptor(**known) + # User defined extras are stored in the descriptor under the `extras` field + desc.extras = extras + return desc + +# Decorator factory for generic IO descriptors. +def generic_io_descriptor( + _func: Callable[Concatenate[ManagerBasedEnv, P], R] | None = None, + *, + on_inspect: Callable[..., Any] | list[Callable[..., Any]] | None = None, + **descriptor_kwargs: Any, +) -> Callable[[Callable[Concatenate[ManagerBasedEnv, P], R]], + Callable[Concatenate[ManagerBasedEnv, P], R]]: + """ + Decorator factory for generic IO descriptors. + + This decorator can be used in different ways: + 1. The default decorator has all the information I need for my use case: + ..code-block:: python + @generic_io_descriptor(GenericIODescriptor(description="..", dtype="..")) + def my_func(env: ManagerBasedEnv, *args, **kwargs): + ... + ..note:: If description is not set, the function's docstring is used to populate it. + + 2. I need to add more information to the descriptor: + ..code-block:: python + @generic_io_descriptor(description="..", new_var_1="a", new_var_2="b") + def my_func(env: ManagerBasedEnv, *args, **kwargs): + ... + 3. I need to add a hook to the descriptor: + ..code-block:: python + def record_shape(tensor: torch.Tensor, desc: GenericIODescriptor): + desc.shape = (tensor.shape[-1],) + + @generic_io_descriptor(description="..", new_var_1="a", new_var_2="b", on_inspect=[record_shape]) + def my_func(env: ManagerBasedEnv, *args, **kwargs): + ..note:: The hook is called after the function is called, if and only if the `inspect` flag is set when calling the function. + For example: + ..code-block:: python + my_func(env, inspect=True) + + Args: + _func: The function to decorate. + **descriptor_kwargs: Keyword arguments to pass to the descriptor. + + Returns: + A decorator that can be used to decorate a function. + """ + if _func is not None and isinstance(_func, GenericIODescriptor): + descriptor = _func + _func = None + else: + descriptor = _make_descriptor(**descriptor_kwargs) -def root_state_io_descriptor(descriptor: RootStateIODescriptor): - def decorator(func): - def wrapper(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), inspect: bool = False): - return func(env, asset_cfg) + # Ensures the hook is a list + if callable(on_inspect): + inspect_hooks: list[Callable[..., Any]] = [on_inspect] + else: + inspect_hooks: list[Callable[..., Any]] = list(on_inspect or []) # handles None + + def _apply( + func: Callable[Concatenate[ManagerBasedEnv, P], R] + ) -> Callable[Concatenate[ManagerBasedEnv, P], R]: + + # Capture the signature of the function + sig = inspect.signature(func) + + @functools.wraps(func) + def wrapper( + env: ManagerBasedEnv, *args: P.args, **kwargs: P.kwargs + ) -> R: + inspect_flag: bool = kwargs.pop("inspect", False) + out = func(env, *args, **kwargs) + if inspect_flag: + # Injects the function's arguments into the hooks and applies the defaults + bound = sig.bind(env, *args, **kwargs) + bound.apply_defaults() + call_kwargs = { + "output": out, + "descriptor": descriptor, + **bound.arguments, + } + for hook in inspect_hooks: + hook(**call_kwargs) + return out + + # --- Descriptor bookkeeping --- descriptor.name = func.__name__ - wrapper._has_descriptor = True + descriptor.full_path = f"{func.__module__}.{func.__name__}" + descriptor.dtype = str(descriptor.dtype) + # Check if description is set in the descriptor + if descriptor.description is None: + descriptor.description = func.__doc__ + + # Adds the descriptor to the wrapped function as an attribute wrapper._descriptor = descriptor + wrapper._has_descriptor = True + # Alters the signature of the wrapped function to make it match the original function. + # This allows the wrapped functions to pass the checks in the managers. + wrapper.__signature__ = sig return wrapper - return decorator + # If the decorator is used without parentheses, _func will be the function itself. + if callable(_func): + return _apply(_func) + return _apply + + +def record_shape(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + descriptor.shape = (output.shape[-1],) + +def record_dtype(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + descriptor.dtype = str(output.dtype) + +def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] + joint_ids = kwargs["asset_cfg"].joint_ids + if joint_ids == slice(None, None, None): + joint_ids = list(range(len(asset.joint_names))) + descriptor.joint_names = [asset.joint_names[i] for i in joint_ids] + +def record_body_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] + body_ids = kwargs["asset_cfg"].body_ids + if body_ids == slice(None, None, None): + body_ids = list(range(len(asset.body_names))) + descriptor.body_names = [asset.body_names[i] for i in body_ids] + +""" +Root state. +""" -@root_state_io_descriptor(RootStateIODescriptor(description="Root height in the world frame.", units="m", axes=["Z"], shape=(1,), dtype=torch.float32)) +@generic_io_descriptor(units="m", axes=["Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root height in the simulation world frame.""" # extract the used quantities (to enable type-hinting) @@ -66,7 +206,7 @@ def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg( return asset.data.root_pos_w[:, 2].unsqueeze(-1) -@root_state_io_descriptor(RootStateIODescriptor(description="Root linear velocity in the robot's frame.", units="m/s", axes=["X", "Y", "Z"], shape=(3,), dtype=torch.float32)) +@generic_io_descriptor(units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root linear velocity in the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -74,7 +214,7 @@ def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCf return asset.data.root_lin_vel_b -@root_state_io_descriptor(RootStateIODescriptor(description="Root angular velocity in the robot's frame.", units="rad/s", axes=["X", "Y", "Z"], shape=(3,), dtype=torch.float32)) +@generic_io_descriptor(units="rad/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root angular velocity in the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -82,7 +222,7 @@ def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCf return asset.data.root_ang_vel_b -@root_state_io_descriptor(RootStateIODescriptor(description="Projection of gravity in the robot's root frame.", units="m/s^2", axes=["X", "Y", "Z"] , shape=(3,), dtype=torch.float32)) +@generic_io_descriptor(units="m/s^2", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Gravity projection on the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -90,7 +230,7 @@ def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEnt return asset.data.projected_gravity_b -@root_state_io_descriptor(RootStateIODescriptor(description="Root body position in the world frame.", units="m", axes=["X", "Y", "Z"], shape=(3,), dtype=torch.float32)) +@generic_io_descriptor(units="m", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root position in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -98,7 +238,7 @@ def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg( return asset.data.root_pos_w - env.scene.env_origins -@root_state_io_descriptor(RootStateIODescriptor(description="Root body orientation in the world frame.", units="unit", axes=["W", "X", "Y", "Z"], shape=(4,), dtype=torch.float32)) +@generic_io_descriptor(units="unit", axes=["W", "X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) def root_quat_w( env: ManagerBasedEnv, make_quat_unique: bool = False, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot") ) -> torch.Tensor: @@ -116,7 +256,7 @@ def root_quat_w( return math_utils.quat_unique(quat) if make_quat_unique else quat -@root_state_io_descriptor(RootStateIODescriptor(description="Root body linear velocity in the world frame.", units="m/s", axes=["X", "Y", "Z"], shape=(3,), dtype=torch.float32)) +@generic_io_descriptor(units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root linear velocity in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -124,7 +264,7 @@ def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntity return asset.data.root_lin_vel_w -@root_state_io_descriptor(RootStateIODescriptor(description="Root body angular velocity in the world frame.", units="rad/s", axes=["X", "Y", "Z"], shape=(3,), dtype=torch.float32)) +@generic_io_descriptor(units="rad/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root angular velocity in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -136,30 +276,7 @@ def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntity Body state """ -@configclass -class BodyStateIODescriptor(IODescriptor): - observation_type: str = "BodyState" - body_ids: list[int] | int = [] - body_names: list[str] | str = [] - -def body_state_io_descriptor(descriptor: BodyStateIODescriptor, inspect: bool = False): - def decorator(func): - descriptor.name = func.__name__ - def wrapper(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): - if inspect: - out = func(env, asset_cfg) - descriptor.shape = (out.shape[-1],) - descriptor.body_ids = asset_cfg.body_ids - descriptor.body_names = asset_cfg.body_names - return out - else: - return func(env, asset_cfg) - wrapper._has_descriptor = True - wrapper._descriptor = descriptor - return wrapper - return decorator - -@body_state_io_descriptor(BodyStateIODescriptor(description="The flattened body poses of the robot in the world frame. The output shape is 7 * num_bodies", dtype=torch.float32)) +@generic_io_descriptor(observation_type="BodyState", body_names=None, on_inspect=[record_shape, record_dtype, record_body_names]) def body_pose_w( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), @@ -183,7 +300,7 @@ def body_pose_w( return pose.reshape(env.num_envs, -1) -@body_state_io_descriptor(BodyStateIODescriptor(description="The direction of gravity projected on to bodies own frames. The output shape is 3 * num_bodies", dtype=torch.float32)) +@generic_io_descriptor(observation_type="BodyState", body_names=None, on_inspect=[record_shape, record_dtype, record_body_names]) def body_projected_gravity_b( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), @@ -211,35 +328,7 @@ def body_projected_gravity_b( """ Joint state. """ -@configclass -class JointStateIODescriptor(IODescriptor): - observation_type: str = "JointState" - joint_ids: list[int] | int = [] - joint_names: list[str] | str = [] - -def joint_state_io_descriptor(descriptor: JointStateIODescriptor): - def decorator(func): - descriptor.name = func.__name__ - def wrapper(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), inspect: bool = False): - if inspect: - out = func(env, asset_cfg) - descriptor.shape = (out.shape[-1],) - descriptor.joint_ids = asset_cfg.joint_ids - descriptor.joint_names = asset_cfg.joint_names - - if descriptor.joint_names is None: - asset: Articulation = env.scene[asset_cfg.name] - descriptor.joint_names = asset.joint_names - descriptor.joint_ids = list(range(len(asset.joint_names))) - return out - else: - return func(env, asset_cfg) - wrapper._has_descriptor = True - wrapper._descriptor = descriptor - return wrapper - return decorator - -@joint_state_io_descriptor(JointStateIODescriptor(description="The joint positions of the asset. The output shape is num_joints.", dtype=torch.float32)) +@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset. @@ -250,7 +339,7 @@ def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_pos[:, asset_cfg.joint_ids] -@joint_state_io_descriptor(JointStateIODescriptor(description="The joint positions of the asset w.r.t. the default joint positions. The output shape is num_joints", dtype=torch.float32)) +@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset w.r.t. the default joint positions. @@ -261,7 +350,7 @@ def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC return asset.data.joint_pos[:, asset_cfg.joint_ids] - asset.data.default_joint_pos[:, asset_cfg.joint_ids] -@joint_state_io_descriptor(JointStateIODescriptor(description="The joint positions of the asset normalized with the asset's joint limits. The output shape is num_joints", dtype=torch.float32)) +@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_pos_limit_normalized( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot") ) -> torch.Tensor: @@ -277,7 +366,7 @@ def joint_pos_limit_normalized( asset.data.soft_joint_pos_limits[:, asset_cfg.joint_ids, 1], ) -@joint_state_io_descriptor(JointStateIODescriptor(description="The joint velocities of the asset. The output shape is num_joints", dtype=torch.float32)) +@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset. @@ -287,7 +376,7 @@ def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" asset: Articulation = env.scene[asset_cfg.name] return asset.data.joint_vel[:, asset_cfg.joint_ids] -@joint_state_io_descriptor(JointStateIODescriptor(description="The joint velocities of the asset w.r.t. the default joint velocities. The output shape is num_joints", dtype=torch.float32)) +@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset w.r.t. the default joint velocities. @@ -297,7 +386,7 @@ def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC asset: Articulation = env.scene[asset_cfg.name] return asset.data.joint_vel[:, asset_cfg.joint_ids] - asset.data.default_joint_vel[:, asset_cfg.joint_ids] -@joint_state_io_descriptor(JointStateIODescriptor(description="The joint applied effort of the robot. The output shape is num_joints", dtype=torch.float32)) +@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_effort(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint applied effort of the robot. @@ -666,29 +755,7 @@ def _inference(model, images: torch.Tensor) -> torch.Tensor: Actions. """ - -@configclass -class GenericIODescriptor(IODescriptor): - observation_type: str = None - - -def generic_io_descriptor(descriptor: GenericIODescriptor): - def decorator(func): - def wrapper(env: ManagerBasedEnv, *args, inspect: bool = False, **kwargs): - if inspect: - out = func(env, *args, **kwargs) - descriptor.shape = (out.shape[-1],) - return out - else: - return func(env, *args, **kwargs) - descriptor.name = func.__name__ - wrapper._has_descriptor = True - wrapper._descriptor = descriptor - return wrapper - return decorator - - -@generic_io_descriptor(GenericIODescriptor(description="The last input action to the environment.", dtype=torch.float32, observation_type="Action")) +@generic_io_descriptor(dtype=torch.float32, observation_type="Action", on_inspect=[record_shape]) def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.Tensor: """The last input action to the environment. @@ -705,8 +772,7 @@ def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.T Commands. """ - -@generic_io_descriptor(GenericIODescriptor(description="The generated command term in the command manager.", dtype=torch.float32, observation_type="Command")) +@generic_io_descriptor(dtype=torch.float32, observation_type="Command", on_inspect=[record_shape]) def generated_commands(env: ManagerBasedRLEnv, command_name: str | None = None) -> torch.Tensor: """The generated command from command term in the command manager with the given name.""" return env.command_manager.get_command(command_name) From 603f9df98895bbd1f5642275ba9ee47e44834ebb Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Tue, 10 Jun 2025 14:56:51 +0200 Subject: [PATCH 04/16] removed the custom logic used to bypass the checks inside the managers. The new decorator can mock its signature, so we don't need that anymore --- .../isaaclab/managers/manager_base.py | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/source/isaaclab/isaaclab/managers/manager_base.py b/source/isaaclab/isaaclab/managers/manager_base.py index 14d198eb659..37cb0454f1b 100644 --- a/source/isaaclab/isaaclab/managers/manager_base.py +++ b/source/isaaclab/isaaclab/managers/manager_base.py @@ -353,30 +353,17 @@ def _resolve_common_term_cfg(self, term_name: str, term_cfg: ManagerTermBaseCfg, # check if function is callable if not callable(func_static): raise AttributeError(f"The term '{term_name}' is not callable. Received: {term_cfg.func}") - + # check statically if the term's arguments are matched by params term_params = list(term_cfg.params.keys()) args = inspect.signature(func_static).parameters args_with_defaults = [arg for arg in args if args[arg].default is not inspect.Parameter.empty] args_without_defaults = [arg for arg in args if args[arg].default is inspect.Parameter.empty] - # ignore first two arguments for env and env_ids - # Think: Check for cases when kwargs are set inside the function? - if "kwargs" in args_without_defaults: - args_without_defaults.remove("kwargs") - args_with_defaults.append("kwargs") args = args_without_defaults + args_with_defaults - - print("args", args) - print("args_without_defaults", args_without_defaults) - print("args_with_defaults", args_with_defaults) - print("min_argc", min_argc) - print("term_params", term_params) - print("term_cfg", term_cfg) - + # ignore first two arguments for env and env_ids + # Think: Check for cases when kwargs are set inside the function? if len(args) > min_argc: - if "kwargs" in args: - pass - elif set(args[min_argc:]) != set(term_params + args_with_defaults): + if set(args[min_argc:]) != set(term_params + args_with_defaults): raise ValueError( f"The term '{term_name}' expects mandatory parameters: {args_without_defaults[min_argc:]}" f" and optional parameters: {args_with_defaults}, but received: {term_params}." From 267ad3371c29fc6eaed62035ec47d6816abe9a95 Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Tue, 10 Jun 2025 14:57:27 +0200 Subject: [PATCH 05/16] added ugly temporary logic to handle the YAML convertion. Need to clean that up a bit --- .../isaaclab/managers/observation_manager.py | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/source/isaaclab/isaaclab/managers/observation_manager.py b/source/isaaclab/isaaclab/managers/observation_manager.py index 7686ab53999..85735faf060 100644 --- a/source/isaaclab/isaaclab/managers/observation_manager.py +++ b/source/isaaclab/isaaclab/managers/observation_manager.py @@ -253,12 +253,40 @@ def get_IO_descriptors(self): # dummy call to cache some values try: term_cfg.func(self._env, **term_cfg.params, inspect=True) - data.append(term_cfg.func._descriptor) + desc = term_cfg.func._descriptor.__dict__.copy() + print(f"desc: {desc}") + overloads = {} + for k,v in term_cfg.__dict__.items(): + if k in ["modifiers", "clip", "scale", "history_length", "flatten_history_dim"]: + overloads[k] = v + desc.update(overloads) + data.append(desc) except Exception as e: print(f"Error getting IO descriptor for term '{term_name}' in group '{group_name}': {e}") - return data + # Format the data for YAML export + formatted_data = {} + for item in data: + name = item.pop("name") + formatted_item = {} + formatted_item["overloads"] = {} + formatted_item["extras"] = {} + for k,v in item.items(): + # Check if v is a tuple and convert to list + if isinstance(v, tuple): + v = list(v) + if k in ["scale", "clip", "history_length", "flatten_history_dim"]: + formatted_item["overloads"][k] = v + elif k in ["modifiers", "noise", "description", "units"]: + formatted_item["extras"][k] = v + else: + formatted_item[k] = v + + + formatted_data[name] = formatted_item + return formatted_data + """ Operations. """ From 05c8b08f372005ca2107497d087848aa0a64a669 Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Tue, 10 Jun 2025 14:57:55 +0200 Subject: [PATCH 06/16] added ugly temporary logic to handle the YAML convertion. Need to clean that up a bit --- scripts/environments/random_agent.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/scripts/environments/random_agent.py b/scripts/environments/random_agent.py index 9422680e992..b8e7ffee7d4 100644 --- a/scripts/environments/random_agent.py +++ b/scripts/environments/random_agent.py @@ -56,10 +56,16 @@ def main(): env.reset() out = env.unwrapped.get_IO_descriptors - for o in out: - print(f"--- Obs term: {o.name} ---") - for k, v in o.__dict__.items(): - print(f"{k}: {v}") + # Make a yaml file with the output + import yaml + with open("obs_descriptors.yaml", "w") as f: + yaml.safe_dump(out, f) + + for k, v in out.items(): + print(f"--- Obs term: {k} ---") + for k1, v1 in v.items(): + print(f"{k1}: {v1}") + exit(0) # simulate environment while simulation_app.is_running(): # run everything in inference mode From 3daf42763e1a75eb4776b030bdc0d32f229477a3 Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Tue, 10 Jun 2025 15:32:00 +0200 Subject: [PATCH 07/16] passing pre-commits --- scripts/environments/random_agent.py | 3 +- .../isaaclab/envs/manager_based_env.py | 2 +- .../isaaclab/envs/mdp/observations.py | 138 ++++++++++++------ .../isaaclab/managers/manager_base.py | 2 +- .../isaaclab/managers/observation_manager.py | 14 +- 5 files changed, 99 insertions(+), 60 deletions(-) diff --git a/scripts/environments/random_agent.py b/scripts/environments/random_agent.py index b8e7ffee7d4..ab877628620 100644 --- a/scripts/environments/random_agent.py +++ b/scripts/environments/random_agent.py @@ -47,8 +47,6 @@ def main(): # create environment env = gym.make(args_cli.task, cfg=env_cfg) - - # print info (this is vectorized environment) print(f"[INFO]: Gym observation space: {env.observation_space}") print(f"[INFO]: Gym action space: {env.action_space}") @@ -58,6 +56,7 @@ def main(): out = env.unwrapped.get_IO_descriptors # Make a yaml file with the output import yaml + with open("obs_descriptors.yaml", "w") as f: yaml.safe_dump(out, f) diff --git a/source/isaaclab/isaaclab/envs/manager_based_env.py b/source/isaaclab/isaaclab/envs/manager_based_env.py index 6e7d4e7a19b..6da245e3e7f 100644 --- a/source/isaaclab/isaaclab/envs/manager_based_env.py +++ b/source/isaaclab/isaaclab/envs/manager_based_env.py @@ -209,7 +209,7 @@ def step_dt(self) -> float: def device(self): """The device on which the environment is running.""" return self.sim.device - + @property def get_IO_descriptors(self): """Get the IO descriptors for the environment. diff --git a/source/isaaclab/isaaclab/envs/mdp/observations.py b/source/isaaclab/isaaclab/envs/mdp/observations.py index 82909ca9588..ab96597760d 100644 --- a/source/isaaclab/isaaclab/envs/mdp/observations.py +++ b/source/isaaclab/isaaclab/envs/mdp/observations.py @@ -14,7 +14,7 @@ import functools import inspect import torch -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar import isaaclab.utils.math as math_utils from isaaclab.assets import Articulation, RigidObject @@ -26,22 +26,11 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv, ManagerBasedRLEnv -from isaaclab.utils import configclass -from dataclasses import MISSING - +import dataclasses +from collections.abc import Callable -import dataclasses, functools, inspect -from typing import Any, Callable, ParamSpec, TypeVar, Concatenate +from isaaclab.utils import configclass -@configclass -class IODescriptor: - mdp_type: str = "Observation" - name: str = None - full_path: str = None - description: str = None - shape: tuple[int, ...] = None - dtype: torch.dtype = None - observation_type: str = None @configclass class GenericIODescriptor: @@ -54,30 +43,32 @@ class GenericIODescriptor: observation_type: str = None extras: dict[str, Any] = None + # These are defined to help with type hinting P = ParamSpec("P") R = TypeVar("R") + # Automatically builds a descriptor from the kwargs -def _make_descriptor(**kwargs: Any) -> "GenericIODescriptor": +def _make_descriptor(**kwargs: Any) -> GenericIODescriptor: """Split *kwargs* into (known dataclass fields) and (extras).""" field_names = {f.name for f in dataclasses.fields(GenericIODescriptor)} - known = {k: v for k, v in kwargs.items() if k in field_names} - extras = {k: v for k, v in kwargs.items() if k not in field_names} + known = {k: v for k, v in kwargs.items() if k in field_names} + extras = {k: v for k, v in kwargs.items() if k not in field_names} desc = GenericIODescriptor(**known) # User defined extras are stored in the descriptor under the `extras` field desc.extras = extras return desc + # Decorator factory for generic IO descriptors. def generic_io_descriptor( _func: Callable[Concatenate[ManagerBasedEnv, P], R] | None = None, *, on_inspect: Callable[..., Any] | list[Callable[..., Any]] | None = None, **descriptor_kwargs: Any, -) -> Callable[[Callable[Concatenate[ManagerBasedEnv, P], R]], - Callable[Concatenate[ManagerBasedEnv, P], R]]: +) -> Callable[[Callable[Concatenate[ManagerBasedEnv, P], R]], Callable[Concatenate[ManagerBasedEnv, P], R]]: """ Decorator factory for generic IO descriptors. @@ -96,16 +87,32 @@ def my_func(env: ManagerBasedEnv, *args, **kwargs): ... 3. I need to add a hook to the descriptor: ..code-block:: python - def record_shape(tensor: torch.Tensor, desc: GenericIODescriptor): + def record_shape(tensor: torch.Tensor, desc: GenericIODescriptor, **kwargs): desc.shape = (tensor.shape[-1],) - - @generic_io_descriptor(description="..", new_var_1="a", new_var_2="b", on_inspect=[record_shape]) + + @generic_io_descriptor(description="..", new_var_1="a", new_var_2="b", on_inspect=[record_shape, record_dtype]) def my_func(env: ManagerBasedEnv, *args, **kwargs): ..note:: The hook is called after the function is called, if and only if the `inspect` flag is set when calling the function. + For example: ..code-block:: python my_func(env, inspect=True) - + + 4. I need to add a hook to the descriptor and this hook will write to a variable that is not part of the base descriptor. + ..code-block:: python + def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] + joint_ids = kwargs["asset_cfg"].joint_ids + if joint_ids == slice(None, None, None): + joint_ids = list(range(len(asset.joint_names))) + descriptor.joint_names = [asset.joint_names[i] for i in joint_ids] + + @generic_io_descriptor(joint_names=None, new_var_1="a", new_var_2="b", on_inspect=[record_shape, record_dtype, record_joint_names]) + def my_func(env: ManagerBasedEnv, *args, **kwargs): + + ..note:: The hook can access all the variables in the wrapped function's signature. While it is useful, the user should be careful to + access only existing variables. + Args: _func: The function to decorate. **descriptor_kwargs: Keyword arguments to pass to the descriptor. @@ -126,17 +133,13 @@ def my_func(env: ManagerBasedEnv, *args, **kwargs): else: inspect_hooks: list[Callable[..., Any]] = list(on_inspect or []) # handles None - def _apply( - func: Callable[Concatenate[ManagerBasedEnv, P], R] - ) -> Callable[Concatenate[ManagerBasedEnv, P], R]: - + def _apply(func: Callable[Concatenate[ManagerBasedEnv, P], R]) -> Callable[Concatenate[ManagerBasedEnv, P], R]: + # Capture the signature of the function sig = inspect.signature(func) @functools.wraps(func) - def wrapper( - env: ManagerBasedEnv, *args: P.args, **kwargs: P.kwargs - ) -> R: + def wrapper(env: ManagerBasedEnv, *args: P.args, **kwargs: P.kwargs) -> R: inspect_flag: bool = kwargs.pop("inspect", False) out = func(env, *args, **kwargs) if inspect_flag: @@ -177,9 +180,11 @@ def wrapper( def record_shape(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): descriptor.shape = (output.shape[-1],) + def record_dtype(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): descriptor.dtype = str(output.dtype) + def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] joint_ids = kwargs["asset_cfg"].joint_ids @@ -187,6 +192,7 @@ def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, ** joint_ids = list(range(len(asset.joint_names))) descriptor.joint_names = [asset.joint_names[i] for i in joint_ids] + def record_body_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] body_ids = kwargs["asset_cfg"].body_ids @@ -194,10 +200,12 @@ def record_body_names(output: torch.Tensor, descriptor: GenericIODescriptor, **k body_ids = list(range(len(asset.body_names))) descriptor.body_names = [asset.body_names[i] for i in body_ids] + """ Root state. """ + @generic_io_descriptor(units="m", axes=["Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root height in the simulation world frame.""" @@ -206,7 +214,9 @@ def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg( return asset.data.root_pos_w[:, 2].unsqueeze(-1) -@generic_io_descriptor(units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) +@generic_io_descriptor( + units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root linear velocity in the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -214,7 +224,9 @@ def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCf return asset.data.root_lin_vel_b -@generic_io_descriptor(units="rad/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) +@generic_io_descriptor( + units="rad/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Root angular velocity in the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -222,7 +234,9 @@ def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCf return asset.data.root_ang_vel_b -@generic_io_descriptor(units="m/s^2", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) +@generic_io_descriptor( + units="m/s^2", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Gravity projection on the asset's root frame.""" # extract the used quantities (to enable type-hinting) @@ -230,7 +244,9 @@ def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEnt return asset.data.projected_gravity_b -@generic_io_descriptor(units="m", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) +@generic_io_descriptor( + units="m", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root position in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -238,7 +254,9 @@ def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg( return asset.data.root_pos_w - env.scene.env_origins -@generic_io_descriptor(units="unit", axes=["W", "X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) +@generic_io_descriptor( + units="unit", axes=["W", "X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def root_quat_w( env: ManagerBasedEnv, make_quat_unique: bool = False, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot") ) -> torch.Tensor: @@ -256,7 +274,9 @@ def root_quat_w( return math_utils.quat_unique(quat) if make_quat_unique else quat -@generic_io_descriptor(units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) +@generic_io_descriptor( + units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root linear velocity in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -264,7 +284,9 @@ def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntity return asset.data.root_lin_vel_w -@generic_io_descriptor(units="rad/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]) +@generic_io_descriptor( + units="rad/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype] +) def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """Asset root angular velocity in the environment frame.""" # extract the used quantities (to enable type-hinting) @@ -276,7 +298,10 @@ def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntity Body state """ -@generic_io_descriptor(observation_type="BodyState", body_names=None, on_inspect=[record_shape, record_dtype, record_body_names]) + +@generic_io_descriptor( + observation_type="BodyState", body_names=None, on_inspect=[record_shape, record_dtype, record_body_names] +) def body_pose_w( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), @@ -300,7 +325,9 @@ def body_pose_w( return pose.reshape(env.num_envs, -1) -@generic_io_descriptor(observation_type="BodyState", body_names=None, on_inspect=[record_shape, record_dtype, record_body_names]) +@generic_io_descriptor( + observation_type="BodyState", body_names=None, on_inspect=[record_shape, record_dtype, record_body_names] +) def body_projected_gravity_b( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), @@ -328,7 +355,11 @@ def body_projected_gravity_b( """ Joint state. """ -@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) + + +@generic_io_descriptor( + observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] +) def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset. @@ -339,7 +370,9 @@ def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_pos[:, asset_cfg.joint_ids] -@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) +@generic_io_descriptor( + observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] +) def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset w.r.t. the default joint positions. @@ -350,7 +383,9 @@ def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC return asset.data.joint_pos[:, asset_cfg.joint_ids] - asset.data.default_joint_pos[:, asset_cfg.joint_ids] -@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) +@generic_io_descriptor( + observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] +) def joint_pos_limit_normalized( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot") ) -> torch.Tensor: @@ -366,7 +401,10 @@ def joint_pos_limit_normalized( asset.data.soft_joint_pos_limits[:, asset_cfg.joint_ids, 1], ) -@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) + +@generic_io_descriptor( + observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] +) def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset. @@ -376,7 +414,10 @@ def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" asset: Articulation = env.scene[asset_cfg.name] return asset.data.joint_vel[:, asset_cfg.joint_ids] -@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) + +@generic_io_descriptor( + observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] +) def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset w.r.t. the default joint velocities. @@ -386,7 +427,10 @@ def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC asset: Articulation = env.scene[asset_cfg.name] return asset.data.joint_vel[:, asset_cfg.joint_ids] - asset.data.default_joint_vel[:, asset_cfg.joint_ids] -@generic_io_descriptor(observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape]) + +@generic_io_descriptor( + observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] +) def joint_effort(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint applied effort of the robot. @@ -755,6 +799,7 @@ def _inference(model, images: torch.Tensor) -> torch.Tensor: Actions. """ + @generic_io_descriptor(dtype=torch.float32, observation_type="Action", on_inspect=[record_shape]) def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.Tensor: """The last input action to the environment. @@ -772,6 +817,7 @@ def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.T Commands. """ + @generic_io_descriptor(dtype=torch.float32, observation_type="Command", on_inspect=[record_shape]) def generated_commands(env: ManagerBasedRLEnv, command_name: str | None = None) -> torch.Tensor: """The generated command from command term in the command manager with the given name.""" diff --git a/source/isaaclab/isaaclab/managers/manager_base.py b/source/isaaclab/isaaclab/managers/manager_base.py index 37cb0454f1b..081c3e271ec 100644 --- a/source/isaaclab/isaaclab/managers/manager_base.py +++ b/source/isaaclab/isaaclab/managers/manager_base.py @@ -353,7 +353,7 @@ def _resolve_common_term_cfg(self, term_name: str, term_cfg: ManagerTermBaseCfg, # check if function is callable if not callable(func_static): raise AttributeError(f"The term '{term_name}' is not callable. Received: {term_cfg.func}") - + # check statically if the term's arguments are matched by params term_params = list(term_cfg.params.keys()) args = inspect.signature(func_static).parameters diff --git a/source/isaaclab/isaaclab/managers/observation_manager.py b/source/isaaclab/isaaclab/managers/observation_manager.py index 85735faf060..d3d0854fa9d 100644 --- a/source/isaaclab/isaaclab/managers/observation_manager.py +++ b/source/isaaclab/isaaclab/managers/observation_manager.py @@ -244,8 +244,6 @@ def get_IO_descriptors(self): ) # iterate over all the terms in each group group_term_names = self._group_obs_term_names[group_name] - # buffer to store obs per group - group_obs = dict.fromkeys(group_term_names, None) # read attributes for each term obs_terms = zip(group_term_names, self._group_obs_term_cfgs[group_name]) @@ -254,9 +252,8 @@ def get_IO_descriptors(self): try: term_cfg.func(self._env, **term_cfg.params, inspect=True) desc = term_cfg.func._descriptor.__dict__.copy() - print(f"desc: {desc}") overloads = {} - for k,v in term_cfg.__dict__.items(): + for k, v in term_cfg.__dict__.items(): if k in ["modifiers", "clip", "scale", "history_length", "flatten_history_dim"]: overloads[k] = v desc.update(overloads) @@ -268,10 +265,8 @@ def get_IO_descriptors(self): formatted_data = {} for item in data: name = item.pop("name") - formatted_item = {} - formatted_item["overloads"] = {} - formatted_item["extras"] = {} - for k,v in item.items(): + formatted_item = {"overloads": {}, "extras": {}} + for k, v in item.items(): # Check if v is a tuple and convert to list if isinstance(v, tuple): v = list(v) @@ -281,12 +276,11 @@ def get_IO_descriptors(self): formatted_item["extras"][k] = v else: formatted_item[k] = v - formatted_data[name] = formatted_item return formatted_data - + """ Operations. """ From f57b8866b9b62664d403253136869d9c788ede51 Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Tue, 10 Jun 2025 16:43:18 +0200 Subject: [PATCH 08/16] WIP added action thingys --- scripts/environments/random_agent.py | 11 +++++-- .../isaaclab/envs/manager_based_env.py | 2 +- .../isaaclab/envs/mdp/actions/actions_cfg.py | 1 - .../envs/mdp/actions/joint_actions.py | 19 +++++++++++ .../isaaclab/managers/action_manager.py | 32 +++++++++++++++++++ .../isaaclab/managers/observation_manager.py | 6 +++- 6 files changed, 66 insertions(+), 5 deletions(-) diff --git a/scripts/environments/random_agent.py b/scripts/environments/random_agent.py index ab877628620..55f5fe92d0a 100644 --- a/scripts/environments/random_agent.py +++ b/scripts/environments/random_agent.py @@ -53,12 +53,19 @@ def main(): # reset environment env.reset() - out = env.unwrapped.get_IO_descriptors + outs = env.unwrapped.get_IO_descriptors + out = outs["observations"] + out_actions = outs["actions"] # Make a yaml file with the output import yaml with open("obs_descriptors.yaml", "w") as f: - yaml.safe_dump(out, f) + yaml.safe_dump(outs, f) + + for k, v in out_actions.items(): + print(f"--- Action term: {k} ---") + for k1, v1 in v.items(): + print(f"{k1}: {v1}") for k, v in out.items(): print(f"--- Obs term: {k} ---") diff --git a/source/isaaclab/isaaclab/envs/manager_based_env.py b/source/isaaclab/isaaclab/envs/manager_based_env.py index 6da245e3e7f..02610232f19 100644 --- a/source/isaaclab/isaaclab/envs/manager_based_env.py +++ b/source/isaaclab/isaaclab/envs/manager_based_env.py @@ -217,7 +217,7 @@ def get_IO_descriptors(self): Returns: A dictionary with keys as the group names and values as the IO descriptors. """ - return self.observation_manager.get_IO_descriptors + return {"observations": self.observation_manager.get_IO_descriptors, "actions": self.action_manager.get_IO_descriptors} """ Operations - Setup. diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/actions_cfg.py b/source/isaaclab/isaaclab/envs/mdp/actions/actions_cfg.py index 9c932d227b0..8a769e4fb82 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/actions_cfg.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/actions_cfg.py @@ -32,7 +32,6 @@ class JointActionCfg(ActionTermCfg): preserve_order: bool = False """Whether to preserve the order of the joint names in the action output. Defaults to False.""" - @configclass class JointPositionActionCfg(JointActionCfg): """Configuration for the joint position action term. diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py index 0e2689fd254..9e0ffcc513f 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py @@ -122,6 +122,25 @@ def raw_actions(self) -> torch.Tensor: @property def processed_actions(self) -> torch.Tensor: return self._processed_actions + + @property + def IO_descriptor(self): + data = { + "name": self.__class__.__name__, + "mdp_type": "Action", + "full_path": self.__class__.__module__ + "." + self.__class__.__name__, + "description": self.__doc__, + "shape": (self.action_dim,), + "dtype": str(self.raw_actions.dtype), + "action_type": "JointState", + "joint_names": self._joint_names, + "scale": self._scale, + "offset": self._offset[0].detach().cpu().numpy().tolist(), # This seems to be always [4xNum_joints] IDK why. Need to check. + } + + if self.cfg.clip is not None: + data["clip"] = self._clip + return data """ Operations. diff --git a/source/isaaclab/isaaclab/managers/action_manager.py b/source/isaaclab/isaaclab/managers/action_manager.py index 72e78e68cb9..9dce7acd733 100644 --- a/source/isaaclab/isaaclab/managers/action_manager.py +++ b/source/isaaclab/isaaclab/managers/action_manager.py @@ -259,6 +259,38 @@ def has_debug_vis_implementation(self) -> bool: has_debug_vis |= term.has_debug_vis_implementation return has_debug_vis + @property + def get_IO_descriptors(self): + """Get the IO descriptors for the action manager. + + Returns: + A dictionary with keys as the term names and values as the IO descriptors. + """ + + data = [] + + for term in self._terms.values(): + try: + data.append(term.IO_descriptor) + except Exception as e: + print(f"Error getting IO descriptor for term: {e}") + + formatted_data = {} + for item in data: + name = item.pop("name") + formatted_item = {"extras": {}} + for k, v in item.items(): + # Check if v is a tuple and convert to list + if isinstance(v, tuple): + v = list(v) + if k in ["description", "units"]: + formatted_item["extras"][k] = v + else: + formatted_item[k] = v + formatted_data[name] = formatted_item + + return formatted_data + """ Operations. """ diff --git a/source/isaaclab/isaaclab/managers/observation_manager.py b/source/isaaclab/isaaclab/managers/observation_manager.py index d3d0854fa9d..ebed5883a9d 100644 --- a/source/isaaclab/isaaclab/managers/observation_manager.py +++ b/source/isaaclab/isaaclab/managers/observation_manager.py @@ -248,12 +248,16 @@ def get_IO_descriptors(self): obs_terms = zip(group_term_names, self._group_obs_term_cfgs[group_name]) for term_name, term_cfg in obs_terms: - # dummy call to cache some values + # Call to the observation function to get the IO descriptor with the inspect flag set to True try: term_cfg.func(self._env, **term_cfg.params, inspect=True) + # Copy the descriptor and update with the term's own extra parameters desc = term_cfg.func._descriptor.__dict__.copy() + # Create a dictionary to store the overloads overloads = {} + # Iterate over the term's own parameters and add them to the overloads dictionary for k, v in term_cfg.__dict__.items(): + # For now we do not add the noise modifier if k in ["modifiers", "clip", "scale", "history_length", "flatten_history_dim"]: overloads[k] = v desc.update(overloads) From c4c75e56308e7872f06bdd77fe0dedb677805844 Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Wed, 11 Jun 2025 15:27:10 +0200 Subject: [PATCH 09/16] Readying code for clean-up --- .../isaaclab/envs/manager_based_env.py | 5 +- .../isaaclab/envs/mdp/actions/actions_cfg.py | 1 + .../envs/mdp/actions/binary_joint_actions.py | 10 + .../envs/mdp/actions/joint_actions.py | 35 +-- .../mdp/actions/joint_actions_to_limits.py | 34 +++ .../envs/mdp/actions/non_holonomic_actions.py | 16 ++ .../mdp/actions/pink_task_space_actions.py | 12 + .../envs/mdp/actions/task_space_actions.py | 35 +++ .../isaaclab/envs/mdp/observations.py | 217 ++---------------- .../isaaclab/managers/action_manager.py | 23 +- .../isaaclab/managers/observation_manager.py | 6 +- 11 files changed, 165 insertions(+), 229 deletions(-) diff --git a/source/isaaclab/isaaclab/envs/manager_based_env.py b/source/isaaclab/isaaclab/envs/manager_based_env.py index 02610232f19..50ce4ad186f 100644 --- a/source/isaaclab/isaaclab/envs/manager_based_env.py +++ b/source/isaaclab/isaaclab/envs/manager_based_env.py @@ -217,7 +217,10 @@ def get_IO_descriptors(self): Returns: A dictionary with keys as the group names and values as the IO descriptors. """ - return {"observations": self.observation_manager.get_IO_descriptors, "actions": self.action_manager.get_IO_descriptors} + return { + "observations": self.observation_manager.get_IO_descriptors, + "actions": self.action_manager.get_IO_descriptors, + } """ Operations - Setup. diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/actions_cfg.py b/source/isaaclab/isaaclab/envs/mdp/actions/actions_cfg.py index 8a769e4fb82..9c932d227b0 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/actions_cfg.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/actions_cfg.py @@ -32,6 +32,7 @@ class JointActionCfg(ActionTermCfg): preserve_order: bool = False """Whether to preserve the order of the joint names in the action output. Defaults to False.""" + @configclass class JointPositionActionCfg(JointActionCfg): """Configuration for the joint position action term. diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/binary_joint_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/binary_joint_actions.py index f3a4fa37af1..9b8666e4464 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/binary_joint_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/binary_joint_actions.py @@ -17,6 +17,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import actions_cfg @@ -111,6 +112,15 @@ def raw_actions(self) -> torch.Tensor: def processed_actions(self) -> torch.Tensor: return self._processed_actions + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "JointAction" + self._IO_descriptor.joint_names = self._joint_names + return self._IO_descriptor + """ Operations. """ diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py index 9e0ffcc513f..8341c59184f 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py @@ -17,6 +17,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import actions_cfg @@ -122,25 +123,25 @@ def raw_actions(self) -> torch.Tensor: @property def processed_actions(self) -> torch.Tensor: return self._processed_actions - + @property - def IO_descriptor(self): - data = { - "name": self.__class__.__name__, - "mdp_type": "Action", - "full_path": self.__class__.__module__ + "." + self.__class__.__name__, - "description": self.__doc__, - "shape": (self.action_dim,), - "dtype": str(self.raw_actions.dtype), - "action_type": "JointState", - "joint_names": self._joint_names, - "scale": self._scale, - "offset": self._offset[0].detach().cpu().numpy().tolist(), # This seems to be always [4xNum_joints] IDK why. Need to check. - } - + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "JointAction" + self._IO_descriptor.joint_names = self._joint_names + self._IO_descriptor.scale = self._scale + # This seems to be always [4xNum_joints] IDK why. Need to check. + if isinstance(self._offset, torch.Tensor): + self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist() + else: + self._IO_descriptor.offset = self._offset if self.cfg.clip is not None: - data["clip"] = self._clip - return data + self._IO_descriptor.clip = self._clip + else: + self._IO_descriptor.clip = None + return self._IO_descriptor """ Operations. diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions_to_limits.py b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions_to_limits.py index 5398241e15e..d056d2e6ac6 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions_to_limits.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions_to_limits.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import actions_cfg @@ -105,6 +106,25 @@ def raw_actions(self) -> torch.Tensor: def processed_actions(self) -> torch.Tensor: return self._processed_actions + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "JointAction" + self._IO_descriptor.joint_names = self._joint_names + self._IO_descriptor.scale = self._scale + # This seems to be always [4xNum_joints] IDK why. Need to check. + if isinstance(self._offset, torch.Tensor): + self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist() + else: + self._IO_descriptor.offset = self._offset + if self.cfg.clip is not None: + self._IO_descriptor.clip = self._clip + else: + self._IO_descriptor.clip = None + return self._IO_descriptor + """ Operations. """ @@ -195,6 +215,20 @@ def __init__(self, cfg: actions_cfg.EMAJointPositionToLimitsActionCfg, env: Mana # initialize the previous targets self._prev_applied_actions = torch.zeros_like(self.processed_actions) + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + if isinstance(self._alpha, float): + self._IO_descriptor.alpha = self._alpha + elif isinstance(self._alpha, torch.Tensor): + self._IO_descriptor.alpha = self._alpha[0].detach().cpu().numpy().tolist() + else: + raise ValueError( + f"Unsupported moving average weight type: {type(self._alpha)}. Supported types are float and" + " torch.Tensor." + ) + return self._IO_descriptor + def reset(self, env_ids: Sequence[int] | None = None) -> None: # check if specific environment ids are provided if env_ids is None: diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/non_holonomic_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/non_holonomic_actions.py index a3e5cebdbf5..5d168da4151 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/non_holonomic_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/non_holonomic_actions.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import actions_cfg @@ -134,6 +135,21 @@ def raw_actions(self) -> torch.Tensor: def processed_actions(self) -> torch.Tensor: return self._processed_actions + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "non holonomic actions" + self._IO_descriptor.scale = self._scale + self._IO_descriptor.offset = self._offset + self._IO_descriptor.clip = self._clip + self._IO_descriptor.body_name = self._body_name + self._IO_descriptor.x_joint_name = self._joint_names[0] + self._IO_descriptor.y_joint_name = self._joint_names[1] + self._IO_descriptor.yaw_joint_name = self._joint_names[2] + return self._IO_descriptor + """ Operations. """ diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/pink_task_space_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/pink_task_space_actions.py index 11c3ff6cedf..30980fa1fb4 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/pink_task_space_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/pink_task_space_actions.py @@ -22,6 +22,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import pink_actions_cfg @@ -130,6 +131,17 @@ def processed_actions(self) -> torch.Tensor: """Get the processed actions tensor.""" return self._processed_actions + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "PinkInverseKinematicsAction" + self._IO_descriptor.pink_controller_joint_names = self._pink_controlled_joint_names + self._IO_descriptor.hand_joint_names = self._hand_joint_names + self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller.__dict__ + return self._IO_descriptor + # """ # Operations. # """ diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py index 89f51817179..6f9bf0553ec 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py @@ -23,6 +23,7 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from . import actions_cfg @@ -148,6 +149,23 @@ def jacobian_b(self) -> torch.Tensor: jacobian[:, 3:, :] = torch.bmm(base_rot_matrix, jacobian[:, 3:, :]) return jacobian + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "TaskSpaceAction" + self._IO_descriptor.body_name = self._body_name + self._IO_descriptor.joint_names = self._joint_names + self._IO_descriptor.scale = self._scale + if self.cfg.clip is not None: + self._IO_descriptor.clip = self.cfg.clip + else: + self._IO_descriptor.clip = None + self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller.__dict__ + self._IO_descriptor.extras["body_offset"] = self.cfg.body_offset.__dict__ + return self._IO_descriptor + """ Operations. """ @@ -409,6 +427,23 @@ def jacobian_b(self) -> torch.Tensor: jacobian[:, 3:, :] = torch.bmm(base_rot_matrix, jacobian[:, 3:, :]) return jacobian + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + super().IO_descriptor + self._IO_descriptor.shape = (self.action_dim,) + self._IO_descriptor.dtype = str(self.raw_actions.dtype) + self._IO_descriptor.action_type = "TaskSpaceAction" + self._IO_descriptor.body_name = self._ee_body_name + self._IO_descriptor.joint_names = self._joint_names + self._IO_descriptor.scale = self._scale + if self.cfg.clip is not None: + self._IO_descriptor.clip = self.cfg.clip + else: + self._IO_descriptor.clip = None + self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller.__dict__ + self._IO_descriptor.extras["body_offset"] = self.cfg.body_offset.__dict__ + return self._IO_descriptor + """ Operations. """ diff --git a/source/isaaclab/isaaclab/envs/mdp/observations.py b/source/isaaclab/isaaclab/envs/mdp/observations.py index ab96597760d..a9cec5b5903 100644 --- a/source/isaaclab/isaaclab/envs/mdp/observations.py +++ b/source/isaaclab/isaaclab/envs/mdp/observations.py @@ -11,10 +11,8 @@ from __future__ import annotations -import functools -import inspect import torch -from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar +from typing import TYPE_CHECKING import isaaclab.utils.math as math_utils from isaaclab.assets import Articulation, RigidObject @@ -26,180 +24,13 @@ if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv, ManagerBasedRLEnv -import dataclasses -from collections.abc import Callable - -from isaaclab.utils import configclass - - -@configclass -class GenericIODescriptor: - mdp_type: str = "Observation" - name: str = None - full_path: str = None - description: str = None - shape: tuple[int, ...] = None - dtype: torch.dtype = None - observation_type: str = None - extras: dict[str, Any] = None - - -# These are defined to help with type hinting -P = ParamSpec("P") -R = TypeVar("R") - - -# Automatically builds a descriptor from the kwargs -def _make_descriptor(**kwargs: Any) -> GenericIODescriptor: - """Split *kwargs* into (known dataclass fields) and (extras).""" - field_names = {f.name for f in dataclasses.fields(GenericIODescriptor)} - known = {k: v for k, v in kwargs.items() if k in field_names} - extras = {k: v for k, v in kwargs.items() if k not in field_names} - - desc = GenericIODescriptor(**known) - # User defined extras are stored in the descriptor under the `extras` field - desc.extras = extras - return desc - - -# Decorator factory for generic IO descriptors. -def generic_io_descriptor( - _func: Callable[Concatenate[ManagerBasedEnv, P], R] | None = None, - *, - on_inspect: Callable[..., Any] | list[Callable[..., Any]] | None = None, - **descriptor_kwargs: Any, -) -> Callable[[Callable[Concatenate[ManagerBasedEnv, P], R]], Callable[Concatenate[ManagerBasedEnv, P], R]]: - """ - Decorator factory for generic IO descriptors. - - This decorator can be used in different ways: - 1. The default decorator has all the information I need for my use case: - ..code-block:: python - @generic_io_descriptor(GenericIODescriptor(description="..", dtype="..")) - def my_func(env: ManagerBasedEnv, *args, **kwargs): - ... - ..note:: If description is not set, the function's docstring is used to populate it. - - 2. I need to add more information to the descriptor: - ..code-block:: python - @generic_io_descriptor(description="..", new_var_1="a", new_var_2="b") - def my_func(env: ManagerBasedEnv, *args, **kwargs): - ... - 3. I need to add a hook to the descriptor: - ..code-block:: python - def record_shape(tensor: torch.Tensor, desc: GenericIODescriptor, **kwargs): - desc.shape = (tensor.shape[-1],) - - @generic_io_descriptor(description="..", new_var_1="a", new_var_2="b", on_inspect=[record_shape, record_dtype]) - def my_func(env: ManagerBasedEnv, *args, **kwargs): - ..note:: The hook is called after the function is called, if and only if the `inspect` flag is set when calling the function. - - For example: - ..code-block:: python - my_func(env, inspect=True) - - 4. I need to add a hook to the descriptor and this hook will write to a variable that is not part of the base descriptor. - ..code-block:: python - def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): - asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] - joint_ids = kwargs["asset_cfg"].joint_ids - if joint_ids == slice(None, None, None): - joint_ids = list(range(len(asset.joint_names))) - descriptor.joint_names = [asset.joint_names[i] for i in joint_ids] - - @generic_io_descriptor(joint_names=None, new_var_1="a", new_var_2="b", on_inspect=[record_shape, record_dtype, record_joint_names]) - def my_func(env: ManagerBasedEnv, *args, **kwargs): - - ..note:: The hook can access all the variables in the wrapped function's signature. While it is useful, the user should be careful to - access only existing variables. - - Args: - _func: The function to decorate. - **descriptor_kwargs: Keyword arguments to pass to the descriptor. - - Returns: - A decorator that can be used to decorate a function. - """ - - if _func is not None and isinstance(_func, GenericIODescriptor): - descriptor = _func - _func = None - else: - descriptor = _make_descriptor(**descriptor_kwargs) - - # Ensures the hook is a list - if callable(on_inspect): - inspect_hooks: list[Callable[..., Any]] = [on_inspect] - else: - inspect_hooks: list[Callable[..., Any]] = list(on_inspect or []) # handles None - - def _apply(func: Callable[Concatenate[ManagerBasedEnv, P], R]) -> Callable[Concatenate[ManagerBasedEnv, P], R]: - - # Capture the signature of the function - sig = inspect.signature(func) - - @functools.wraps(func) - def wrapper(env: ManagerBasedEnv, *args: P.args, **kwargs: P.kwargs) -> R: - inspect_flag: bool = kwargs.pop("inspect", False) - out = func(env, *args, **kwargs) - if inspect_flag: - # Injects the function's arguments into the hooks and applies the defaults - bound = sig.bind(env, *args, **kwargs) - bound.apply_defaults() - call_kwargs = { - "output": out, - "descriptor": descriptor, - **bound.arguments, - } - for hook in inspect_hooks: - hook(**call_kwargs) - return out - - # --- Descriptor bookkeeping --- - descriptor.name = func.__name__ - descriptor.full_path = f"{func.__module__}.{func.__name__}" - descriptor.dtype = str(descriptor.dtype) - # Check if description is set in the descriptor - if descriptor.description is None: - descriptor.description = func.__doc__ - - # Adds the descriptor to the wrapped function as an attribute - wrapper._descriptor = descriptor - wrapper._has_descriptor = True - # Alters the signature of the wrapped function to make it match the original function. - # This allows the wrapped functions to pass the checks in the managers. - wrapper.__signature__ = sig - return wrapper - - # If the decorator is used without parentheses, _func will be the function itself. - if callable(_func): - return _apply(_func) - return _apply - - -def record_shape(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): - descriptor.shape = (output.shape[-1],) - - -def record_dtype(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): - descriptor.dtype = str(output.dtype) - - -def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): - asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] - joint_ids = kwargs["asset_cfg"].joint_ids - if joint_ids == slice(None, None, None): - joint_ids = list(range(len(asset.joint_names))) - descriptor.joint_names = [asset.joint_names[i] for i in joint_ids] - - -def record_body_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): - asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] - body_ids = kwargs["asset_cfg"].body_ids - if body_ids == slice(None, None, None): - body_ids = list(range(len(asset.body_names))) - descriptor.body_names = [asset.body_names[i] for i in body_ids] - +from isaaclab.envs.utils.io_descriptors import ( + generic_io_descriptor, + record_body_names, + record_dtype, + record_joint_names, + record_shape, +) """ Root state. @@ -299,9 +130,7 @@ def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntity """ -@generic_io_descriptor( - observation_type="BodyState", body_names=None, on_inspect=[record_shape, record_dtype, record_body_names] -) +@generic_io_descriptor(observation_type="BodyState", on_inspect=[record_shape, record_dtype, record_body_names]) def body_pose_w( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), @@ -325,9 +154,7 @@ def body_pose_w( return pose.reshape(env.num_envs, -1) -@generic_io_descriptor( - observation_type="BodyState", body_names=None, on_inspect=[record_shape, record_dtype, record_body_names] -) +@generic_io_descriptor(observation_type="BodyState", on_inspect=[record_shape, record_dtype, record_body_names]) def body_projected_gravity_b( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"), @@ -357,9 +184,7 @@ def body_projected_gravity_b( """ -@generic_io_descriptor( - observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] -) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset. @@ -370,9 +195,7 @@ def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_pos[:, asset_cfg.joint_ids] -@generic_io_descriptor( - observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] -) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset w.r.t. the default joint positions. @@ -383,9 +206,7 @@ def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC return asset.data.joint_pos[:, asset_cfg.joint_ids] - asset.data.default_joint_pos[:, asset_cfg.joint_ids] -@generic_io_descriptor( - observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] -) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_pos_limit_normalized( env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot") ) -> torch.Tensor: @@ -402,9 +223,7 @@ def joint_pos_limit_normalized( ) -@generic_io_descriptor( - observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] -) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset. @@ -415,9 +234,7 @@ def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_vel[:, asset_cfg.joint_ids] -@generic_io_descriptor( - observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] -) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset w.r.t. the default joint velocities. @@ -428,9 +245,7 @@ def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC return asset.data.joint_vel[:, asset_cfg.joint_ids] - asset.data.default_joint_vel[:, asset_cfg.joint_ids] -@generic_io_descriptor( - observation_type="JointState", joint_names=None, on_inspect=[record_joint_names, record_dtype, record_shape] -) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) def joint_effort(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint applied effort of the robot. diff --git a/source/isaaclab/isaaclab/managers/action_manager.py b/source/isaaclab/isaaclab/managers/action_manager.py index 9dce7acd733..2f994686e5b 100644 --- a/source/isaaclab/isaaclab/managers/action_manager.py +++ b/source/isaaclab/isaaclab/managers/action_manager.py @@ -8,16 +8,18 @@ from __future__ import annotations import inspect +import re import torch import weakref from abc import abstractmethod from collections.abc import Sequence from prettytable import PrettyTable -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import omni.kit.app from isaaclab.assets import AssetBase +from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor from .manager_base import ManagerBase, ManagerTermBase from .manager_term_cfg import ActionTermCfg @@ -50,6 +52,7 @@ def __init__(self, cfg: ActionTermCfg, env: ManagerBasedEnv): super().__init__(cfg, env) # parse config to obtain asset to which the term is applied self._asset: AssetBase = self._env.scene[self.cfg.asset_name] + self._IO_descriptor = GenericActionIODescriptor() # add handle for debug visualization (this is set to a valid handle inside set_debug_vis) self._debug_vis_handle = None @@ -91,6 +94,14 @@ def has_debug_vis_implementation(self) -> bool: source_code = inspect.getsource(self._set_debug_vis_impl) return "NotImplementedError" not in source_code + @property + def IO_descriptor(self) -> GenericActionIODescriptor: + """The IO descriptor for the action term.""" + self._IO_descriptor.name = re.sub(r"([a-z])([A-Z])", r"\1_\2", self.__class__.__name__).lower() + self._IO_descriptor.full_path = f"{self.__class__.__module__}.{self.__class__.__name__}" + self._IO_descriptor.description = self.__class__.__doc__ + return self._IO_descriptor + """ Operations. """ @@ -260,7 +271,7 @@ def has_debug_vis_implementation(self) -> bool: return has_debug_vis @property - def get_IO_descriptors(self): + def get_IO_descriptors(self) -> dict[str, dict[str, Any]]: """Get the IO descriptors for the action manager. Returns: @@ -269,16 +280,16 @@ def get_IO_descriptors(self): data = [] - for term in self._terms.values(): + for term_name, term in self._terms.items(): try: - data.append(term.IO_descriptor) + data.append(term.IO_descriptor.__dict__.copy()) except Exception as e: - print(f"Error getting IO descriptor for term: {e}") + print(f"Error getting IO descriptor for term '{term_name}': {e}") formatted_data = {} for item in data: name = item.pop("name") - formatted_item = {"extras": {}} + formatted_item = {"extras": item.pop("extras")} for k, v in item.items(): # Check if v is a tuple and convert to list if isinstance(v, tuple): diff --git a/source/isaaclab/isaaclab/managers/observation_manager.py b/source/isaaclab/isaaclab/managers/observation_manager.py index ebed5883a9d..c1e0c09b4d6 100644 --- a/source/isaaclab/isaaclab/managers/observation_manager.py +++ b/source/isaaclab/isaaclab/managers/observation_manager.py @@ -264,23 +264,21 @@ def get_IO_descriptors(self): data.append(desc) except Exception as e: print(f"Error getting IO descriptor for term '{term_name}' in group '{group_name}': {e}") - # Format the data for YAML export formatted_data = {} for item in data: name = item.pop("name") - formatted_item = {"overloads": {}, "extras": {}} + formatted_item = {"overloads": {}, "extras": item.pop("extras")} for k, v in item.items(): # Check if v is a tuple and convert to list if isinstance(v, tuple): v = list(v) if k in ["scale", "clip", "history_length", "flatten_history_dim"]: formatted_item["overloads"][k] = v - elif k in ["modifiers", "noise", "description", "units"]: + elif k in ["modifiers", "description", "units"]: formatted_item["extras"][k] = v else: formatted_item[k] = v - formatted_data[name] = formatted_item return formatted_data From 44719de0081ba09f8186ccb9c41ff9de5eede60d Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Wed, 11 Jun 2025 15:50:49 +0200 Subject: [PATCH 10/16] Forgot to add some code... --- scripts/environments/export_IODescriptors.py | 79 +++++++ .../isaaclab/envs/utils/io_descriptors.py | 192 ++++++++++++++++++ 2 files changed, 271 insertions(+) create mode 100644 scripts/environments/export_IODescriptors.py create mode 100644 source/isaaclab/isaaclab/envs/utils/io_descriptors.py diff --git a/scripts/environments/export_IODescriptors.py b/scripts/environments/export_IODescriptors.py new file mode 100644 index 00000000000..9f0f1d83040 --- /dev/null +++ b/scripts/environments/export_IODescriptors.py @@ -0,0 +1,79 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Script to an environment with random action agent.""" + +"""Launch Isaac Sim Simulator first.""" + +import argparse + +from isaaclab.app import AppLauncher + +# add argparse arguments +parser = argparse.ArgumentParser(description="Random agent for Isaac Lab environments.") +parser.add_argument("--task", type=str, default=None, help="Name of the task.") +# append AppLauncher cli args +AppLauncher.add_app_launcher_args(parser) +# parse the arguments +args_cli = parser.parse_args() +args_cli.headless = True + +# launch omniverse app +app_launcher = AppLauncher(args_cli) +simulation_app = app_launcher.app + +"""Rest everything follows.""" + +import gymnasium as gym +import torch + +import isaaclab_tasks # noqa: F401 +from isaaclab_tasks.utils import parse_env_cfg + +# PLACEHOLDER: Extension template (do not remove this comment) + + +def main(): + """Random actions agent with Isaac Lab environment.""" + # create environment configuration + env_cfg = parse_env_cfg( + args_cli.task, device=args_cli.device, num_envs=1, use_fabric=True + ) + # create environment + env = gym.make(args_cli.task, cfg=env_cfg) + + # print info (this is vectorized environment) + print(f"[INFO]: Gym observation space: {env.observation_space}") + print(f"[INFO]: Gym action space: {env.action_space}") + # reset environment + env.reset() + + outs = env.unwrapped.get_IO_descriptors + out = outs["observations"] + out_actions = outs["actions"] + # Make a yaml file with the output + import yaml + + with open("obs_descriptors.yaml", "w") as f: + yaml.safe_dump(outs, f) + + for k, v in out_actions.items(): + print(f"--- Action term: {k} ---") + for k1, v1 in v.items(): + print(f"{k1}: {v1}") + + for k, v in out.items(): + print(f"--- Obs term: {k} ---") + for k1, v1 in v.items(): + print(f"{k1}: {v1}") + env.step(torch.zeros(env.action_space.shape, device=env.unwrapped.device)) + env.close() + + +if __name__ == "__main__": + # run the main function + main() + # close sim app + simulation_app.close() diff --git a/source/isaaclab/isaaclab/envs/utils/io_descriptors.py b/source/isaaclab/isaaclab/envs/utils/io_descriptors.py new file mode 100644 index 00000000000..6b17ba644aa --- /dev/null +++ b/source/isaaclab/isaaclab/envs/utils/io_descriptors.py @@ -0,0 +1,192 @@ +from __future__ import annotations + +from isaaclab.utils import configclass +from collections.abc import Callable +from typing import Any, Concatenate, ParamSpec, TypeVar, TYPE_CHECKING + +if TYPE_CHECKING: + from isaaclab.envs import ManagerBasedEnv + import torch + +import functools +import inspect +import dataclasses + +@configclass +class GenericActionIODescriptor: + mdp_type: str = "Action" + name: str = None + full_path: str = None + description: str = None + shape: tuple[int, ...] = None + dtype: str = None + action_type: str = None + extras: dict[str, Any] = {} + +@configclass +class GenericIODescriptor: + mdp_type: str = "Observation" + name: str = None + full_path: str = None + description: str = None + shape: tuple[int, ...] = None + dtype: str = None + observation_type: str = None + extras: dict[str, Any] = {} + + +# These are defined to help with type hinting +P = ParamSpec("P") +R = TypeVar("R") + + +# Automatically builds a descriptor from the kwargs +def _make_descriptor(**kwargs: Any) -> GenericIODescriptor: + """Split *kwargs* into (known dataclass fields) and (extras).""" + field_names = {f.name for f in dataclasses.fields(GenericIODescriptor)} + known = {k: v for k, v in kwargs.items() if k in field_names} + extras = {k: v for k, v in kwargs.items() if k not in field_names} + + desc = GenericIODescriptor(**known) + # User defined extras are stored in the descriptor under the `extras` field + desc.extras = extras + return desc + + +# Decorator factory for generic IO descriptors. +def generic_io_descriptor( + _func: Callable[Concatenate[ManagerBasedEnv, P], R] | None = None, + *, + on_inspect: Callable[..., Any] | list[Callable[..., Any]] | None = None, + **descriptor_kwargs: Any, +) -> Callable[[Callable[Concatenate[ManagerBasedEnv, P], R]], Callable[Concatenate[ManagerBasedEnv, P], R]]: + """ + Decorator factory for generic IO descriptors. + + This decorator can be used in different ways: + 1. The default decorator has all the information I need for my use case: + ..code-block:: python + @generic_io_descriptor(GenericIODescriptor(description="..", dtype="..")) + def my_func(env: ManagerBasedEnv, *args, **kwargs): + ... + ..note:: If description is not set, the function's docstring is used to populate it. + + 2. I need to add more information to the descriptor: + ..code-block:: python + @generic_io_descriptor(description="..", new_var_1="a", new_var_2="b") + def my_func(env: ManagerBasedEnv, *args, **kwargs): + ... + 3. I need to add a hook to the descriptor: + ..code-block:: python + def record_shape(tensor: torch.Tensor, desc: GenericIODescriptor, **kwargs): + desc.shape = (tensor.shape[-1],) + + @generic_io_descriptor(description="..", new_var_1="a", new_var_2="b", on_inspect=[record_shape, record_dtype]) + def my_func(env: ManagerBasedEnv, *args, **kwargs): + ..note:: The hook is called after the function is called, if and only if the `inspect` flag is set when calling the function. + + For example: + ..code-block:: python + my_func(env, inspect=True) + + 4. I need to add a hook to the descriptor and this hook will write to a variable that is not part of the base descriptor. + ..code-block:: python + def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] + joint_ids = kwargs["asset_cfg"].joint_ids + if joint_ids == slice(None, None, None): + joint_ids = list(range(len(asset.joint_names))) + descriptor.joint_names = [asset.joint_names[i] for i in joint_ids] + + @generic_io_descriptor(new_var_1="a", new_var_2="b", on_inspect=[record_shape, record_dtype, record_joint_names]) + def my_func(env: ManagerBasedEnv, *args, **kwargs): + + ..note:: The hook can access all the variables in the wrapped function's signature. While it is useful, the user should be careful to + access only existing variables. + + Args: + _func: The function to decorate. + **descriptor_kwargs: Keyword arguments to pass to the descriptor. + + Returns: + A decorator that can be used to decorate a function. + """ + + if _func is not None and isinstance(_func, GenericIODescriptor): + descriptor = _func + _func = None + else: + descriptor = _make_descriptor(**descriptor_kwargs) + + # Ensures the hook is a list + if callable(on_inspect): + inspect_hooks: list[Callable[..., Any]] = [on_inspect] + else: + inspect_hooks: list[Callable[..., Any]] = list(on_inspect or []) # handles None + + def _apply(func: Callable[Concatenate[ManagerBasedEnv, P], R]) -> Callable[Concatenate[ManagerBasedEnv, P], R]: + + # Capture the signature of the function + sig = inspect.signature(func) + + @functools.wraps(func) + def wrapper(env: ManagerBasedEnv, *args: P.args, **kwargs: P.kwargs) -> R: + inspect_flag: bool = kwargs.pop("inspect", False) + out = func(env, *args, **kwargs) + if inspect_flag: + # Injects the function's arguments into the hooks and applies the defaults + bound = sig.bind(env, *args, **kwargs) + bound.apply_defaults() + call_kwargs = { + "output": out, + "descriptor": descriptor, + **bound.arguments, + } + for hook in inspect_hooks: + hook(**call_kwargs) + return out + + # --- Descriptor bookkeeping --- + descriptor.name = func.__name__ + descriptor.full_path = f"{func.__module__}.{func.__name__}" + descriptor.dtype = str(descriptor.dtype) + # Check if description is set in the descriptor + if descriptor.description is None: + descriptor.description = func.__doc__ + + # Adds the descriptor to the wrapped function as an attribute + wrapper._descriptor = descriptor + wrapper._has_descriptor = True + # Alters the signature of the wrapped function to make it match the original function. + # This allows the wrapped functions to pass the checks in the managers. + wrapper.__signature__ = sig + return wrapper + + # If the decorator is used without parentheses, _func will be the function itself. + if callable(_func): + return _apply(_func) + return _apply + + +def record_shape(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + descriptor.shape = (output.shape[-1],) + + +def record_dtype(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + descriptor.dtype = str(output.dtype) + + +def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] + joint_ids = kwargs["asset_cfg"].joint_ids + if joint_ids == slice(None, None, None): + joint_ids = list(range(len(asset.joint_names))) + descriptor.joint_names = [asset.joint_names[i] for i in joint_ids] + + +def record_body_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs): + asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name] + body_ids = kwargs["asset_cfg"].body_ids + if body_ids == slice(None, None, None): + body_ids = list(range(len(asset.body_names))) + descriptor.body_names = [asset.body_names[i] for i in body_ids] \ No newline at end of file From 3286dae05a008327c036ffdb304ae71e5f6c6307 Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Wed, 11 Jun 2025 15:51:11 +0200 Subject: [PATCH 11/16] Forgot to add some code... --- .../isaaclab/envs/mdp/actions/task_space_actions.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py index 6f9bf0553ec..bde914ccae9 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py @@ -435,12 +435,17 @@ def IO_descriptor(self) -> GenericActionIODescriptor: self._IO_descriptor.action_type = "TaskSpaceAction" self._IO_descriptor.body_name = self._ee_body_name self._IO_descriptor.joint_names = self._joint_names - self._IO_descriptor.scale = self._scale + self._IO_descriptor.position_scale = self.cfg.position_scale + self._IO_descriptor.orientation_scale = self.cfg.orientation_scale + self._IO_descriptor.wrench_scale = self.cfg.wrench_scale + self._IO_descriptor.stiffness_scale = self.cfg.stiffness_scale + self._IO_descriptor.damping_ratio_scale = self.cfg.damping_ratio_scale + self._IO_descriptor.nullspace_joint_pos_target = self.cfg.nullspace_joint_pos_target if self.cfg.clip is not None: self._IO_descriptor.clip = self.cfg.clip else: self._IO_descriptor.clip = None - self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller.__dict__ + self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller_cfg.__dict__ self._IO_descriptor.extras["body_offset"] = self.cfg.body_offset.__dict__ return self._IO_descriptor From e3a521f019032acc37966c178f5b4b8cbe333836 Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Wed, 11 Jun 2025 15:56:37 +0200 Subject: [PATCH 12/16] undid changes to random action --- scripts/environments/random_agent.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/scripts/environments/random_agent.py b/scripts/environments/random_agent.py index 55f5fe92d0a..b3187c3b372 100644 --- a/scripts/environments/random_agent.py +++ b/scripts/environments/random_agent.py @@ -52,26 +52,6 @@ def main(): print(f"[INFO]: Gym action space: {env.action_space}") # reset environment env.reset() - - outs = env.unwrapped.get_IO_descriptors - out = outs["observations"] - out_actions = outs["actions"] - # Make a yaml file with the output - import yaml - - with open("obs_descriptors.yaml", "w") as f: - yaml.safe_dump(outs, f) - - for k, v in out_actions.items(): - print(f"--- Action term: {k} ---") - for k1, v1 in v.items(): - print(f"{k1}: {v1}") - - for k, v in out.items(): - print(f"--- Obs term: {k} ---") - for k1, v1 in v.items(): - print(f"{k1}: {v1}") - exit(0) # simulate environment while simulation_app.is_running(): # run everything in inference mode From 9afde9631dcb4da28a8a2e36a4c3f4081f3d5ce7 Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Wed, 11 Jun 2025 16:07:02 +0200 Subject: [PATCH 13/16] chanegd naming conventions --- scripts/environments/export_IODescriptors.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/scripts/environments/export_IODescriptors.py b/scripts/environments/export_IODescriptors.py index 9f0f1d83040..b163d4faee8 100644 --- a/scripts/environments/export_IODescriptors.py +++ b/scripts/environments/export_IODescriptors.py @@ -56,7 +56,10 @@ def main(): # Make a yaml file with the output import yaml - with open("obs_descriptors.yaml", "w") as f: + name = args_cli.task.lower().replace("-", "_") + name = name.replace(" ", "_") + + with open(f"{name}_IO_descriptors.yaml", "w") as f: yaml.safe_dump(outs, f) for k, v in out_actions.items(): From 47854e7fa87b63a7def963e491a147401c3d3afb Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Thu, 12 Jun 2025 11:31:20 +0200 Subject: [PATCH 14/16] added stripping of doc strings + units here and here --- source/isaaclab/isaaclab/envs/mdp/observations.py | 10 +++++----- source/isaaclab/isaaclab/envs/utils/io_descriptors.py | 2 +- source/isaaclab/isaaclab/managers/action_manager.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/isaaclab/isaaclab/envs/mdp/observations.py b/source/isaaclab/isaaclab/envs/mdp/observations.py index a9cec5b5903..daa78d3db1f 100644 --- a/source/isaaclab/isaaclab/envs/mdp/observations.py +++ b/source/isaaclab/isaaclab/envs/mdp/observations.py @@ -184,7 +184,7 @@ def body_projected_gravity_b( """ -@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad") def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset. @@ -195,7 +195,7 @@ def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_pos[:, asset_cfg.joint_ids] -@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad") def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset w.r.t. the default joint positions. @@ -223,7 +223,7 @@ def joint_pos_limit_normalized( ) -@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad/s") def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset. @@ -234,7 +234,7 @@ def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_vel[:, asset_cfg.joint_ids] -@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad/s") def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset w.r.t. the default joint velocities. @@ -245,7 +245,7 @@ def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC return asset.data.joint_vel[:, asset_cfg.joint_ids] - asset.data.default_joint_vel[:, asset_cfg.joint_ids] -@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape]) +@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="N.m") def joint_effort(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint applied effort of the robot. diff --git a/source/isaaclab/isaaclab/envs/utils/io_descriptors.py b/source/isaaclab/isaaclab/envs/utils/io_descriptors.py index 6b17ba644aa..2a1573cfa14 100644 --- a/source/isaaclab/isaaclab/envs/utils/io_descriptors.py +++ b/source/isaaclab/isaaclab/envs/utils/io_descriptors.py @@ -152,7 +152,7 @@ def wrapper(env: ManagerBasedEnv, *args: P.args, **kwargs: P.kwargs) -> R: descriptor.dtype = str(descriptor.dtype) # Check if description is set in the descriptor if descriptor.description is None: - descriptor.description = func.__doc__ + descriptor.description = " ".join(func.__doc__.split()) # Adds the descriptor to the wrapped function as an attribute wrapper._descriptor = descriptor diff --git a/source/isaaclab/isaaclab/managers/action_manager.py b/source/isaaclab/isaaclab/managers/action_manager.py index 2f994686e5b..7d2f6f1bd03 100644 --- a/source/isaaclab/isaaclab/managers/action_manager.py +++ b/source/isaaclab/isaaclab/managers/action_manager.py @@ -99,7 +99,7 @@ def IO_descriptor(self) -> GenericActionIODescriptor: """The IO descriptor for the action term.""" self._IO_descriptor.name = re.sub(r"([a-z])([A-Z])", r"\1_\2", self.__class__.__name__).lower() self._IO_descriptor.full_path = f"{self.__class__.__module__}.{self.__class__.__name__}" - self._IO_descriptor.description = self.__class__.__doc__ + self._IO_descriptor.description = " ".join(self.__class__.__doc__.split()) return self._IO_descriptor """ From a02831eb9d4cc8f05a2582e7247fdbfb32bd67ad Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Wed, 18 Jun 2025 12:22:41 +0200 Subject: [PATCH 15/16] happy pre-commits --- scripts/environments/export_IODescriptors.py | 4 +--- .../isaaclab/envs/mdp/observations.py | 20 ++++++++++++++----- .../isaaclab/envs/utils/io_descriptors.py | 17 ++++++++++++---- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/scripts/environments/export_IODescriptors.py b/scripts/environments/export_IODescriptors.py index b163d4faee8..0dea97b33b8 100644 --- a/scripts/environments/export_IODescriptors.py +++ b/scripts/environments/export_IODescriptors.py @@ -38,9 +38,7 @@ def main(): """Random actions agent with Isaac Lab environment.""" # create environment configuration - env_cfg = parse_env_cfg( - args_cli.task, device=args_cli.device, num_envs=1, use_fabric=True - ) + env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=1, use_fabric=True) # create environment env = gym.make(args_cli.task, cfg=env_cfg) diff --git a/source/isaaclab/isaaclab/envs/mdp/observations.py b/source/isaaclab/isaaclab/envs/mdp/observations.py index daa78d3db1f..5e1ffbdbec0 100644 --- a/source/isaaclab/isaaclab/envs/mdp/observations.py +++ b/source/isaaclab/isaaclab/envs/mdp/observations.py @@ -184,7 +184,9 @@ def body_projected_gravity_b( """ -@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad") +@generic_io_descriptor( + observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad" +) def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset. @@ -195,7 +197,9 @@ def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_pos[:, asset_cfg.joint_ids] -@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad") +@generic_io_descriptor( + observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad" +) def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint positions of the asset w.r.t. the default joint positions. @@ -223,7 +227,9 @@ def joint_pos_limit_normalized( ) -@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad/s") +@generic_io_descriptor( + observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad/s" +) def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset. @@ -234,7 +240,9 @@ def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(" return asset.data.joint_vel[:, asset_cfg.joint_ids] -@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad/s") +@generic_io_descriptor( + observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad/s" +) def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")): """The joint velocities of the asset w.r.t. the default joint velocities. @@ -245,7 +253,9 @@ def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC return asset.data.joint_vel[:, asset_cfg.joint_ids] - asset.data.default_joint_vel[:, asset_cfg.joint_ids] -@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="N.m") +@generic_io_descriptor( + observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="N.m" +) def joint_effort(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: """The joint applied effort of the robot. diff --git a/source/isaaclab/isaaclab/envs/utils/io_descriptors.py b/source/isaaclab/isaaclab/envs/utils/io_descriptors.py index 2a1573cfa14..4746acc4da8 100644 --- a/source/isaaclab/isaaclab/envs/utils/io_descriptors.py +++ b/source/isaaclab/isaaclab/envs/utils/io_descriptors.py @@ -1,16 +1,24 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + from __future__ import annotations -from isaaclab.utils import configclass from collections.abc import Callable -from typing import Any, Concatenate, ParamSpec, TypeVar, TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar + +from isaaclab.utils import configclass if TYPE_CHECKING: from isaaclab.envs import ManagerBasedEnv + from isaaclab.assets.articulation import Articulation import torch +import dataclasses import functools import inspect -import dataclasses + @configclass class GenericActionIODescriptor: @@ -23,6 +31,7 @@ class GenericActionIODescriptor: action_type: str = None extras: dict[str, Any] = {} + @configclass class GenericIODescriptor: mdp_type: str = "Observation" @@ -189,4 +198,4 @@ def record_body_names(output: torch.Tensor, descriptor: GenericIODescriptor, **k body_ids = kwargs["asset_cfg"].body_ids if body_ids == slice(None, None, None): body_ids = list(range(len(asset.body_names))) - descriptor.body_names = [asset.body_names[i] for i in body_ids] \ No newline at end of file + descriptor.body_names = [asset.body_names[i] for i in body_ids] From d07d009475a9d16d9468114da35b637e4b19035b Mon Sep 17 00:00:00 2001 From: Antoine Richard Date: Tue, 8 Jul 2025 10:37:53 +0200 Subject: [PATCH 16/16] added FIXME --- source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py index 8341c59184f..cb5966c7a3b 100644 --- a/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py +++ b/source/isaaclab/isaaclab/envs/mdp/actions/joint_actions.py @@ -137,7 +137,8 @@ def IO_descriptor(self) -> GenericActionIODescriptor: self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist() else: self._IO_descriptor.offset = self._offset - if self.cfg.clip is not None: + #FIXME: This is not correct. Add list support. + if self.cfg.clip is not None: self._IO_descriptor.clip = self._clip else: self._IO_descriptor.clip = None