From 7b241989e97d915e512200d1c902d939d018bafb Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 4 Mar 2025 09:25:31 +0000 Subject: [PATCH 01/20] Revert "Apply renaming of member variables of JTC (#1275)" This reverts commit 341506a363eb089d235a0646824bb04eb106f47d. --- .../src/scaled_joint_trajectory_controller.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ur_controllers/src/scaled_joint_trajectory_controller.cpp b/ur_controllers/src/scaled_joint_trajectory_controller.cpp index bdc935b0d..2f1a08b45 100644 --- a/ur_controllers/src/scaled_joint_trajectory_controller.cpp +++ b/ur_controllers/src/scaled_joint_trajectory_controller.cpp @@ -178,8 +178,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const if (!before_last_point && !rt_is_holding_ && cmd_timeout_ > 0.0 && time_difference > cmd_timeout_) { RCLCPP_WARN(logger, "Aborted due to command timeout"); - new_trajectory_msg_.reset(); - new_trajectory_msg_.initRT(set_hold_position()); + traj_msg_external_point_ptr_.reset(); + traj_msg_external_point_ptr_.initRT(set_hold_position()); } // Check state/goal tolerance @@ -313,8 +313,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const RCLCPP_WARN(logger, "%s", error_string.c_str()); - new_trajectory_msg_.reset(); - new_trajectory_msg_.initRT(set_hold_position()); + traj_msg_external_point_ptr_.reset(); + traj_msg_external_point_ptr_.initRT(set_hold_position()); } } } else if (tolerance_violated_while_moving && !rt_has_pending_goal_) { @@ -326,8 +326,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const } else if (!before_last_point && !within_goal_time && !rt_has_pending_goal_) { RCLCPP_ERROR(logger, "Exceeded goal_time_tolerance: holding position..."); - new_trajectory_msg_.reset(); - new_trajectory_msg_.initRT(set_hold_position()); + traj_msg_external_point_ptr_.reset(); + traj_msg_external_point_ptr_.initRT(set_hold_position()); } // else, run another cycle while waiting for outside_goal_tolerance // to be satisfied (will stay in this state until new message arrives) From d8b02acecbe5baec5cdd198009cde9b617100233 Mon Sep 17 00:00:00 2001 From: URJala Date: Fri, 14 Mar 2025 07:41:43 +0000 Subject: [PATCH 02/20] Reapply "Apply renaming of member variables of JTC (#1275)" This reverts commit 99592214e10b58705c593e9b643c6f0d63cedca7. --- .../src/scaled_joint_trajectory_controller.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ur_controllers/src/scaled_joint_trajectory_controller.cpp b/ur_controllers/src/scaled_joint_trajectory_controller.cpp index 2f1a08b45..bdc935b0d 100644 --- a/ur_controllers/src/scaled_joint_trajectory_controller.cpp +++ b/ur_controllers/src/scaled_joint_trajectory_controller.cpp @@ -178,8 +178,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const if (!before_last_point && !rt_is_holding_ && cmd_timeout_ > 0.0 && time_difference > cmd_timeout_) { RCLCPP_WARN(logger, "Aborted due to command timeout"); - traj_msg_external_point_ptr_.reset(); - traj_msg_external_point_ptr_.initRT(set_hold_position()); + new_trajectory_msg_.reset(); + new_trajectory_msg_.initRT(set_hold_position()); } // Check state/goal tolerance @@ -313,8 +313,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const RCLCPP_WARN(logger, "%s", error_string.c_str()); - traj_msg_external_point_ptr_.reset(); - traj_msg_external_point_ptr_.initRT(set_hold_position()); + new_trajectory_msg_.reset(); + new_trajectory_msg_.initRT(set_hold_position()); } } } else if (tolerance_violated_while_moving && !rt_has_pending_goal_) { @@ -326,8 +326,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const } else if (!before_last_point && !within_goal_time && !rt_has_pending_goal_) { RCLCPP_ERROR(logger, "Exceeded goal_time_tolerance: holding position..."); - traj_msg_external_point_ptr_.reset(); - traj_msg_external_point_ptr_.initRT(set_hold_position()); + new_trajectory_msg_.reset(); + new_trajectory_msg_.initRT(set_hold_position()); } // else, run another cycle while waiting for outside_goal_tolerance // to be satisfied (will stay in this state until new message arrives) From fa6b324973dd8b9ba2056d927aa24603cd3aa934 Mon Sep 17 00:00:00 2001 From: URJala Date: Thu, 9 Jan 2025 12:59:17 +0000 Subject: [PATCH 03/20] Running integration tests with mock hardware Added some launch arguments and some checks to avoid connecting to the dashboard interface when using mock hardware, as that will not work. Maybe not the most elegant, but it works for now. Currently passthrough controller does not work at all with mock hardware, and is bypassed if using that in the test. test_trajectory_scaled_aborts_on_violation fails, as the hardware doesnt abort. test_set_io also fails as the controller cant verify that a pin has been set. --- ur_robot_driver/launch/ur_control.launch.py | 3 +++ ur_robot_driver/test/robot_driver.py | 24 +++++++++++++++------ ur_robot_driver/test/test_common.py | 6 ++++-- ur_robot_driver/urdf/ur.ros2_control.xacro | 4 +++- 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/ur_robot_driver/launch/ur_control.launch.py b/ur_robot_driver/launch/ur_control.launch.py index 528da2ee7..adb8bc39d 100644 --- a/ur_robot_driver/launch/ur_control.launch.py +++ b/ur_robot_driver/launch/ur_control.launch.py @@ -195,6 +195,9 @@ def controller_spawner(controllers, active=True): if activate_joint_controller.perform(context) == "true": controllers_active.append(initial_joint_controller.perform(context)) controllers_inactive.remove(initial_joint_controller.perform(context)) + + if use_mock_hardware.perform(context) == "true": + controllers_active.remove("tcp_pose_broadcaster") if use_mock_hardware.perform(context) == "true": controllers_active.remove("tcp_pose_broadcaster") diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/robot_driver.py index e07462772..5ee7838eb 100755 --- a/ur_robot_driver/test/robot_driver.py +++ b/ur_robot_driver/test/robot_driver.py @@ -56,16 +56,22 @@ ) TIMEOUT_EXECUTE_TRAJECTORY = 30 +MOCK_HARDWARE = False + +def set_mock_hardware_flag(use_mock_hardware): + return use_mock_hardware=="true" @pytest.mark.launch_test @launch_testing.parametrize( - "tf_prefix", - [(""), ("my_ur_")], + "tf_prefix, use_mock_hardware, mock_sensor_commands", + [("", "false", "false"), ("my_ur_", "false", "false"), ("", "true", "true")] ) -def generate_test_description(tf_prefix): - return generate_driver_test_description(tf_prefix=tf_prefix) +def generate_test_description(tf_prefix, use_mock_hardware, mock_sensor_commands): + global MOCK_HARDWARE + MOCK_HARDWARE = set_mock_hardware_flag(use_mock_hardware) + return generate_driver_test_description(tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware, mock_sensor_commands=mock_sensor_commands) class RobotDriverTest(unittest.TestCase): @classmethod @@ -83,7 +89,10 @@ def tearDownClass(cls): rclpy.shutdown() def init_robot(self): - self._dashboard_interface = DashboardInterface(self.node) + if(not MOCK_HARDWARE): + self._dashboard_interface = DashboardInterface(self.node) + else: + self._dashboard_interface = None self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) @@ -100,7 +109,8 @@ def init_robot(self): ) def setUp(self): - self._dashboard_interface.start_robot() + if(self._dashboard_interface): + self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) @@ -372,6 +382,8 @@ def js_cb(msg): # self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED") def test_passthrough_trajectory(self, tf_prefix): + if(MOCK_HARDWARE): + return True self.assertTrue( self._controller_manager_interface.switch_controller( strictness=SwitchController.Request.BEST_EFFORT, diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 215f8ff55..96302d0ab 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -373,8 +373,8 @@ def generate_dashboard_test_description(): def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", - controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, -): + controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, use_mock_hardware="false", mock_sensor_commands="false", + ): ur_type = LaunchConfiguration("ur_type") launch_arguments = { @@ -386,6 +386,8 @@ def generate_driver_test_description( "headless_mode": "true", "launch_dashboard_client": "true", "start_joint_controller": "false", + "use_mock_hardware": use_mock_hardware, + "mock_sensor_commands": mock_sensor_commands, } if tf_prefix: launch_arguments["tf_prefix"] = tf_prefix diff --git a/ur_robot_driver/urdf/ur.ros2_control.xacro b/ur_robot_driver/urdf/ur.ros2_control.xacro index f32805273..b480434b0 100644 --- a/ur_robot_driver/urdf/ur.ros2_control.xacro +++ b/ur_robot_driver/urdf/ur.ros2_control.xacro @@ -300,7 +300,9 @@ - + + 1 + From bdb462337126466e9a05d8efb16e775218fec119 Mon Sep 17 00:00:00 2001 From: URJala Date: Fri, 14 Mar 2025 15:26:43 +0000 Subject: [PATCH 04/20] Robot driver tests pass with mock hardware --- ur_robot_driver/test/robot_driver.py | 28 ++++++++++++++-------------- ur_robot_driver/test/test_common.py | 6 +++--- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/robot_driver.py index 5ee7838eb..05e47efb7 100755 --- a/ur_robot_driver/test/robot_driver.py +++ b/ur_robot_driver/test/robot_driver.py @@ -56,30 +56,26 @@ ) TIMEOUT_EXECUTE_TRAJECTORY = 30 -MOCK_HARDWARE = False - -def set_mock_hardware_flag(use_mock_hardware): - return use_mock_hardware=="true" @pytest.mark.launch_test @launch_testing.parametrize( - "tf_prefix, use_mock_hardware, mock_sensor_commands", - [("", "false", "false"), ("my_ur_", "false", "false"), ("", "true", "true")] + "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] ) +def generate_test_description(tf_prefix, use_mock_hardware): + return generate_driver_test_description( + tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware + ) -def generate_test_description(tf_prefix, use_mock_hardware, mock_sensor_commands): - global MOCK_HARDWARE - MOCK_HARDWARE = set_mock_hardware_flag(use_mock_hardware) - return generate_driver_test_description(tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware, mock_sensor_commands=mock_sensor_commands) class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls): + def setUpClass(cls, use_mock_hardware): # Initialize the ROS context rclpy.init() cls.node = Node("robot_driver_test") time.sleep(1) + cls.mock_hardware = use_mock_hardware == "true" cls.init_robot(cls) @classmethod @@ -89,7 +85,7 @@ def tearDownClass(cls): rclpy.shutdown() def init_robot(self): - if(not MOCK_HARDWARE): + if not self.mock_hardware: self._dashboard_interface = DashboardInterface(self.node) else: self._dashboard_interface = None @@ -109,7 +105,7 @@ def init_robot(self): ) def setUp(self): - if(self._dashboard_interface): + if self._dashboard_interface: self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) @@ -149,6 +145,8 @@ def test_start_passthrough_controller(self): def test_set_io(self): """Test to set an IO and check whether it has been set.""" + if self.mock_hardware: + return True # Create io callback to verify result io_msg = None @@ -303,6 +301,8 @@ def test_trajectory_scaled(self, tf_prefix): def test_trajectory_scaled_aborts_on_violation(self, tf_prefix): """Test that the robot correctly aborts the trajectory when the constraints are violated.""" + if self.mock_hardware: + return True # Construct test trajectory test_trajectory = [ (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), @@ -382,7 +382,7 @@ def js_cb(msg): # self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED") def test_passthrough_trajectory(self, tf_prefix): - if(MOCK_HARDWARE): + if self.mock_hardware: return True self.assertTrue( self._controller_manager_interface.switch_controller( diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 96302d0ab..d683104cd 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -373,8 +373,8 @@ def generate_dashboard_test_description(): def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", - controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, use_mock_hardware="false", mock_sensor_commands="false", - ): + controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, use_mock_hardware="false", +): ur_type = LaunchConfiguration("ur_type") launch_arguments = { @@ -387,7 +387,7 @@ def generate_driver_test_description( "launch_dashboard_client": "true", "start_joint_controller": "false", "use_mock_hardware": use_mock_hardware, - "mock_sensor_commands": mock_sensor_commands, + "mock_sensor_commands": use_mock_hardware, } if tf_prefix: launch_arguments["tf_prefix"] = tf_prefix From a6bd14015e82f1b9eceb8ef6111458ed6c187aba Mon Sep 17 00:00:00 2001 From: URJala Date: Fri, 14 Mar 2025 15:27:25 +0000 Subject: [PATCH 05/20] something got duplicated, removing --- ur_robot_driver/launch/ur_control.launch.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/ur_robot_driver/launch/ur_control.launch.py b/ur_robot_driver/launch/ur_control.launch.py index adb8bc39d..528da2ee7 100644 --- a/ur_robot_driver/launch/ur_control.launch.py +++ b/ur_robot_driver/launch/ur_control.launch.py @@ -195,9 +195,6 @@ def controller_spawner(controllers, active=True): if activate_joint_controller.perform(context) == "true": controllers_active.append(initial_joint_controller.perform(context)) controllers_inactive.remove(initial_joint_controller.perform(context)) - - if use_mock_hardware.perform(context) == "true": - controllers_active.remove("tcp_pose_broadcaster") if use_mock_hardware.perform(context) == "true": controllers_active.remove("tcp_pose_broadcaster") From 29fd56cc6f1cef65ad5972dd9f45f9d16821158a Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 1 Apr 2025 08:38:16 +0000 Subject: [PATCH 06/20] Split passthrough controller tests to its own file. And into separate test cases. Moved timout for trajectory execution to common file --- ...integration_test_passthrough_controller.py | 267 ++++++++++++++++++ ur_robot_driver/test/robot_driver.py | 20 +- ur_robot_driver/test/test_common.py | 2 + 3 files changed, 270 insertions(+), 19 deletions(-) create mode 100644 ur_robot_driver/test/integration_test_passthrough_controller.py diff --git a/ur_robot_driver/test/integration_test_passthrough_controller.py b/ur_robot_driver/test/integration_test_passthrough_controller.py new file mode 100644 index 000000000..a74f860fb --- /dev/null +++ b/ur_robot_driver/test/integration_test_passthrough_controller.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python +# Copyright 2019, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import time +import unittest + +from math import pi +import launch_testing +import pytest +import rclpy +from builtin_interfaces.msg import Duration +from control_msgs.action import FollowJointTrajectory +from control_msgs.msg import JointTolerance +from controller_manager_msgs.srv import SwitchController +from rclpy.node import Node +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + DashboardInterface, + IoStatusInterface, + ConfigurationInterface, + generate_driver_test_description, + ROBOT_JOINTS, + TIMEOUT_EXECUTE_TRAJECTORY, +) + + +# Mock hardware does not work with passthrough controller, so dont test with it +@pytest.mark.launch_test +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) + + +HOME = { + "elbow_joint": 0.0, + "shoulder_lift_joint": -1.5708, + "shoulder_pan_joint": 0.0, + "wrist_1_joint": -1.5708, + "wrist_2_joint": 0.0, + "wrist_3_joint": 0.0, +} +waypts = [[HOME[joint] + i * pi / 4 for joint in ROBOT_JOINTS] for i in [0, -1, 1]] +time_vec = [ + Duration(sec=4, nanosec=0), + Duration(sec=8, nanosec=0), + Duration(sec=12, nanosec=0), +] +TEST_TRAJECTORY = zip(time_vec, waypts) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls, use_mock_hardware): + # Initialize the ROS context + rclpy.init() + cls.node = Node("robot_driver_test") + time.sleep(1) + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + + self._dashboard_interface = DashboardInterface(self.node) + + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def setUp(self): + if self._dashboard_interface: + self._dashboard_interface.start_robot() + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_start_passthrough_controller(self): + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + def test_passthrough_trajectory(self, tf_prefix): + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=1, nanosec=0) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + def test_quintic_trajectory(self, tf_prefix): + # Full quintic trajectory + + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint( + positions=pos, + time_from_start=times, + velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + ) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_time_tolerance = Duration(sec=1, nanosec=0) + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + def test_impossible_goal_tolerance_fails(self, tf_prefix): + # Test impossible goal tolerance, should fail. + + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_tolerance = [ + JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=1, nanosec=0) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual( + result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + ) + + def test_impossible_goal_time_tolerance_fails(self, tf_prefix): + # Test impossible goal time + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=0, nanosec=10) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual( + result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/robot_driver.py index 05e47efb7..388a75704 100755 --- a/ur_robot_driver/test/robot_driver.py +++ b/ur_robot_driver/test/robot_driver.py @@ -37,7 +37,6 @@ import rclpy from builtin_interfaces.msg import Duration from control_msgs.action import FollowJointTrajectory -from control_msgs.msg import JointTolerance from controller_manager_msgs.srv import SwitchController from rclpy.node import Node from sensor_msgs.msg import JointState @@ -53,10 +52,9 @@ ConfigurationInterface, generate_driver_test_description, ROBOT_JOINTS, + TIMEOUT_EXECUTE_TRAJECTORY, ) -TIMEOUT_EXECUTE_TRAJECTORY = 30 - @pytest.mark.launch_test @launch_testing.parametrize( @@ -127,22 +125,6 @@ def test_start_scaled_jtc_controller(self): ).ok ) - def test_start_passthrough_controller(self): - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["passthrough_trajectory_controller"], - deactivate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - def test_set_io(self): """Test to set an IO and check whether it has been set.""" if self.mock_hardware: diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index d683104cd..ce53b268c 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -63,6 +63,8 @@ TIMEOUT_WAIT_SERVICE_INITIAL = 120 # If we download the docker image simultaneously to the tests, it can take quite some time until the dashboard server is reachable and usable. TIMEOUT_WAIT_ACTION = 10 +TIMEOUT_EXECUTE_TRAJECTORY = 30 + ROBOT_JOINTS = [ "elbow_joint", "shoulder_lift_joint", From 92f33364f2b37bae93dc0b3da4eeb12085aac7de Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 8 Apr 2025 12:45:19 +0000 Subject: [PATCH 07/20] Split integration tests out to one file per controller --- .../integration_test_config_controller.py | 111 +++++++++++ .../integration_test_controller_switch.py | 6 +- .../test/integration_test_io_controller.py | 158 +++++++++++++++ ...integration_test_passthrough_controller.py | 187 +++++++++--------- ...tegration_test_scaled_joint_controller.py} | 56 ------ 5 files changed, 360 insertions(+), 158 deletions(-) create mode 100644 ur_robot_driver/test/integration_test_config_controller.py create mode 100644 ur_robot_driver/test/integration_test_io_controller.py rename ur_robot_driver/test/{robot_driver.py => integration_test_scaled_joint_controller.py} (90%) diff --git a/ur_robot_driver/test/integration_test_config_controller.py b/ur_robot_driver/test/integration_test_config_controller.py new file mode 100644 index 000000000..0699e6fe0 --- /dev/null +++ b/ur_robot_driver/test/integration_test_config_controller.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import time +import unittest + +import launch_testing +import pytest +import rclpy +from control_msgs.action import FollowJointTrajectory +from rclpy.node import Node + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + DashboardInterface, + IoStatusInterface, + ConfigurationInterface, + generate_driver_test_description, +) + + +@pytest.mark.launch_test +@launch_testing.parametrize( + "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] +) +def generate_test_description(tf_prefix, use_mock_hardware): + return generate_driver_test_description( + tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware + ) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls, use_mock_hardware): + # Initialize the ROS context + rclpy.init() + cls.node = Node("robot_driver_test") + time.sleep(1) + cls.mock_hardware = use_mock_hardware == "true" + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + if not self.mock_hardware: + self._dashboard_interface = DashboardInterface(self.node) + else: + self._dashboard_interface = None + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def setUp(self): + if self._dashboard_interface: + self._dashboard_interface.start_robot() + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_get_robot_software_version(self): + self.assertNotEqual( + self._configuration_controller_interface.get_robot_software_version().major, 0 + ) diff --git a/ur_robot_driver/test/integration_test_controller_switch.py b/ur_robot_driver/test/integration_test_controller_switch.py index 58e11a1e4..2ef097f04 100644 --- a/ur_robot_driver/test/integration_test_controller_switch.py +++ b/ur_robot_driver/test/integration_test_controller_switch.py @@ -60,11 +60,7 @@ @pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix", - [""], - # [(""), ("my_ur_")], -) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) def generate_test_description(tf_prefix): return generate_driver_test_description(tf_prefix=tf_prefix) diff --git a/ur_robot_driver/test/integration_test_io_controller.py b/ur_robot_driver/test/integration_test_io_controller.py new file mode 100644 index 000000000..bd4391204 --- /dev/null +++ b/ur_robot_driver/test/integration_test_io_controller.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import logging +import os +import sys +import time +import unittest + +import launch_testing +import pytest +import rclpy +from control_msgs.action import FollowJointTrajectory +from rclpy.node import Node +from ur_msgs.msg import IOStates + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + DashboardInterface, + IoStatusInterface, + ConfigurationInterface, + generate_driver_test_description, +) + + +@pytest.mark.launch_test +@launch_testing.parametrize( + "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] +) +def generate_test_description(tf_prefix, use_mock_hardware): + return generate_driver_test_description( + tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware + ) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls, use_mock_hardware): + # Initialize the ROS context + rclpy.init() + cls.node = Node("robot_driver_test") + time.sleep(1) + cls.mock_hardware = use_mock_hardware == "true" + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + if not self.mock_hardware: + self._dashboard_interface = DashboardInterface(self.node) + else: + self._dashboard_interface = None + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def setUp(self): + if self._dashboard_interface: + self._dashboard_interface.start_robot() + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_set_io(self): + """Test to set an IO and check whether it has been set.""" + if self.mock_hardware: + return True + # Create io callback to verify result + io_msg = None + + def io_msg_cb(msg): + nonlocal io_msg + io_msg = msg + + io_states_sub = self.node.create_subscription( + IOStates, + "/io_and_status_controller/io_states", + io_msg_cb, + rclpy.qos.qos_profile_system_default, + ) + + # Set pin 0 to 1.0 + test_pin = 0 + + logging.info("Setting pin %d to 1.0", test_pin) + self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=1.0) + + # Wait until the pin state has changed + pin_state = False + end_time = time.time() + 5 + while not pin_state and time.time() < end_time: + rclpy.spin_once(self.node, timeout_sec=0.1) + if io_msg is not None: + pin_state = io_msg.digital_out_states[test_pin].state + + self.assertEqual(pin_state, 1.0) + + # Set pin 0 to 0.0 + logging.info("Setting pin %d to 0.0", test_pin) + self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=0.0) + + # Wait until the pin state has changed back + end_time = time.time() + 5 + while pin_state and time.time() < end_time: + rclpy.spin_once(self.node, timeout_sec=0.1) + if io_msg is not None: + pin_state = io_msg.digital_out_states[test_pin].state + + self.assertEqual(pin_state, 0.0) + + # Clean up io subscription + self.node.destroy_subscription(io_states_sub) diff --git a/ur_robot_driver/test/integration_test_passthrough_controller.py b/ur_robot_driver/test/integration_test_passthrough_controller.py index a74f860fb..61942c37a 100644 --- a/ur_robot_driver/test/integration_test_passthrough_controller.py +++ b/ur_robot_driver/test/integration_test_passthrough_controller.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2019, Universal Robots A/S +# Copyright 2025, Universal Robots A/S # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -82,7 +82,7 @@ def generate_test_description(tf_prefix): class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls, use_mock_hardware): + def setUpClass(cls): # Initialize the ROS context rclpy.init() cls.node = Node("robot_driver_test") @@ -132,13 +132,6 @@ def test_start_passthrough_controller(self): deactivate_controllers=["scaled_joint_trajectory_controller"], ).ok ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) def test_passthrough_trajectory(self, tf_prefix): self.assertTrue( @@ -172,96 +165,96 @@ def test_passthrough_trajectory(self, tf_prefix): ) self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - def test_quintic_trajectory(self, tf_prefix): - # Full quintic trajectory + # def test_quintic_trajectory(self, tf_prefix): + # # Full quintic trajectory - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint( - positions=pos, - time_from_start=times, - velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - ) - for (times, pos) in TEST_TRAJECTORY - ], - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - ) - goal_time_tolerance = Duration(sec=1, nanosec=0) - goal_tolerance = [ - JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS - ] - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) + # trajectory = JointTrajectory( + # points=[ + # JointTrajectoryPoint( + # positions=pos, + # time_from_start=times, + # velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + # accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + # ) + # for (times, pos) in TEST_TRAJECTORY + # ], + # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + # ) + # goal_time_tolerance = Duration(sec=1, nanosec=0) + # goal_tolerance = [ + # JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + # ] + # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + # trajectory=trajectory, + # goal_time_tolerance=goal_time_tolerance, + # goal_tolerance=goal_tolerance, + # ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + # self.assertTrue(goal_handle.accepted) + # if goal_handle.accepted: + # result = self._passthrough_forward_joint_trajectory.get_result( + # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + # ) + # self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - def test_impossible_goal_tolerance_fails(self, tf_prefix): - # Test impossible goal tolerance, should fail. + # def test_impossible_goal_tolerance_fails(self, tf_prefix): + # # Test impossible goal tolerance, should fail. - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in TEST_TRAJECTORY - ], - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - ) - goal_tolerance = [ - JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS - ] - goal_time_tolerance = Duration(sec=1, nanosec=0) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) + # trajectory = JointTrajectory( + # points=[ + # JointTrajectoryPoint(positions=pos, time_from_start=times) + # for (times, pos) in TEST_TRAJECTORY + # ], + # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + # ) + # goal_tolerance = [ + # JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS + # ] + # goal_time_tolerance = Duration(sec=1, nanosec=0) + # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + # trajectory=trajectory, + # goal_time_tolerance=goal_time_tolerance, + # goal_tolerance=goal_tolerance, + # ) + # self.assertTrue(goal_handle.accepted) + # if goal_handle.accepted: + # result = self._passthrough_forward_joint_trajectory.get_result( + # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + # ) + # self.assertEqual( + # result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + # ) - def test_impossible_goal_time_tolerance_fails(self, tf_prefix): - # Test impossible goal time - goal_tolerance = [ - JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS - ] - goal_time_tolerance = Duration(sec=0, nanosec=10) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in TEST_TRAJECTORY - ], - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - ) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) + # def test_impossible_goal_time_tolerance_fails(self, tf_prefix): + # # Test impossible goal time + # goal_tolerance = [ + # JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + # ] + # goal_time_tolerance = Duration(sec=0, nanosec=10) + # trajectory = JointTrajectory( + # points=[ + # JointTrajectoryPoint(positions=pos, time_from_start=times) + # for (times, pos) in TEST_TRAJECTORY + # ], + # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + # ) + # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + # trajectory=trajectory, + # goal_time_tolerance=goal_time_tolerance, + # goal_tolerance=goal_tolerance, + # ) + # self.assertTrue(goal_handle.accepted) + # if goal_handle.accepted: + # result = self._passthrough_forward_joint_trajectory.get_result( + # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + # ) + # self.assertEqual( + # result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + # ) + # self.assertTrue( + # self._controller_manager_interface.switch_controller( + # strictness=SwitchController.Request.BEST_EFFORT, + # deactivate_controllers=["passthrough_trajectory_controller"], + # activate_controllers=["scaled_joint_trajectory_controller"], + # ).ok + # ) diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py similarity index 90% rename from ur_robot_driver/test/robot_driver.py rename to ur_robot_driver/test/integration_test_scaled_joint_controller.py index 388a75704..3daabe6e5 100755 --- a/ur_robot_driver/test/robot_driver.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -41,7 +41,6 @@ from rclpy.node import Node from sensor_msgs.msg import JointState from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint -from ur_msgs.msg import IOStates sys.path.append(os.path.dirname(__file__)) from test_common import ( # noqa: E402 @@ -112,11 +111,6 @@ def setUp(self): # Test functions # - def test_get_robot_software_version(self): - self.assertNotEqual( - self._configuration_controller_interface.get_robot_software_version().major, 0 - ) - def test_start_scaled_jtc_controller(self): self.assertTrue( self._controller_manager_interface.switch_controller( @@ -125,56 +119,6 @@ def test_start_scaled_jtc_controller(self): ).ok ) - def test_set_io(self): - """Test to set an IO and check whether it has been set.""" - if self.mock_hardware: - return True - # Create io callback to verify result - io_msg = None - - def io_msg_cb(msg): - nonlocal io_msg - io_msg = msg - - io_states_sub = self.node.create_subscription( - IOStates, - "/io_and_status_controller/io_states", - io_msg_cb, - rclpy.qos.qos_profile_system_default, - ) - - # Set pin 0 to 1.0 - test_pin = 0 - - logging.info("Setting pin %d to 1.0", test_pin) - self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=1.0) - - # Wait until the pin state has changed - pin_state = False - end_time = time.time() + 5 - while not pin_state and time.time() < end_time: - rclpy.spin_once(self.node, timeout_sec=0.1) - if io_msg is not None: - pin_state = io_msg.digital_out_states[test_pin].state - - self.assertEqual(pin_state, 1.0) - - # Set pin 0 to 0.0 - logging.info("Setting pin %d to 0.0", test_pin) - self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=0.0) - - # Wait until the pin state has changed back - end_time = time.time() + 5 - while pin_state and time.time() < end_time: - rclpy.spin_once(self.node, timeout_sec=0.1) - if io_msg is not None: - pin_state = io_msg.digital_out_states[test_pin].state - - self.assertEqual(pin_state, 0.0) - - # Clean up io subscription - self.node.destroy_subscription(io_states_sub) - def test_trajectory(self, tf_prefix): """Test robot movement.""" self.assertTrue( From 336c576232a5d5542ae6c9d7a50bf735bc2a8b7b Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 8 Apr 2025 14:04:27 +0000 Subject: [PATCH 08/20] finish splitting passthrough controller tests --- ...integration_test_passthrough_controller.py | 200 ++++++++++-------- 1 file changed, 110 insertions(+), 90 deletions(-) diff --git a/ur_robot_driver/test/integration_test_passthrough_controller.py b/ur_robot_driver/test/integration_test_passthrough_controller.py index 61942c37a..aa72745ad 100644 --- a/ur_robot_driver/test/integration_test_passthrough_controller.py +++ b/ur_robot_driver/test/integration_test_passthrough_controller.py @@ -77,7 +77,7 @@ def generate_test_description(tf_prefix): Duration(sec=8, nanosec=0), Duration(sec=12, nanosec=0), ] -TEST_TRAJECTORY = zip(time_vec, waypts) +TEST_TRAJECTORY = [(time_vec[i], waypts[i]) for i in range(len(waypts))] class RobotDriverTest(unittest.TestCase): @@ -165,96 +165,116 @@ def test_passthrough_trajectory(self, tf_prefix): ) self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - # def test_quintic_trajectory(self, tf_prefix): - # # Full quintic trajectory - - # trajectory = JointTrajectory( - # points=[ - # JointTrajectoryPoint( - # positions=pos, - # time_from_start=times, - # velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - # accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - # ) - # for (times, pos) in TEST_TRAJECTORY - # ], - # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - # ) - # goal_time_tolerance = Duration(sec=1, nanosec=0) - # goal_tolerance = [ - # JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS - # ] - # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - # trajectory=trajectory, - # goal_time_tolerance=goal_time_tolerance, - # goal_tolerance=goal_tolerance, - # ) + def test_quintic_trajectory(self, tf_prefix): + # Full quintic trajectory + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint( + positions=pos, + time_from_start=times, + velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + ) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_time_tolerance = Duration(sec=1, nanosec=0) + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) - # self.assertTrue(goal_handle.accepted) - # if goal_handle.accepted: - # result = self._passthrough_forward_joint_trajectory.get_result( - # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - # ) - # self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - # def test_impossible_goal_tolerance_fails(self, tf_prefix): - # # Test impossible goal tolerance, should fail. + def test_impossible_goal_tolerance_fails(self, tf_prefix): + # Test impossible goal tolerance, should fail. + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_tolerance = [ + JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=1, nanosec=0) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual( + result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + ) - # trajectory = JointTrajectory( - # points=[ - # JointTrajectoryPoint(positions=pos, time_from_start=times) - # for (times, pos) in TEST_TRAJECTORY - # ], - # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - # ) - # goal_tolerance = [ - # JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS - # ] - # goal_time_tolerance = Duration(sec=1, nanosec=0) - # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - # trajectory=trajectory, - # goal_time_tolerance=goal_time_tolerance, - # goal_tolerance=goal_tolerance, - # ) - # self.assertTrue(goal_handle.accepted) - # if goal_handle.accepted: - # result = self._passthrough_forward_joint_trajectory.get_result( - # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - # ) - # self.assertEqual( - # result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - # ) + def test_impossible_goal_time_tolerance_fails(self, tf_prefix): + # Test impossible goal time + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) - # def test_impossible_goal_time_tolerance_fails(self, tf_prefix): - # # Test impossible goal time - # goal_tolerance = [ - # JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS - # ] - # goal_time_tolerance = Duration(sec=0, nanosec=10) - # trajectory = JointTrajectory( - # points=[ - # JointTrajectoryPoint(positions=pos, time_from_start=times) - # for (times, pos) in TEST_TRAJECTORY - # ], - # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - # ) - # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - # trajectory=trajectory, - # goal_time_tolerance=goal_time_tolerance, - # goal_tolerance=goal_tolerance, - # ) - # self.assertTrue(goal_handle.accepted) - # if goal_handle.accepted: - # result = self._passthrough_forward_joint_trajectory.get_result( - # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - # ) - # self.assertEqual( - # result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - # ) - # self.assertTrue( - # self._controller_manager_interface.switch_controller( - # strictness=SwitchController.Request.BEST_EFFORT, - # deactivate_controllers=["passthrough_trajectory_controller"], - # activate_controllers=["scaled_joint_trajectory_controller"], - # ).ok - # ) + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=0, nanosec=10) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual( + result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) From a229561ac82c1ace85dbff75ebf64da68f370f5b Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 8 Apr 2025 14:14:59 +0000 Subject: [PATCH 09/20] Add the separate tests to cmake file --- ur_robot_driver/CMakeLists.txt | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/ur_robot_driver/CMakeLists.txt b/ur_robot_driver/CMakeLists.txt index 1a76a5cab..1f35f4631 100644 --- a/ur_robot_driver/CMakeLists.txt +++ b/ur_robot_driver/CMakeLists.txt @@ -243,7 +243,7 @@ if(BUILD_TESTING) TIMEOUT 180 ) - add_launch_test(test/robot_driver.py + add_launch_test(test/integration_test_scaled_joint_controller.py TIMEOUT 800 ) @@ -259,6 +259,18 @@ if(BUILD_TESTING) TIMEOUT 500 ) + add_launch_test(test/integration_test_config_controller.py + TIMEOUT + 800 + ) + add_launch_test(test/integration_test_passthrough_controller.py + TIMEOUT + 800 + ) + add_launch_test(test/integration_test_io_controller.py + TIMEOUT + 800 + ) add_launch_test(test/integration_test_tool_contact.py TIMEOUT 800 From 8cfbe37f3b15bc40b64d85a4522865c762de6cba Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 8 Apr 2025 14:26:38 +0000 Subject: [PATCH 10/20] formatting --- ur_robot_driver/test/test_common.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index ce53b268c..4ece7e68e 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -375,7 +375,8 @@ def generate_dashboard_test_description(): def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", - controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, use_mock_hardware="false", + controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, + use_mock_hardware="false", ): ur_type = LaunchConfiguration("ur_type") From 6992dd2ace54670cdc7bbe3f01e6c4c437b1321d Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 22 Apr 2025 10:32:01 +0000 Subject: [PATCH 11/20] Add mock hardware test description --- ur_robot_driver/test/test_common.py | 36 +++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 4ece7e68e..573a29b17 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -372,6 +372,42 @@ def generate_dashboard_test_description(): ) +def generate_mock_hardware_test_description( + tf_prefix="", + initial_joint_controller="scaled_joint_trajectory_controller", + controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, + use_mock_hardware="false", +): + + ur_type = LaunchConfiguration("ur_type") + + launch_arguments = { + "robot_ip": "0.0.0.0", + "ur_type": ur_type, + "launch_rviz": "false", + "controller_spawner_timeout": str(controller_spawner_timeout), + "initial_joint_controller": initial_joint_controller, + "headless_mode": "true", + "launch_dashboard_client": "true", + "start_joint_controller": "false", + "use_mock_hardware": use_mock_hardware, + "mock_sensor_commands": use_mock_hardware, + } + if tf_prefix: + launch_arguments["tf_prefix"] = tf_prefix + + robot_driver = IncludeLaunchDescription( + PythonLaunchDescriptionSource( + PathJoinSubstitution( + [FindPackageShare("ur_robot_driver"), "launch", "ur_control.launch.py"] + ) + ), + launch_arguments=launch_arguments.items(), + ) + + return LaunchDescription(_declare_launch_arguments() + [ReadyToTest(), robot_driver]) + + def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", From 31e93c90539817a77566b066a589120f35ef00d1 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 22 Apr 2025 10:34:14 +0000 Subject: [PATCH 12/20] Move all mock harrdware tests in to their own file --- .../integration_test_config_controller.py | 21 +- .../test/integration_test_io_controller.py | 23 +- ...ntegration_test_scaled_joint_controller.py | 23 +- ur_robot_driver/test/test_mock_hardware.py | 610 ++++++++++++++++++ 4 files changed, 628 insertions(+), 49 deletions(-) create mode 100644 ur_robot_driver/test/test_mock_hardware.py diff --git a/ur_robot_driver/test/integration_test_config_controller.py b/ur_robot_driver/test/integration_test_config_controller.py index 0699e6fe0..1e29de8e4 100644 --- a/ur_robot_driver/test/integration_test_config_controller.py +++ b/ur_robot_driver/test/integration_test_config_controller.py @@ -50,23 +50,18 @@ @pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] -) -def generate_test_description(tf_prefix, use_mock_hardware): - return generate_driver_test_description( - tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware - ) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls, use_mock_hardware): + def setUpClass(cls): # Initialize the ROS context rclpy.init() cls.node = Node("robot_driver_test") time.sleep(1) - cls.mock_hardware = use_mock_hardware == "true" cls.init_robot(cls) @classmethod @@ -76,10 +71,7 @@ def tearDownClass(cls): rclpy.shutdown() def init_robot(self): - if not self.mock_hardware: - self._dashboard_interface = DashboardInterface(self.node) - else: - self._dashboard_interface = None + self._dashboard_interface = DashboardInterface(self.node) self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) @@ -96,8 +88,7 @@ def init_robot(self): ) def setUp(self): - if self._dashboard_interface: - self._dashboard_interface.start_robot() + self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) diff --git a/ur_robot_driver/test/integration_test_io_controller.py b/ur_robot_driver/test/integration_test_io_controller.py index bd4391204..e1aa83706 100644 --- a/ur_robot_driver/test/integration_test_io_controller.py +++ b/ur_robot_driver/test/integration_test_io_controller.py @@ -52,23 +52,18 @@ @pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] -) -def generate_test_description(tf_prefix, use_mock_hardware): - return generate_driver_test_description( - tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware - ) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls, use_mock_hardware): + def setUpClass(cls): # Initialize the ROS context rclpy.init() cls.node = Node("robot_driver_test") time.sleep(1) - cls.mock_hardware = use_mock_hardware == "true" cls.init_robot(cls) @classmethod @@ -78,10 +73,7 @@ def tearDownClass(cls): rclpy.shutdown() def init_robot(self): - if not self.mock_hardware: - self._dashboard_interface = DashboardInterface(self.node) - else: - self._dashboard_interface = None + self._dashboard_interface = DashboardInterface(self.node) self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) @@ -98,8 +90,7 @@ def init_robot(self): ) def setUp(self): - if self._dashboard_interface: - self._dashboard_interface.start_robot() + self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) @@ -109,8 +100,6 @@ def setUp(self): def test_set_io(self): """Test to set an IO and check whether it has been set.""" - if self.mock_hardware: - return True # Create io callback to verify result io_msg = None diff --git a/ur_robot_driver/test/integration_test_scaled_joint_controller.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py index 3daabe6e5..7ef2e529b 100755 --- a/ur_robot_driver/test/integration_test_scaled_joint_controller.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -56,23 +56,18 @@ @pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] -) -def generate_test_description(tf_prefix, use_mock_hardware): - return generate_driver_test_description( - tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware - ) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls, use_mock_hardware): + def setUpClass(cls): # Initialize the ROS context rclpy.init() cls.node = Node("robot_driver_test") time.sleep(1) - cls.mock_hardware = use_mock_hardware == "true" cls.init_robot(cls) @classmethod @@ -82,10 +77,7 @@ def tearDownClass(cls): rclpy.shutdown() def init_robot(self): - if not self.mock_hardware: - self._dashboard_interface = DashboardInterface(self.node) - else: - self._dashboard_interface = None + self._dashboard_interface = DashboardInterface(self.node) self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) @@ -102,8 +94,7 @@ def init_robot(self): ) def setUp(self): - if self._dashboard_interface: - self._dashboard_interface.start_robot() + self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) @@ -227,8 +218,6 @@ def test_trajectory_scaled(self, tf_prefix): def test_trajectory_scaled_aborts_on_violation(self, tf_prefix): """Test that the robot correctly aborts the trajectory when the constraints are violated.""" - if self.mock_hardware: - return True # Construct test trajectory test_trajectory = [ (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py new file mode 100644 index 000000000..d16812c70 --- /dev/null +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -0,0 +1,610 @@ +#!/usr/bin/env python +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import time +import unittest +import logging + +import launch_testing +import pytest +import rclpy +from rclpy.node import Node +from builtin_interfaces.msg import Duration +from control_msgs.action import FollowJointTrajectory +from controller_manager_msgs.srv import SwitchController +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint + + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + IoStatusInterface, + ConfigurationInterface, + generate_mock_hardware_test_description, + ROBOT_JOINTS, + TIMEOUT_EXECUTE_TRAJECTORY, +) + + +@pytest.mark.launch_test +@launch_testing.parametrize("tf_prefix, use_mock_hardware", [("", "true"), ("my_ur_", "true")]) +def generate_test_description(tf_prefix, use_mock_hardware): + return generate_mock_hardware_test_description( + tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware + ) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls, use_mock_hardware): + # Initialize the ROS context + rclpy.init() + cls.node = Node("mock_hardware_test") + time.sleep(1) + cls.mock_hardware = use_mock_hardware == "true" + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + self._dashboard_interface = None + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def setUp(self): + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_get_robot_software_version(self): + self.assertEqual( + self._configuration_controller_interface.get_robot_software_version().major, 1 + ) + + def test_start_scaled_jtc_controller(self): + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + def test_trajectory(self, tf_prefix): + """Test robot movement.""" + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Sending trajectory goal + logging.info("Sending simple goal") + goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + self.assertTrue(goal_handle.accepted) + + # Verify execution + result = self._scaled_follow_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + def test_illegal_trajectory(self, tf_prefix): + """ + Test trajectory server. + + This is more of a validation test that the testing suite does the right thing + """ + # Construct test trajectory, the second point wrongly starts before the first + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Send illegal goal + logging.info("Sending illegal goal") + goal_handle = self._scaled_follow_joint_trajectory.send_goal( + trajectory=trajectory, + ) + + # Verify the failure is correctly detected + self.assertFalse(goal_handle.accepted) + + def test_trajectory_scaled(self, tf_prefix): + """Test robot movement.""" + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=6, nanosec=500000000), [-1.0 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Execute trajectory + logging.info("Sending goal for robot to follow") + goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + self.assertTrue(goal_handle.accepted) + + # Verify execution + result = self._scaled_follow_joint_trajectory.get_result( + goal_handle, + TIMEOUT_EXECUTE_TRAJECTORY, + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + # Controller switching tests + def test_activating_multiple_controllers_same_interface_fails(self): + # Deactivate all writing controllers + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "passthrough_trajectory_controller", + "force_mode_controller", + "freedrive_mode_controller", + ], + ).ok + ) + + # Activating different motion controllers should not be possible + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "forward_position_controller", + ], + ).ok + ) + + def test_activating_multiple_controllers_different_interface_fails(self): + # Deactivate all writing controllers + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "force_mode_controller", + "passthrough_trajectory_controller", + "freedrive_mode_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "forward_velocity_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "passthrough_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_velocity_controller", + "passthrough_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_position_controller", + "forward_velocity_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "force_mode_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "freedrive_mode_controller", + ], + ).ok + ) + + def test_activating_controller_with_running_position_controller_fails(self): + # Having a position-based controller active, no other controller should be able to + # activate. + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + ], + deactivate_controllers=[ + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "force_mode_controller", + "freedrive_mode_controller", + "passthrough_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_position_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_velocity_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "freedrive_mode_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "passthrough_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "force_mode_controller", + ], + ).ok + ) + # Stop controller again + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + ], + ).ok + ) + + def test_activating_controller_with_running_passthrough_trajectory_controller_fails(self): + # Having a position-based controller active, no other controller should be able to + # activate. + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "force_mode_controller", # tested in separate test + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_position_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_velocity_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "joint_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "freedrive_mode_controller", + ], + ).ok + ) + # Stop the controller again + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + deactivate_controllers=[ + "passthrough_trajectory_controller", + ], + ).ok + ) + + def test_force_mode_and_trajectory_passthrough_controller_are_compatible(self): + # Deactivate all writing controllers + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "passthrough_trajectory_controller", + "force_mode_controller", + ], + ).ok + ) + + time.sleep(3) + + # Start both together + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "passthrough_trajectory_controller", + "force_mode_controller", + ], + ).ok + ) + + # With passthrough traj controller running, start force mode controller + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=[ + "passthrough_trajectory_controller", + ], + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "force_mode_controller", + ], + ).ok + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "force_mode_controller", + ], + ).ok + ) + + # With start force mode controller running, passthrough traj controller + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=[ + "force_mode_controller", + ], + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "passthrough_trajectory_controller", + ], + ).ok + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "passthrough_trajectory_controller", + ], + ).ok + ) + + def test_tool_contact_compatibility(self): + # Deactivate all writing controllers + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "passthrough_trajectory_controller", + "force_mode_controller", + "tool_contact_controller", + ], + ).ok + ) + + time.sleep(3) + # Start tool contact controller and JTC + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "tool_contact_controller", + ], + ).ok + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "tool_contact_controller", + ], + ).ok + ) + + # Start tool contact controller and passthrough trajectory + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "passthrough_trajectory_controller", + "tool_contact_controller", + ], + ).ok + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + deactivate_controllers=[ + "passthrough_trajectory_controller", + "tool_contact_controller", + ], + ).ok + ) + + # tool contact should not start with force_mode + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "force_mode_controller", + "tool_contact_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "tool_contact_controller", + "force_mode_controller", + ], + ).ok + ) + + # tool contact should not start with freedrive + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "tool_contact_controller", + "freedrive_mode_controller", + ], + ).ok + ) From 9648edcb2a56b5c5ad5e306edb30295532f66c02 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 22 Apr 2025 10:56:18 +0000 Subject: [PATCH 13/20] Remove controller switching tests --- ur_robot_driver/test/test_mock_hardware.py | 412 --------------------- 1 file changed, 412 deletions(-) diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index d16812c70..77b6a640c 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -196,415 +196,3 @@ def test_trajectory_scaled(self, tf_prefix): TIMEOUT_EXECUTE_TRAJECTORY, ) self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - # Controller switching tests - def test_activating_multiple_controllers_same_interface_fails(self): - # Deactivate all writing controllers - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "passthrough_trajectory_controller", - "force_mode_controller", - "freedrive_mode_controller", - ], - ).ok - ) - - # Activating different motion controllers should not be possible - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "forward_position_controller", - ], - ).ok - ) - - def test_activating_multiple_controllers_different_interface_fails(self): - # Deactivate all writing controllers - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "force_mode_controller", - "passthrough_trajectory_controller", - "freedrive_mode_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "forward_velocity_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "passthrough_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_velocity_controller", - "passthrough_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_position_controller", - "forward_velocity_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "force_mode_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "freedrive_mode_controller", - ], - ).ok - ) - - def test_activating_controller_with_running_position_controller_fails(self): - # Having a position-based controller active, no other controller should be able to - # activate. - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - ], - deactivate_controllers=[ - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "force_mode_controller", - "freedrive_mode_controller", - "passthrough_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_position_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_velocity_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "freedrive_mode_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "passthrough_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "force_mode_controller", - ], - ).ok - ) - # Stop controller again - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - ], - ).ok - ) - - def test_activating_controller_with_running_passthrough_trajectory_controller_fails(self): - # Having a position-based controller active, no other controller should be able to - # activate. - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["passthrough_trajectory_controller"], - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "force_mode_controller", # tested in separate test - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_position_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_velocity_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "joint_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "freedrive_mode_controller", - ], - ).ok - ) - # Stop the controller again - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - deactivate_controllers=[ - "passthrough_trajectory_controller", - ], - ).ok - ) - - def test_force_mode_and_trajectory_passthrough_controller_are_compatible(self): - # Deactivate all writing controllers - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "passthrough_trajectory_controller", - "force_mode_controller", - ], - ).ok - ) - - time.sleep(3) - - # Start both together - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "passthrough_trajectory_controller", - "force_mode_controller", - ], - ).ok - ) - - # With passthrough traj controller running, start force mode controller - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=[ - "passthrough_trajectory_controller", - ], - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "force_mode_controller", - ], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "force_mode_controller", - ], - ).ok - ) - - # With start force mode controller running, passthrough traj controller - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=[ - "force_mode_controller", - ], - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "passthrough_trajectory_controller", - ], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "passthrough_trajectory_controller", - ], - ).ok - ) - - def test_tool_contact_compatibility(self): - # Deactivate all writing controllers - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "passthrough_trajectory_controller", - "force_mode_controller", - "tool_contact_controller", - ], - ).ok - ) - - time.sleep(3) - # Start tool contact controller and JTC - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "tool_contact_controller", - ], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "tool_contact_controller", - ], - ).ok - ) - - # Start tool contact controller and passthrough trajectory - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "passthrough_trajectory_controller", - "tool_contact_controller", - ], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - deactivate_controllers=[ - "passthrough_trajectory_controller", - "tool_contact_controller", - ], - ).ok - ) - - # tool contact should not start with force_mode - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "force_mode_controller", - "tool_contact_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "tool_contact_controller", - "force_mode_controller", - ], - ).ok - ) - - # tool contact should not start with freedrive - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "tool_contact_controller", - "freedrive_mode_controller", - ], - ).ok - ) From 73ed696a7df78bfdf061355cbb02d93fecc17783 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 22 Apr 2025 11:00:24 +0000 Subject: [PATCH 14/20] add mock hardware test to build tests --- ur_robot_driver/CMakeLists.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ur_robot_driver/CMakeLists.txt b/ur_robot_driver/CMakeLists.txt index 1f35f4631..156d1c358 100644 --- a/ur_robot_driver/CMakeLists.txt +++ b/ur_robot_driver/CMakeLists.txt @@ -230,6 +230,11 @@ if(BUILD_TESTING) find_package(ur_msgs REQUIRED) find_package(launch_testing_ament_cmake) + add_launch_test(test/test_mock_hardware.py + TIMEOUT + 800 + ) + if(${UR_ROBOT_DRIVER_BUILD_INTEGRATION_TESTS}) add_launch_test(test/launch_args.py TIMEOUT From 4137ee4a46f51a5eda6f65a7c0b4bf0ba50996f0 Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 07:58:46 +0000 Subject: [PATCH 15/20] Remove passthrough test from sjtc test script --- ...ntegration_test_scaled_joint_controller.py | 125 ------------------ 1 file changed, 125 deletions(-) diff --git a/ur_robot_driver/test/integration_test_scaled_joint_controller.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py index 7ef2e529b..9e311a079 100755 --- a/ur_robot_driver/test/integration_test_scaled_joint_controller.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -295,128 +295,3 @@ def js_cb(msg): # result = self.get_result("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal_response, TIMEOUT_EXECUTE_TRAJECTORY) # self.assertEqual(result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED) # self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED") - - def test_passthrough_trajectory(self, tf_prefix): - if self.mock_hardware: - return True - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["passthrough_trajectory_controller"], - deactivate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - waypts = [ - [-1.58, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - [-2.5, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - [-1.58, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - ] - time_vec = [ - Duration(sec=4, nanosec=0), - Duration(sec=8, nanosec=0), - Duration(sec=12, nanosec=0), - ] - goal_tolerance = [ - JointTolerance(position=0.01, velocity=5e-5, name=tf_prefix + ROBOT_JOINTS[i]) - for i in range(len(ROBOT_JOINTS)) - ] - goal_time_tolerance = Duration(sec=1, nanosec=0) - test_trajectory = zip(time_vec, waypts) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in test_trajectory - ], - joint_names=[tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], - ) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - # Full quintic trajectory - test_trajectory = zip(time_vec, waypts) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint( - positions=pos, - time_from_start=times, - velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - ) - for (times, pos) in test_trajectory - ], - joint_names=[tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], - ) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - # Test impossible goal tolerance, should fail. - test_trajectory = zip(time_vec, waypts) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in test_trajectory - ], - joint_names=[tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], - ) - goal_tolerance = [ - JointTolerance(position=0.000000001, name=tf_prefix + ROBOT_JOINTS[i]) - for i in range(len(ROBOT_JOINTS)) - ] - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) - - # Test impossible goal time - goal_tolerance = [ - JointTolerance(position=0.01, name=tf_prefix + ROBOT_JOINTS[i]) for i in range(6) - ] - goal_time_tolerance.sec = 0 - goal_time_tolerance.nanosec = 10 - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) From 8362f9f92da096767e5308bfee4ab9e7fe99023c Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 08:10:24 +0000 Subject: [PATCH 16/20] Clean up test description generators --- ur_robot_driver/test/test_common.py | 8 ++------ ur_robot_driver/test/test_mock_hardware.py | 9 +++------ 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 573a29b17..860621967 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -376,7 +376,6 @@ def generate_mock_hardware_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, - use_mock_hardware="false", ): ur_type = LaunchConfiguration("ur_type") @@ -390,8 +389,8 @@ def generate_mock_hardware_test_description( "headless_mode": "true", "launch_dashboard_client": "true", "start_joint_controller": "false", - "use_mock_hardware": use_mock_hardware, - "mock_sensor_commands": use_mock_hardware, + "use_mock_hardware": "true", + "mock_sensor_commands": "true", } if tf_prefix: launch_arguments["tf_prefix"] = tf_prefix @@ -412,7 +411,6 @@ def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, - use_mock_hardware="false", ): ur_type = LaunchConfiguration("ur_type") @@ -425,8 +423,6 @@ def generate_driver_test_description( "headless_mode": "true", "launch_dashboard_client": "true", "start_joint_controller": "false", - "use_mock_hardware": use_mock_hardware, - "mock_sensor_commands": use_mock_hardware, } if tf_prefix: launch_arguments["tf_prefix"] = tf_prefix diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index 77b6a640c..18f95dfe1 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -56,11 +56,9 @@ @pytest.mark.launch_test -@launch_testing.parametrize("tf_prefix, use_mock_hardware", [("", "true"), ("my_ur_", "true")]) -def generate_test_description(tf_prefix, use_mock_hardware): - return generate_mock_hardware_test_description( - tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware - ) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_mock_hardware_test_description(tf_prefix=tf_prefix) class RobotDriverTest(unittest.TestCase): @@ -70,7 +68,6 @@ def setUpClass(cls, use_mock_hardware): rclpy.init() cls.node = Node("mock_hardware_test") time.sleep(1) - cls.mock_hardware = use_mock_hardware == "true" cls.init_robot(cls) @classmethod From 5a2297b6486b5f8914ef87a7738f81777a3d1bf1 Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 08:56:47 +0000 Subject: [PATCH 17/20] fix config controller test --- ur_robot_driver/test/integration_test_config_controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ur_robot_driver/test/integration_test_config_controller.py b/ur_robot_driver/test/integration_test_config_controller.py index 1e29de8e4..9e7e6f12b 100644 --- a/ur_robot_driver/test/integration_test_config_controller.py +++ b/ur_robot_driver/test/integration_test_config_controller.py @@ -97,6 +97,6 @@ def setUp(self): # def test_get_robot_software_version(self): - self.assertNotEqual( - self._configuration_controller_interface.get_robot_software_version().major, 0 + self.assertGreater( + self._configuration_controller_interface.get_robot_software_version().major, 1 ) From d77bc8f412afc983f0378830c275ad1f3e61672f Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 09:01:58 +0000 Subject: [PATCH 18/20] Clean up all the test setups and remove irrelevant tests --- .../integration_test_config_controller.py | 13 ------- .../test/integration_test_io_controller.py | 15 -------- ...integration_test_passthrough_controller.py | 10 +---- ...ntegration_test_scaled_joint_controller.py | 7 ---- ur_robot_driver/test/test_mock_hardware.py | 38 +++---------------- 5 files changed, 7 insertions(+), 76 deletions(-) diff --git a/ur_robot_driver/test/integration_test_config_controller.py b/ur_robot_driver/test/integration_test_config_controller.py index 9e7e6f12b..4a52c4cd2 100644 --- a/ur_robot_driver/test/integration_test_config_controller.py +++ b/ur_robot_driver/test/integration_test_config_controller.py @@ -35,12 +35,10 @@ import launch_testing import pytest import rclpy -from control_msgs.action import FollowJointTrajectory from rclpy.node import Node sys.path.append(os.path.dirname(__file__)) from test_common import ( # noqa: E402 - ActionInterface, ControllerManagerInterface, DashboardInterface, IoStatusInterface, @@ -76,17 +74,6 @@ def init_robot(self): self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) - self._scaled_follow_joint_trajectory = ActionInterface( - self.node, - "/scaled_joint_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) - self._passthrough_forward_joint_trajectory = ActionInterface( - self.node, - "/passthrough_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) - def setUp(self): self._dashboard_interface.start_robot() time.sleep(1) diff --git a/ur_robot_driver/test/integration_test_io_controller.py b/ur_robot_driver/test/integration_test_io_controller.py index e1aa83706..e3a54f14e 100644 --- a/ur_robot_driver/test/integration_test_io_controller.py +++ b/ur_robot_driver/test/integration_test_io_controller.py @@ -36,17 +36,14 @@ import launch_testing import pytest import rclpy -from control_msgs.action import FollowJointTrajectory from rclpy.node import Node from ur_msgs.msg import IOStates sys.path.append(os.path.dirname(__file__)) from test_common import ( # noqa: E402 - ActionInterface, ControllerManagerInterface, DashboardInterface, IoStatusInterface, - ConfigurationInterface, generate_driver_test_description, ) @@ -76,18 +73,6 @@ def init_robot(self): self._dashboard_interface = DashboardInterface(self.node) self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) - self._configuration_controller_interface = ConfigurationInterface(self.node) - - self._scaled_follow_joint_trajectory = ActionInterface( - self.node, - "/scaled_joint_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) - self._passthrough_forward_joint_trajectory = ActionInterface( - self.node, - "/passthrough_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) def setUp(self): self._dashboard_interface.start_robot() diff --git a/ur_robot_driver/test/integration_test_passthrough_controller.py b/ur_robot_driver/test/integration_test_passthrough_controller.py index aa72745ad..6b907bcd5 100644 --- a/ur_robot_driver/test/integration_test_passthrough_controller.py +++ b/ur_robot_driver/test/integration_test_passthrough_controller.py @@ -49,7 +49,6 @@ ControllerManagerInterface, DashboardInterface, IoStatusInterface, - ConfigurationInterface, generate_driver_test_description, ROBOT_JOINTS, TIMEOUT_EXECUTE_TRAJECTORY, @@ -101,13 +100,7 @@ def init_robot(self): self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) - self._configuration_controller_interface = ConfigurationInterface(self.node) - self._scaled_follow_joint_trajectory = ActionInterface( - self.node, - "/scaled_joint_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) self._passthrough_forward_joint_trajectory = ActionInterface( self.node, "/passthrough_trajectory_controller/follow_joint_trajectory", @@ -115,8 +108,7 @@ def init_robot(self): ) def setUp(self): - if self._dashboard_interface: - self._dashboard_interface.start_robot() + self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) diff --git a/ur_robot_driver/test/integration_test_scaled_joint_controller.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py index 9e311a079..c2ca19f9f 100755 --- a/ur_robot_driver/test/integration_test_scaled_joint_controller.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -48,7 +48,6 @@ ControllerManagerInterface, DashboardInterface, IoStatusInterface, - ConfigurationInterface, generate_driver_test_description, ROBOT_JOINTS, TIMEOUT_EXECUTE_TRAJECTORY, @@ -80,18 +79,12 @@ def init_robot(self): self._dashboard_interface = DashboardInterface(self.node) self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) - self._configuration_controller_interface = ConfigurationInterface(self.node) self._scaled_follow_joint_trajectory = ActionInterface( self.node, "/scaled_joint_trajectory_controller/follow_joint_trajectory", FollowJointTrajectory, ) - self._passthrough_forward_joint_trajectory = ActionInterface( - self.node, - "/passthrough_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) def setUp(self): self._dashboard_interface.start_robot() diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index 18f95dfe1..6292fd6bd 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -76,8 +76,8 @@ def tearDownClass(cls): cls.node.destroy_node() rclpy.shutdown() + # Connect to all interfaces and actions, even ones we know won't work with mock hardware (Except dashboard) def init_robot(self): - self._dashboard_interface = None self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) @@ -88,9 +88,11 @@ def init_robot(self): FollowJointTrajectory, ) - def setUp(self): - time.sleep(1) - self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) # # Test functions @@ -165,31 +167,3 @@ def test_illegal_trajectory(self, tf_prefix): # Verify the failure is correctly detected self.assertFalse(goal_handle.accepted) - - def test_trajectory_scaled(self, tf_prefix): - """Test robot movement.""" - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=6, nanosec=500000000), [-1.0 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Execute trajectory - logging.info("Sending goal for robot to follow") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Verify execution - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, - TIMEOUT_EXECUTE_TRAJECTORY, - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) From 3cc8a2ca2121f16af35a9588a39b3a88b401473b Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 10:50:13 +0000 Subject: [PATCH 19/20] remove mock hardware parameter from setUpClass --- ur_robot_driver/test/test_mock_hardware.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index 6292fd6bd..81dc01797 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -63,7 +63,7 @@ def generate_test_description(tf_prefix): class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls, use_mock_hardware): + def setUpClass(cls): # Initialize the ROS context rclpy.init() cls.node = Node("mock_hardware_test") From 47b0e3d6bead554bb948cdba34d3afb261b0b569 Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 11:34:15 +0000 Subject: [PATCH 20/20] Move sjtc trajectory and illegal trajectory test to test_common, so it can be used by mock hardware test as well. --- ...ntegration_test_scaled_joint_controller.py | 71 +---------------- ur_robot_driver/test/test_common.py | 76 +++++++++++++++++++ ur_robot_driver/test/test_mock_hardware.py | 71 +++-------------- 3 files changed, 92 insertions(+), 126 deletions(-) diff --git a/ur_robot_driver/test/integration_test_scaled_joint_controller.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py index c2ca19f9f..7c651cf30 100755 --- a/ur_robot_driver/test/integration_test_scaled_joint_controller.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -51,6 +51,8 @@ generate_driver_test_description, ROBOT_JOINTS, TIMEOUT_EXECUTE_TRAJECTORY, + sjtc_trajectory_test, + sjtc_illegal_trajectory_test, ) @@ -104,75 +106,10 @@ def test_start_scaled_jtc_controller(self): ) def test_trajectory(self, tf_prefix): - """Test robot movement.""" - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Sending trajectory goal - logging.info("Sending simple goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Verify execution - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + sjtc_trajectory_test(self, tf_prefix) def test_illegal_trajectory(self, tf_prefix): - """ - Test trajectory server. - - This is more of a validation test that the testing suite does the right thing - """ - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - # Construct test trajectory, the second point wrongly starts before the first - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Send illegal goal - logging.info("Sending illegal goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal( - trajectory=trajectory, - ) - - # Verify the failure is correctly detected - self.assertFalse(goal_handle.accepted) + sjtc_illegal_trajectory_test(self, tf_prefix) def test_trajectory_scaled(self, tf_prefix): """Test robot movement.""" diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 860621967..9ecd72157 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -58,6 +58,9 @@ Load, ) from ur_msgs.srv import SetIO, GetRobotSoftwareVersion, SetForceMode +from builtin_interfaces.msg import Duration +from control_msgs.action import FollowJointTrajectory +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint TIMEOUT_WAIT_SERVICE = 10 TIMEOUT_WAIT_SERVICE_INITIAL = 120 # If we download the docker image simultaneously to the tests, it can take quite some time until the dashboard server is reachable and usable. @@ -302,6 +305,79 @@ class ForceModeInterface( pass +def sjtc_trajectory_test(tester, tf_prefix): + """Test robot movement.""" + tester.assertTrue( + tester._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Sending trajectory goal + logging.info("Sending simple goal") + goal_handle = tester._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + tester.assertTrue(goal_handle.accepted) + + # Verify execution + result = tester._scaled_follow_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + tester.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + +def sjtc_illegal_trajectory_test(tester, tf_prefix): + """ + Test trajectory server. + + This is more of a validation test that the testing suite does the right thing + """ + tester.assertTrue( + tester._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + # Construct test trajectory, the second point wrongly starts before the first + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Send illegal goal + logging.info("Sending illegal goal") + goal_handle = tester._scaled_follow_joint_trajectory.send_goal( + trajectory=trajectory, + ) + + # Verify the failure is correctly detected + tester.assertFalse(goal_handle.accepted) + + def _declare_launch_arguments(): declared_arguments = [] diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index 81dc01797..2bf36b6fd 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -31,17 +31,13 @@ import sys import time import unittest -import logging import launch_testing import pytest import rclpy from rclpy.node import Node -from builtin_interfaces.msg import Duration from control_msgs.action import FollowJointTrajectory from controller_manager_msgs.srv import SwitchController -from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint - sys.path.append(os.path.dirname(__file__)) from test_common import ( # noqa: E402 @@ -50,8 +46,8 @@ IoStatusInterface, ConfigurationInterface, generate_mock_hardware_test_description, - ROBOT_JOINTS, - TIMEOUT_EXECUTE_TRAJECTORY, + sjtc_trajectory_test, + sjtc_illegal_trajectory_test, ) @@ -104,66 +100,23 @@ def test_get_robot_software_version(self): ) def test_start_scaled_jtc_controller(self): + # Deactivate controller, if it is not already self.assertTrue( self._controller_manager_interface.switch_controller( strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + # Activate controller + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, activate_controllers=["scaled_joint_trajectory_controller"], ).ok ) def test_trajectory(self, tf_prefix): - """Test robot movement.""" - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Sending trajectory goal - logging.info("Sending simple goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Verify execution - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + sjtc_trajectory_test(self, tf_prefix) def test_illegal_trajectory(self, tf_prefix): - """ - Test trajectory server. - - This is more of a validation test that the testing suite does the right thing - """ - # Construct test trajectory, the second point wrongly starts before the first - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Send illegal goal - logging.info("Sending illegal goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal( - trajectory=trajectory, - ) - - # Verify the failure is correctly detected - self.assertFalse(goal_handle.accepted) + sjtc_illegal_trajectory_test(self, tf_prefix)