From 1848f8b1209709a9dd50e7ae4cff034cfdcae870 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 4 Mar 2025 09:25:31 +0000 Subject: [PATCH 01/22] Revert "Apply renaming of member variables of JTC (#1275)" This reverts commit 341506a363eb089d235a0646824bb04eb106f47d. --- .../src/scaled_joint_trajectory_controller.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ur_controllers/src/scaled_joint_trajectory_controller.cpp b/ur_controllers/src/scaled_joint_trajectory_controller.cpp index bdc935b0d..2f1a08b45 100644 --- a/ur_controllers/src/scaled_joint_trajectory_controller.cpp +++ b/ur_controllers/src/scaled_joint_trajectory_controller.cpp @@ -178,8 +178,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const if (!before_last_point && !rt_is_holding_ && cmd_timeout_ > 0.0 && time_difference > cmd_timeout_) { RCLCPP_WARN(logger, "Aborted due to command timeout"); - new_trajectory_msg_.reset(); - new_trajectory_msg_.initRT(set_hold_position()); + traj_msg_external_point_ptr_.reset(); + traj_msg_external_point_ptr_.initRT(set_hold_position()); } // Check state/goal tolerance @@ -313,8 +313,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const RCLCPP_WARN(logger, "%s", error_string.c_str()); - new_trajectory_msg_.reset(); - new_trajectory_msg_.initRT(set_hold_position()); + traj_msg_external_point_ptr_.reset(); + traj_msg_external_point_ptr_.initRT(set_hold_position()); } } } else if (tolerance_violated_while_moving && !rt_has_pending_goal_) { @@ -326,8 +326,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const } else if (!before_last_point && !within_goal_time && !rt_has_pending_goal_) { RCLCPP_ERROR(logger, "Exceeded goal_time_tolerance: holding position..."); - new_trajectory_msg_.reset(); - new_trajectory_msg_.initRT(set_hold_position()); + traj_msg_external_point_ptr_.reset(); + traj_msg_external_point_ptr_.initRT(set_hold_position()); } // else, run another cycle while waiting for outside_goal_tolerance // to be satisfied (will stay in this state until new message arrives) From d7de8a7caaaefaebf54542d94d79520393a13cdc Mon Sep 17 00:00:00 2001 From: URJala Date: Fri, 14 Mar 2025 07:41:43 +0000 Subject: [PATCH 02/22] Reapply "Apply renaming of member variables of JTC (#1275)" This reverts commit 99592214e10b58705c593e9b643c6f0d63cedca7. --- .../src/scaled_joint_trajectory_controller.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ur_controllers/src/scaled_joint_trajectory_controller.cpp b/ur_controllers/src/scaled_joint_trajectory_controller.cpp index 2f1a08b45..bdc935b0d 100644 --- a/ur_controllers/src/scaled_joint_trajectory_controller.cpp +++ b/ur_controllers/src/scaled_joint_trajectory_controller.cpp @@ -178,8 +178,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const if (!before_last_point && !rt_is_holding_ && cmd_timeout_ > 0.0 && time_difference > cmd_timeout_) { RCLCPP_WARN(logger, "Aborted due to command timeout"); - traj_msg_external_point_ptr_.reset(); - traj_msg_external_point_ptr_.initRT(set_hold_position()); + new_trajectory_msg_.reset(); + new_trajectory_msg_.initRT(set_hold_position()); } // Check state/goal tolerance @@ -313,8 +313,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const RCLCPP_WARN(logger, "%s", error_string.c_str()); - traj_msg_external_point_ptr_.reset(); - traj_msg_external_point_ptr_.initRT(set_hold_position()); + new_trajectory_msg_.reset(); + new_trajectory_msg_.initRT(set_hold_position()); } } } else if (tolerance_violated_while_moving && !rt_has_pending_goal_) { @@ -326,8 +326,8 @@ controller_interface::return_type ScaledJointTrajectoryController::update(const } else if (!before_last_point && !within_goal_time && !rt_has_pending_goal_) { RCLCPP_ERROR(logger, "Exceeded goal_time_tolerance: holding position..."); - traj_msg_external_point_ptr_.reset(); - traj_msg_external_point_ptr_.initRT(set_hold_position()); + new_trajectory_msg_.reset(); + new_trajectory_msg_.initRT(set_hold_position()); } // else, run another cycle while waiting for outside_goal_tolerance // to be satisfied (will stay in this state until new message arrives) From 9cd437a727af8aac77782dbf1ce371d52b9b948b Mon Sep 17 00:00:00 2001 From: URJala Date: Thu, 9 Jan 2025 12:59:17 +0000 Subject: [PATCH 03/22] Running integration tests with mock hardware Added some launch arguments and some checks to avoid connecting to the dashboard interface when using mock hardware, as that will not work. Maybe not the most elegant, but it works for now. Currently passthrough controller does not work at all with mock hardware, and is bypassed if using that in the test. test_trajectory_scaled_aborts_on_violation fails, as the hardware doesnt abort. test_set_io also fails as the controller cant verify that a pin has been set. --- ur_robot_driver/launch/ur_control.launch.py | 3 +++ ur_robot_driver/test/robot_driver.py | 24 +++++++++++++++------ ur_robot_driver/test/test_common.py | 6 ++++-- ur_robot_driver/urdf/ur.ros2_control.xacro | 4 +++- 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/ur_robot_driver/launch/ur_control.launch.py b/ur_robot_driver/launch/ur_control.launch.py index 528da2ee7..adb8bc39d 100644 --- a/ur_robot_driver/launch/ur_control.launch.py +++ b/ur_robot_driver/launch/ur_control.launch.py @@ -195,6 +195,9 @@ def controller_spawner(controllers, active=True): if activate_joint_controller.perform(context) == "true": controllers_active.append(initial_joint_controller.perform(context)) controllers_inactive.remove(initial_joint_controller.perform(context)) + + if use_mock_hardware.perform(context) == "true": + controllers_active.remove("tcp_pose_broadcaster") if use_mock_hardware.perform(context) == "true": controllers_active.remove("tcp_pose_broadcaster") diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/robot_driver.py index e07462772..5ee7838eb 100755 --- a/ur_robot_driver/test/robot_driver.py +++ b/ur_robot_driver/test/robot_driver.py @@ -56,16 +56,22 @@ ) TIMEOUT_EXECUTE_TRAJECTORY = 30 +MOCK_HARDWARE = False + +def set_mock_hardware_flag(use_mock_hardware): + return use_mock_hardware=="true" @pytest.mark.launch_test @launch_testing.parametrize( - "tf_prefix", - [(""), ("my_ur_")], + "tf_prefix, use_mock_hardware, mock_sensor_commands", + [("", "false", "false"), ("my_ur_", "false", "false"), ("", "true", "true")] ) -def generate_test_description(tf_prefix): - return generate_driver_test_description(tf_prefix=tf_prefix) +def generate_test_description(tf_prefix, use_mock_hardware, mock_sensor_commands): + global MOCK_HARDWARE + MOCK_HARDWARE = set_mock_hardware_flag(use_mock_hardware) + return generate_driver_test_description(tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware, mock_sensor_commands=mock_sensor_commands) class RobotDriverTest(unittest.TestCase): @classmethod @@ -83,7 +89,10 @@ def tearDownClass(cls): rclpy.shutdown() def init_robot(self): - self._dashboard_interface = DashboardInterface(self.node) + if(not MOCK_HARDWARE): + self._dashboard_interface = DashboardInterface(self.node) + else: + self._dashboard_interface = None self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) @@ -100,7 +109,8 @@ def init_robot(self): ) def setUp(self): - self._dashboard_interface.start_robot() + if(self._dashboard_interface): + self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) @@ -372,6 +382,8 @@ def js_cb(msg): # self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED") def test_passthrough_trajectory(self, tf_prefix): + if(MOCK_HARDWARE): + return True self.assertTrue( self._controller_manager_interface.switch_controller( strictness=SwitchController.Request.BEST_EFFORT, diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 215f8ff55..96302d0ab 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -373,8 +373,8 @@ def generate_dashboard_test_description(): def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", - controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, -): + controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, use_mock_hardware="false", mock_sensor_commands="false", + ): ur_type = LaunchConfiguration("ur_type") launch_arguments = { @@ -386,6 +386,8 @@ def generate_driver_test_description( "headless_mode": "true", "launch_dashboard_client": "true", "start_joint_controller": "false", + "use_mock_hardware": use_mock_hardware, + "mock_sensor_commands": mock_sensor_commands, } if tf_prefix: launch_arguments["tf_prefix"] = tf_prefix diff --git a/ur_robot_driver/urdf/ur.ros2_control.xacro b/ur_robot_driver/urdf/ur.ros2_control.xacro index f32805273..b480434b0 100644 --- a/ur_robot_driver/urdf/ur.ros2_control.xacro +++ b/ur_robot_driver/urdf/ur.ros2_control.xacro @@ -300,7 +300,9 @@ - + + 1 + From f1389a6fbb19bb5cdd29bef6342dede66a9aaff3 Mon Sep 17 00:00:00 2001 From: URJala Date: Fri, 14 Mar 2025 15:26:43 +0000 Subject: [PATCH 04/22] Robot driver tests pass with mock hardware --- ur_robot_driver/test/robot_driver.py | 28 ++++++++++++++-------------- ur_robot_driver/test/test_common.py | 6 +++--- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/robot_driver.py index 5ee7838eb..05e47efb7 100755 --- a/ur_robot_driver/test/robot_driver.py +++ b/ur_robot_driver/test/robot_driver.py @@ -56,30 +56,26 @@ ) TIMEOUT_EXECUTE_TRAJECTORY = 30 -MOCK_HARDWARE = False - -def set_mock_hardware_flag(use_mock_hardware): - return use_mock_hardware=="true" @pytest.mark.launch_test @launch_testing.parametrize( - "tf_prefix, use_mock_hardware, mock_sensor_commands", - [("", "false", "false"), ("my_ur_", "false", "false"), ("", "true", "true")] + "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] ) +def generate_test_description(tf_prefix, use_mock_hardware): + return generate_driver_test_description( + tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware + ) -def generate_test_description(tf_prefix, use_mock_hardware, mock_sensor_commands): - global MOCK_HARDWARE - MOCK_HARDWARE = set_mock_hardware_flag(use_mock_hardware) - return generate_driver_test_description(tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware, mock_sensor_commands=mock_sensor_commands) class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls): + def setUpClass(cls, use_mock_hardware): # Initialize the ROS context rclpy.init() cls.node = Node("robot_driver_test") time.sleep(1) + cls.mock_hardware = use_mock_hardware == "true" cls.init_robot(cls) @classmethod @@ -89,7 +85,7 @@ def tearDownClass(cls): rclpy.shutdown() def init_robot(self): - if(not MOCK_HARDWARE): + if not self.mock_hardware: self._dashboard_interface = DashboardInterface(self.node) else: self._dashboard_interface = None @@ -109,7 +105,7 @@ def init_robot(self): ) def setUp(self): - if(self._dashboard_interface): + if self._dashboard_interface: self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) @@ -149,6 +145,8 @@ def test_start_passthrough_controller(self): def test_set_io(self): """Test to set an IO and check whether it has been set.""" + if self.mock_hardware: + return True # Create io callback to verify result io_msg = None @@ -303,6 +301,8 @@ def test_trajectory_scaled(self, tf_prefix): def test_trajectory_scaled_aborts_on_violation(self, tf_prefix): """Test that the robot correctly aborts the trajectory when the constraints are violated.""" + if self.mock_hardware: + return True # Construct test trajectory test_trajectory = [ (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), @@ -382,7 +382,7 @@ def js_cb(msg): # self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED") def test_passthrough_trajectory(self, tf_prefix): - if(MOCK_HARDWARE): + if self.mock_hardware: return True self.assertTrue( self._controller_manager_interface.switch_controller( diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 96302d0ab..d683104cd 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -373,8 +373,8 @@ def generate_dashboard_test_description(): def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", - controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, use_mock_hardware="false", mock_sensor_commands="false", - ): + controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, use_mock_hardware="false", +): ur_type = LaunchConfiguration("ur_type") launch_arguments = { @@ -387,7 +387,7 @@ def generate_driver_test_description( "launch_dashboard_client": "true", "start_joint_controller": "false", "use_mock_hardware": use_mock_hardware, - "mock_sensor_commands": mock_sensor_commands, + "mock_sensor_commands": use_mock_hardware, } if tf_prefix: launch_arguments["tf_prefix"] = tf_prefix From 4d55d7597b4b7a0d66b6096b442b421e7bd13c09 Mon Sep 17 00:00:00 2001 From: URJala Date: Fri, 14 Mar 2025 15:27:25 +0000 Subject: [PATCH 05/22] something got duplicated, removing --- ur_robot_driver/launch/ur_control.launch.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/ur_robot_driver/launch/ur_control.launch.py b/ur_robot_driver/launch/ur_control.launch.py index adb8bc39d..528da2ee7 100644 --- a/ur_robot_driver/launch/ur_control.launch.py +++ b/ur_robot_driver/launch/ur_control.launch.py @@ -195,9 +195,6 @@ def controller_spawner(controllers, active=True): if activate_joint_controller.perform(context) == "true": controllers_active.append(initial_joint_controller.perform(context)) controllers_inactive.remove(initial_joint_controller.perform(context)) - - if use_mock_hardware.perform(context) == "true": - controllers_active.remove("tcp_pose_broadcaster") if use_mock_hardware.perform(context) == "true": controllers_active.remove("tcp_pose_broadcaster") From a848909e3386228f33268ba33edd83d5970e5a22 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 1 Apr 2025 08:38:16 +0000 Subject: [PATCH 06/22] Split passthrough controller tests to its own file. And into separate test cases. Moved timout for trajectory execution to common file --- ...integration_test_passthrough_controller.py | 267 ++++++++++++++++++ ur_robot_driver/test/robot_driver.py | 20 +- ur_robot_driver/test/test_common.py | 2 + 3 files changed, 270 insertions(+), 19 deletions(-) create mode 100644 ur_robot_driver/test/integration_test_passthrough_controller.py diff --git a/ur_robot_driver/test/integration_test_passthrough_controller.py b/ur_robot_driver/test/integration_test_passthrough_controller.py new file mode 100644 index 000000000..a74f860fb --- /dev/null +++ b/ur_robot_driver/test/integration_test_passthrough_controller.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python +# Copyright 2019, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import time +import unittest + +from math import pi +import launch_testing +import pytest +import rclpy +from builtin_interfaces.msg import Duration +from control_msgs.action import FollowJointTrajectory +from control_msgs.msg import JointTolerance +from controller_manager_msgs.srv import SwitchController +from rclpy.node import Node +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + DashboardInterface, + IoStatusInterface, + ConfigurationInterface, + generate_driver_test_description, + ROBOT_JOINTS, + TIMEOUT_EXECUTE_TRAJECTORY, +) + + +# Mock hardware does not work with passthrough controller, so dont test with it +@pytest.mark.launch_test +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) + + +HOME = { + "elbow_joint": 0.0, + "shoulder_lift_joint": -1.5708, + "shoulder_pan_joint": 0.0, + "wrist_1_joint": -1.5708, + "wrist_2_joint": 0.0, + "wrist_3_joint": 0.0, +} +waypts = [[HOME[joint] + i * pi / 4 for joint in ROBOT_JOINTS] for i in [0, -1, 1]] +time_vec = [ + Duration(sec=4, nanosec=0), + Duration(sec=8, nanosec=0), + Duration(sec=12, nanosec=0), +] +TEST_TRAJECTORY = zip(time_vec, waypts) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls, use_mock_hardware): + # Initialize the ROS context + rclpy.init() + cls.node = Node("robot_driver_test") + time.sleep(1) + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + + self._dashboard_interface = DashboardInterface(self.node) + + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def setUp(self): + if self._dashboard_interface: + self._dashboard_interface.start_robot() + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_start_passthrough_controller(self): + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + def test_passthrough_trajectory(self, tf_prefix): + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=1, nanosec=0) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + def test_quintic_trajectory(self, tf_prefix): + # Full quintic trajectory + + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint( + positions=pos, + time_from_start=times, + velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + ) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_time_tolerance = Duration(sec=1, nanosec=0) + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + def test_impossible_goal_tolerance_fails(self, tf_prefix): + # Test impossible goal tolerance, should fail. + + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_tolerance = [ + JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=1, nanosec=0) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual( + result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + ) + + def test_impossible_goal_time_tolerance_fails(self, tf_prefix): + # Test impossible goal time + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=0, nanosec=10) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual( + result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/robot_driver.py index 05e47efb7..388a75704 100755 --- a/ur_robot_driver/test/robot_driver.py +++ b/ur_robot_driver/test/robot_driver.py @@ -37,7 +37,6 @@ import rclpy from builtin_interfaces.msg import Duration from control_msgs.action import FollowJointTrajectory -from control_msgs.msg import JointTolerance from controller_manager_msgs.srv import SwitchController from rclpy.node import Node from sensor_msgs.msg import JointState @@ -53,10 +52,9 @@ ConfigurationInterface, generate_driver_test_description, ROBOT_JOINTS, + TIMEOUT_EXECUTE_TRAJECTORY, ) -TIMEOUT_EXECUTE_TRAJECTORY = 30 - @pytest.mark.launch_test @launch_testing.parametrize( @@ -127,22 +125,6 @@ def test_start_scaled_jtc_controller(self): ).ok ) - def test_start_passthrough_controller(self): - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["passthrough_trajectory_controller"], - deactivate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - def test_set_io(self): """Test to set an IO and check whether it has been set.""" if self.mock_hardware: diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index d683104cd..ce53b268c 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -63,6 +63,8 @@ TIMEOUT_WAIT_SERVICE_INITIAL = 120 # If we download the docker image simultaneously to the tests, it can take quite some time until the dashboard server is reachable and usable. TIMEOUT_WAIT_ACTION = 10 +TIMEOUT_EXECUTE_TRAJECTORY = 30 + ROBOT_JOINTS = [ "elbow_joint", "shoulder_lift_joint", From 664b50cafa03011651e4467abc769bfa3b918581 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 8 Apr 2025 12:45:19 +0000 Subject: [PATCH 07/22] Split integration tests out to one file per controller --- .../integration_test_config_controller.py | 111 +++++++++++ .../integration_test_controller_switch.py | 6 +- .../test/integration_test_io_controller.py | 158 +++++++++++++++ ...integration_test_passthrough_controller.py | 187 +++++++++--------- ...tegration_test_scaled_joint_controller.py} | 56 ------ 5 files changed, 360 insertions(+), 158 deletions(-) create mode 100644 ur_robot_driver/test/integration_test_config_controller.py create mode 100644 ur_robot_driver/test/integration_test_io_controller.py rename ur_robot_driver/test/{robot_driver.py => integration_test_scaled_joint_controller.py} (90%) diff --git a/ur_robot_driver/test/integration_test_config_controller.py b/ur_robot_driver/test/integration_test_config_controller.py new file mode 100644 index 000000000..0699e6fe0 --- /dev/null +++ b/ur_robot_driver/test/integration_test_config_controller.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import time +import unittest + +import launch_testing +import pytest +import rclpy +from control_msgs.action import FollowJointTrajectory +from rclpy.node import Node + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + DashboardInterface, + IoStatusInterface, + ConfigurationInterface, + generate_driver_test_description, +) + + +@pytest.mark.launch_test +@launch_testing.parametrize( + "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] +) +def generate_test_description(tf_prefix, use_mock_hardware): + return generate_driver_test_description( + tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware + ) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls, use_mock_hardware): + # Initialize the ROS context + rclpy.init() + cls.node = Node("robot_driver_test") + time.sleep(1) + cls.mock_hardware = use_mock_hardware == "true" + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + if not self.mock_hardware: + self._dashboard_interface = DashboardInterface(self.node) + else: + self._dashboard_interface = None + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def setUp(self): + if self._dashboard_interface: + self._dashboard_interface.start_robot() + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_get_robot_software_version(self): + self.assertNotEqual( + self._configuration_controller_interface.get_robot_software_version().major, 0 + ) diff --git a/ur_robot_driver/test/integration_test_controller_switch.py b/ur_robot_driver/test/integration_test_controller_switch.py index 58e11a1e4..2ef097f04 100644 --- a/ur_robot_driver/test/integration_test_controller_switch.py +++ b/ur_robot_driver/test/integration_test_controller_switch.py @@ -60,11 +60,7 @@ @pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix", - [""], - # [(""), ("my_ur_")], -) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) def generate_test_description(tf_prefix): return generate_driver_test_description(tf_prefix=tf_prefix) diff --git a/ur_robot_driver/test/integration_test_io_controller.py b/ur_robot_driver/test/integration_test_io_controller.py new file mode 100644 index 000000000..bd4391204 --- /dev/null +++ b/ur_robot_driver/test/integration_test_io_controller.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import logging +import os +import sys +import time +import unittest + +import launch_testing +import pytest +import rclpy +from control_msgs.action import FollowJointTrajectory +from rclpy.node import Node +from ur_msgs.msg import IOStates + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + DashboardInterface, + IoStatusInterface, + ConfigurationInterface, + generate_driver_test_description, +) + + +@pytest.mark.launch_test +@launch_testing.parametrize( + "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] +) +def generate_test_description(tf_prefix, use_mock_hardware): + return generate_driver_test_description( + tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware + ) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls, use_mock_hardware): + # Initialize the ROS context + rclpy.init() + cls.node = Node("robot_driver_test") + time.sleep(1) + cls.mock_hardware = use_mock_hardware == "true" + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + if not self.mock_hardware: + self._dashboard_interface = DashboardInterface(self.node) + else: + self._dashboard_interface = None + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def setUp(self): + if self._dashboard_interface: + self._dashboard_interface.start_robot() + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_set_io(self): + """Test to set an IO and check whether it has been set.""" + if self.mock_hardware: + return True + # Create io callback to verify result + io_msg = None + + def io_msg_cb(msg): + nonlocal io_msg + io_msg = msg + + io_states_sub = self.node.create_subscription( + IOStates, + "/io_and_status_controller/io_states", + io_msg_cb, + rclpy.qos.qos_profile_system_default, + ) + + # Set pin 0 to 1.0 + test_pin = 0 + + logging.info("Setting pin %d to 1.0", test_pin) + self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=1.0) + + # Wait until the pin state has changed + pin_state = False + end_time = time.time() + 5 + while not pin_state and time.time() < end_time: + rclpy.spin_once(self.node, timeout_sec=0.1) + if io_msg is not None: + pin_state = io_msg.digital_out_states[test_pin].state + + self.assertEqual(pin_state, 1.0) + + # Set pin 0 to 0.0 + logging.info("Setting pin %d to 0.0", test_pin) + self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=0.0) + + # Wait until the pin state has changed back + end_time = time.time() + 5 + while pin_state and time.time() < end_time: + rclpy.spin_once(self.node, timeout_sec=0.1) + if io_msg is not None: + pin_state = io_msg.digital_out_states[test_pin].state + + self.assertEqual(pin_state, 0.0) + + # Clean up io subscription + self.node.destroy_subscription(io_states_sub) diff --git a/ur_robot_driver/test/integration_test_passthrough_controller.py b/ur_robot_driver/test/integration_test_passthrough_controller.py index a74f860fb..61942c37a 100644 --- a/ur_robot_driver/test/integration_test_passthrough_controller.py +++ b/ur_robot_driver/test/integration_test_passthrough_controller.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2019, Universal Robots A/S +# Copyright 2025, Universal Robots A/S # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -82,7 +82,7 @@ def generate_test_description(tf_prefix): class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls, use_mock_hardware): + def setUpClass(cls): # Initialize the ROS context rclpy.init() cls.node = Node("robot_driver_test") @@ -132,13 +132,6 @@ def test_start_passthrough_controller(self): deactivate_controllers=["scaled_joint_trajectory_controller"], ).ok ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) def test_passthrough_trajectory(self, tf_prefix): self.assertTrue( @@ -172,96 +165,96 @@ def test_passthrough_trajectory(self, tf_prefix): ) self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - def test_quintic_trajectory(self, tf_prefix): - # Full quintic trajectory + # def test_quintic_trajectory(self, tf_prefix): + # # Full quintic trajectory - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint( - positions=pos, - time_from_start=times, - velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - ) - for (times, pos) in TEST_TRAJECTORY - ], - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - ) - goal_time_tolerance = Duration(sec=1, nanosec=0) - goal_tolerance = [ - JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS - ] - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) + # trajectory = JointTrajectory( + # points=[ + # JointTrajectoryPoint( + # positions=pos, + # time_from_start=times, + # velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + # accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + # ) + # for (times, pos) in TEST_TRAJECTORY + # ], + # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + # ) + # goal_time_tolerance = Duration(sec=1, nanosec=0) + # goal_tolerance = [ + # JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + # ] + # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + # trajectory=trajectory, + # goal_time_tolerance=goal_time_tolerance, + # goal_tolerance=goal_tolerance, + # ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + # self.assertTrue(goal_handle.accepted) + # if goal_handle.accepted: + # result = self._passthrough_forward_joint_trajectory.get_result( + # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + # ) + # self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - def test_impossible_goal_tolerance_fails(self, tf_prefix): - # Test impossible goal tolerance, should fail. + # def test_impossible_goal_tolerance_fails(self, tf_prefix): + # # Test impossible goal tolerance, should fail. - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in TEST_TRAJECTORY - ], - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - ) - goal_tolerance = [ - JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS - ] - goal_time_tolerance = Duration(sec=1, nanosec=0) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) + # trajectory = JointTrajectory( + # points=[ + # JointTrajectoryPoint(positions=pos, time_from_start=times) + # for (times, pos) in TEST_TRAJECTORY + # ], + # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + # ) + # goal_tolerance = [ + # JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS + # ] + # goal_time_tolerance = Duration(sec=1, nanosec=0) + # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + # trajectory=trajectory, + # goal_time_tolerance=goal_time_tolerance, + # goal_tolerance=goal_tolerance, + # ) + # self.assertTrue(goal_handle.accepted) + # if goal_handle.accepted: + # result = self._passthrough_forward_joint_trajectory.get_result( + # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + # ) + # self.assertEqual( + # result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + # ) - def test_impossible_goal_time_tolerance_fails(self, tf_prefix): - # Test impossible goal time - goal_tolerance = [ - JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS - ] - goal_time_tolerance = Duration(sec=0, nanosec=10) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in TEST_TRAJECTORY - ], - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - ) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) + # def test_impossible_goal_time_tolerance_fails(self, tf_prefix): + # # Test impossible goal time + # goal_tolerance = [ + # JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + # ] + # goal_time_tolerance = Duration(sec=0, nanosec=10) + # trajectory = JointTrajectory( + # points=[ + # JointTrajectoryPoint(positions=pos, time_from_start=times) + # for (times, pos) in TEST_TRAJECTORY + # ], + # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + # ) + # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + # trajectory=trajectory, + # goal_time_tolerance=goal_time_tolerance, + # goal_tolerance=goal_tolerance, + # ) + # self.assertTrue(goal_handle.accepted) + # if goal_handle.accepted: + # result = self._passthrough_forward_joint_trajectory.get_result( + # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + # ) + # self.assertEqual( + # result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + # ) + # self.assertTrue( + # self._controller_manager_interface.switch_controller( + # strictness=SwitchController.Request.BEST_EFFORT, + # deactivate_controllers=["passthrough_trajectory_controller"], + # activate_controllers=["scaled_joint_trajectory_controller"], + # ).ok + # ) diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py similarity index 90% rename from ur_robot_driver/test/robot_driver.py rename to ur_robot_driver/test/integration_test_scaled_joint_controller.py index 388a75704..3daabe6e5 100755 --- a/ur_robot_driver/test/robot_driver.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -41,7 +41,6 @@ from rclpy.node import Node from sensor_msgs.msg import JointState from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint -from ur_msgs.msg import IOStates sys.path.append(os.path.dirname(__file__)) from test_common import ( # noqa: E402 @@ -112,11 +111,6 @@ def setUp(self): # Test functions # - def test_get_robot_software_version(self): - self.assertNotEqual( - self._configuration_controller_interface.get_robot_software_version().major, 0 - ) - def test_start_scaled_jtc_controller(self): self.assertTrue( self._controller_manager_interface.switch_controller( @@ -125,56 +119,6 @@ def test_start_scaled_jtc_controller(self): ).ok ) - def test_set_io(self): - """Test to set an IO and check whether it has been set.""" - if self.mock_hardware: - return True - # Create io callback to verify result - io_msg = None - - def io_msg_cb(msg): - nonlocal io_msg - io_msg = msg - - io_states_sub = self.node.create_subscription( - IOStates, - "/io_and_status_controller/io_states", - io_msg_cb, - rclpy.qos.qos_profile_system_default, - ) - - # Set pin 0 to 1.0 - test_pin = 0 - - logging.info("Setting pin %d to 1.0", test_pin) - self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=1.0) - - # Wait until the pin state has changed - pin_state = False - end_time = time.time() + 5 - while not pin_state and time.time() < end_time: - rclpy.spin_once(self.node, timeout_sec=0.1) - if io_msg is not None: - pin_state = io_msg.digital_out_states[test_pin].state - - self.assertEqual(pin_state, 1.0) - - # Set pin 0 to 0.0 - logging.info("Setting pin %d to 0.0", test_pin) - self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=0.0) - - # Wait until the pin state has changed back - end_time = time.time() + 5 - while pin_state and time.time() < end_time: - rclpy.spin_once(self.node, timeout_sec=0.1) - if io_msg is not None: - pin_state = io_msg.digital_out_states[test_pin].state - - self.assertEqual(pin_state, 0.0) - - # Clean up io subscription - self.node.destroy_subscription(io_states_sub) - def test_trajectory(self, tf_prefix): """Test robot movement.""" self.assertTrue( From 33ce7ed331cc129e8307f4c18ce51052bac2415b Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 8 Apr 2025 14:04:27 +0000 Subject: [PATCH 08/22] finish splitting passthrough controller tests --- ...integration_test_passthrough_controller.py | 200 ++++++++++-------- 1 file changed, 110 insertions(+), 90 deletions(-) diff --git a/ur_robot_driver/test/integration_test_passthrough_controller.py b/ur_robot_driver/test/integration_test_passthrough_controller.py index 61942c37a..aa72745ad 100644 --- a/ur_robot_driver/test/integration_test_passthrough_controller.py +++ b/ur_robot_driver/test/integration_test_passthrough_controller.py @@ -77,7 +77,7 @@ def generate_test_description(tf_prefix): Duration(sec=8, nanosec=0), Duration(sec=12, nanosec=0), ] -TEST_TRAJECTORY = zip(time_vec, waypts) +TEST_TRAJECTORY = [(time_vec[i], waypts[i]) for i in range(len(waypts))] class RobotDriverTest(unittest.TestCase): @@ -165,96 +165,116 @@ def test_passthrough_trajectory(self, tf_prefix): ) self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - # def test_quintic_trajectory(self, tf_prefix): - # # Full quintic trajectory - - # trajectory = JointTrajectory( - # points=[ - # JointTrajectoryPoint( - # positions=pos, - # time_from_start=times, - # velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - # accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - # ) - # for (times, pos) in TEST_TRAJECTORY - # ], - # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - # ) - # goal_time_tolerance = Duration(sec=1, nanosec=0) - # goal_tolerance = [ - # JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS - # ] - # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - # trajectory=trajectory, - # goal_time_tolerance=goal_time_tolerance, - # goal_tolerance=goal_tolerance, - # ) + def test_quintic_trajectory(self, tf_prefix): + # Full quintic trajectory + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint( + positions=pos, + time_from_start=times, + velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + ) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_time_tolerance = Duration(sec=1, nanosec=0) + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) - # self.assertTrue(goal_handle.accepted) - # if goal_handle.accepted: - # result = self._passthrough_forward_joint_trajectory.get_result( - # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - # ) - # self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - # def test_impossible_goal_tolerance_fails(self, tf_prefix): - # # Test impossible goal tolerance, should fail. + def test_impossible_goal_tolerance_fails(self, tf_prefix): + # Test impossible goal tolerance, should fail. + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_tolerance = [ + JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=1, nanosec=0) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual( + result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + ) - # trajectory = JointTrajectory( - # points=[ - # JointTrajectoryPoint(positions=pos, time_from_start=times) - # for (times, pos) in TEST_TRAJECTORY - # ], - # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - # ) - # goal_tolerance = [ - # JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS - # ] - # goal_time_tolerance = Duration(sec=1, nanosec=0) - # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - # trajectory=trajectory, - # goal_time_tolerance=goal_time_tolerance, - # goal_tolerance=goal_tolerance, - # ) - # self.assertTrue(goal_handle.accepted) - # if goal_handle.accepted: - # result = self._passthrough_forward_joint_trajectory.get_result( - # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - # ) - # self.assertEqual( - # result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - # ) + def test_impossible_goal_time_tolerance_fails(self, tf_prefix): + # Test impossible goal time + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) - # def test_impossible_goal_time_tolerance_fails(self, tf_prefix): - # # Test impossible goal time - # goal_tolerance = [ - # JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS - # ] - # goal_time_tolerance = Duration(sec=0, nanosec=10) - # trajectory = JointTrajectory( - # points=[ - # JointTrajectoryPoint(positions=pos, time_from_start=times) - # for (times, pos) in TEST_TRAJECTORY - # ], - # joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - # ) - # goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - # trajectory=trajectory, - # goal_time_tolerance=goal_time_tolerance, - # goal_tolerance=goal_tolerance, - # ) - # self.assertTrue(goal_handle.accepted) - # if goal_handle.accepted: - # result = self._passthrough_forward_joint_trajectory.get_result( - # goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - # ) - # self.assertEqual( - # result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - # ) - # self.assertTrue( - # self._controller_manager_interface.switch_controller( - # strictness=SwitchController.Request.BEST_EFFORT, - # deactivate_controllers=["passthrough_trajectory_controller"], - # activate_controllers=["scaled_joint_trajectory_controller"], - # ).ok - # ) + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=0, nanosec=10) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual( + result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) From 45b22f1752638ffcca4926ce901dec07014f1238 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 8 Apr 2025 14:14:59 +0000 Subject: [PATCH 09/22] Add the separate tests to cmake file --- ur_robot_driver/CMakeLists.txt | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/ur_robot_driver/CMakeLists.txt b/ur_robot_driver/CMakeLists.txt index 1a76a5cab..1f35f4631 100644 --- a/ur_robot_driver/CMakeLists.txt +++ b/ur_robot_driver/CMakeLists.txt @@ -243,7 +243,7 @@ if(BUILD_TESTING) TIMEOUT 180 ) - add_launch_test(test/robot_driver.py + add_launch_test(test/integration_test_scaled_joint_controller.py TIMEOUT 800 ) @@ -259,6 +259,18 @@ if(BUILD_TESTING) TIMEOUT 500 ) + add_launch_test(test/integration_test_config_controller.py + TIMEOUT + 800 + ) + add_launch_test(test/integration_test_passthrough_controller.py + TIMEOUT + 800 + ) + add_launch_test(test/integration_test_io_controller.py + TIMEOUT + 800 + ) add_launch_test(test/integration_test_tool_contact.py TIMEOUT 800 From 09b36eaeb2a035d1ead43c1169c61561a3b57ca9 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 8 Apr 2025 14:26:38 +0000 Subject: [PATCH 10/22] formatting --- ur_robot_driver/test/test_common.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index ce53b268c..4ece7e68e 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -375,7 +375,8 @@ def generate_dashboard_test_description(): def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", - controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, use_mock_hardware="false", + controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, + use_mock_hardware="false", ): ur_type = LaunchConfiguration("ur_type") From 45dc4b78350342944ea7772a9c1ab243d24690b1 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 22 Apr 2025 10:32:01 +0000 Subject: [PATCH 11/22] Add mock hardware test description --- ur_robot_driver/test/test_common.py | 36 +++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 4ece7e68e..573a29b17 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -372,6 +372,42 @@ def generate_dashboard_test_description(): ) +def generate_mock_hardware_test_description( + tf_prefix="", + initial_joint_controller="scaled_joint_trajectory_controller", + controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, + use_mock_hardware="false", +): + + ur_type = LaunchConfiguration("ur_type") + + launch_arguments = { + "robot_ip": "0.0.0.0", + "ur_type": ur_type, + "launch_rviz": "false", + "controller_spawner_timeout": str(controller_spawner_timeout), + "initial_joint_controller": initial_joint_controller, + "headless_mode": "true", + "launch_dashboard_client": "true", + "start_joint_controller": "false", + "use_mock_hardware": use_mock_hardware, + "mock_sensor_commands": use_mock_hardware, + } + if tf_prefix: + launch_arguments["tf_prefix"] = tf_prefix + + robot_driver = IncludeLaunchDescription( + PythonLaunchDescriptionSource( + PathJoinSubstitution( + [FindPackageShare("ur_robot_driver"), "launch", "ur_control.launch.py"] + ) + ), + launch_arguments=launch_arguments.items(), + ) + + return LaunchDescription(_declare_launch_arguments() + [ReadyToTest(), robot_driver]) + + def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", From 96f550f06b720726c8ff9f694e46c15152e8d4f5 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 22 Apr 2025 10:34:14 +0000 Subject: [PATCH 12/22] Move all mock harrdware tests in to their own file --- .../integration_test_config_controller.py | 21 +- .../test/integration_test_io_controller.py | 23 +- ...ntegration_test_scaled_joint_controller.py | 23 +- ur_robot_driver/test/test_mock_hardware.py | 610 ++++++++++++++++++ 4 files changed, 628 insertions(+), 49 deletions(-) create mode 100644 ur_robot_driver/test/test_mock_hardware.py diff --git a/ur_robot_driver/test/integration_test_config_controller.py b/ur_robot_driver/test/integration_test_config_controller.py index 0699e6fe0..1e29de8e4 100644 --- a/ur_robot_driver/test/integration_test_config_controller.py +++ b/ur_robot_driver/test/integration_test_config_controller.py @@ -50,23 +50,18 @@ @pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] -) -def generate_test_description(tf_prefix, use_mock_hardware): - return generate_driver_test_description( - tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware - ) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls, use_mock_hardware): + def setUpClass(cls): # Initialize the ROS context rclpy.init() cls.node = Node("robot_driver_test") time.sleep(1) - cls.mock_hardware = use_mock_hardware == "true" cls.init_robot(cls) @classmethod @@ -76,10 +71,7 @@ def tearDownClass(cls): rclpy.shutdown() def init_robot(self): - if not self.mock_hardware: - self._dashboard_interface = DashboardInterface(self.node) - else: - self._dashboard_interface = None + self._dashboard_interface = DashboardInterface(self.node) self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) @@ -96,8 +88,7 @@ def init_robot(self): ) def setUp(self): - if self._dashboard_interface: - self._dashboard_interface.start_robot() + self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) diff --git a/ur_robot_driver/test/integration_test_io_controller.py b/ur_robot_driver/test/integration_test_io_controller.py index bd4391204..e1aa83706 100644 --- a/ur_robot_driver/test/integration_test_io_controller.py +++ b/ur_robot_driver/test/integration_test_io_controller.py @@ -52,23 +52,18 @@ @pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] -) -def generate_test_description(tf_prefix, use_mock_hardware): - return generate_driver_test_description( - tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware - ) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls, use_mock_hardware): + def setUpClass(cls): # Initialize the ROS context rclpy.init() cls.node = Node("robot_driver_test") time.sleep(1) - cls.mock_hardware = use_mock_hardware == "true" cls.init_robot(cls) @classmethod @@ -78,10 +73,7 @@ def tearDownClass(cls): rclpy.shutdown() def init_robot(self): - if not self.mock_hardware: - self._dashboard_interface = DashboardInterface(self.node) - else: - self._dashboard_interface = None + self._dashboard_interface = DashboardInterface(self.node) self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) @@ -98,8 +90,7 @@ def init_robot(self): ) def setUp(self): - if self._dashboard_interface: - self._dashboard_interface.start_robot() + self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) @@ -109,8 +100,6 @@ def setUp(self): def test_set_io(self): """Test to set an IO and check whether it has been set.""" - if self.mock_hardware: - return True # Create io callback to verify result io_msg = None diff --git a/ur_robot_driver/test/integration_test_scaled_joint_controller.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py index 3daabe6e5..7ef2e529b 100755 --- a/ur_robot_driver/test/integration_test_scaled_joint_controller.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -56,23 +56,18 @@ @pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix, use_mock_hardware", [("", "false"), ("my_ur_", "false"), ("", "true")] -) -def generate_test_description(tf_prefix, use_mock_hardware): - return generate_driver_test_description( - tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware - ) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls, use_mock_hardware): + def setUpClass(cls): # Initialize the ROS context rclpy.init() cls.node = Node("robot_driver_test") time.sleep(1) - cls.mock_hardware = use_mock_hardware == "true" cls.init_robot(cls) @classmethod @@ -82,10 +77,7 @@ def tearDownClass(cls): rclpy.shutdown() def init_robot(self): - if not self.mock_hardware: - self._dashboard_interface = DashboardInterface(self.node) - else: - self._dashboard_interface = None + self._dashboard_interface = DashboardInterface(self.node) self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) @@ -102,8 +94,7 @@ def init_robot(self): ) def setUp(self): - if self._dashboard_interface: - self._dashboard_interface.start_robot() + self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) @@ -227,8 +218,6 @@ def test_trajectory_scaled(self, tf_prefix): def test_trajectory_scaled_aborts_on_violation(self, tf_prefix): """Test that the robot correctly aborts the trajectory when the constraints are violated.""" - if self.mock_hardware: - return True # Construct test trajectory test_trajectory = [ (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py new file mode 100644 index 000000000..d16812c70 --- /dev/null +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -0,0 +1,610 @@ +#!/usr/bin/env python +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import time +import unittest +import logging + +import launch_testing +import pytest +import rclpy +from rclpy.node import Node +from builtin_interfaces.msg import Duration +from control_msgs.action import FollowJointTrajectory +from controller_manager_msgs.srv import SwitchController +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint + + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + IoStatusInterface, + ConfigurationInterface, + generate_mock_hardware_test_description, + ROBOT_JOINTS, + TIMEOUT_EXECUTE_TRAJECTORY, +) + + +@pytest.mark.launch_test +@launch_testing.parametrize("tf_prefix, use_mock_hardware", [("", "true"), ("my_ur_", "true")]) +def generate_test_description(tf_prefix, use_mock_hardware): + return generate_mock_hardware_test_description( + tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware + ) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls, use_mock_hardware): + # Initialize the ROS context + rclpy.init() + cls.node = Node("mock_hardware_test") + time.sleep(1) + cls.mock_hardware = use_mock_hardware == "true" + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + self._dashboard_interface = None + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def setUp(self): + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_get_robot_software_version(self): + self.assertEqual( + self._configuration_controller_interface.get_robot_software_version().major, 1 + ) + + def test_start_scaled_jtc_controller(self): + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + def test_trajectory(self, tf_prefix): + """Test robot movement.""" + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Sending trajectory goal + logging.info("Sending simple goal") + goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + self.assertTrue(goal_handle.accepted) + + # Verify execution + result = self._scaled_follow_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + def test_illegal_trajectory(self, tf_prefix): + """ + Test trajectory server. + + This is more of a validation test that the testing suite does the right thing + """ + # Construct test trajectory, the second point wrongly starts before the first + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Send illegal goal + logging.info("Sending illegal goal") + goal_handle = self._scaled_follow_joint_trajectory.send_goal( + trajectory=trajectory, + ) + + # Verify the failure is correctly detected + self.assertFalse(goal_handle.accepted) + + def test_trajectory_scaled(self, tf_prefix): + """Test robot movement.""" + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=6, nanosec=500000000), [-1.0 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Execute trajectory + logging.info("Sending goal for robot to follow") + goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + self.assertTrue(goal_handle.accepted) + + # Verify execution + result = self._scaled_follow_joint_trajectory.get_result( + goal_handle, + TIMEOUT_EXECUTE_TRAJECTORY, + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + # Controller switching tests + def test_activating_multiple_controllers_same_interface_fails(self): + # Deactivate all writing controllers + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "passthrough_trajectory_controller", + "force_mode_controller", + "freedrive_mode_controller", + ], + ).ok + ) + + # Activating different motion controllers should not be possible + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "forward_position_controller", + ], + ).ok + ) + + def test_activating_multiple_controllers_different_interface_fails(self): + # Deactivate all writing controllers + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "force_mode_controller", + "passthrough_trajectory_controller", + "freedrive_mode_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "forward_velocity_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "passthrough_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_velocity_controller", + "passthrough_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_position_controller", + "forward_velocity_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "force_mode_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "freedrive_mode_controller", + ], + ).ok + ) + + def test_activating_controller_with_running_position_controller_fails(self): + # Having a position-based controller active, no other controller should be able to + # activate. + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + ], + deactivate_controllers=[ + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "force_mode_controller", + "freedrive_mode_controller", + "passthrough_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_position_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_velocity_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "freedrive_mode_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "passthrough_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "force_mode_controller", + ], + ).ok + ) + # Stop controller again + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + ], + ).ok + ) + + def test_activating_controller_with_running_passthrough_trajectory_controller_fails(self): + # Having a position-based controller active, no other controller should be able to + # activate. + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "force_mode_controller", # tested in separate test + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_position_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "forward_velocity_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "joint_trajectory_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "freedrive_mode_controller", + ], + ).ok + ) + # Stop the controller again + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + deactivate_controllers=[ + "passthrough_trajectory_controller", + ], + ).ok + ) + + def test_force_mode_and_trajectory_passthrough_controller_are_compatible(self): + # Deactivate all writing controllers + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "passthrough_trajectory_controller", + "force_mode_controller", + ], + ).ok + ) + + time.sleep(3) + + # Start both together + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "passthrough_trajectory_controller", + "force_mode_controller", + ], + ).ok + ) + + # With passthrough traj controller running, start force mode controller + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=[ + "passthrough_trajectory_controller", + ], + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "force_mode_controller", + ], + ).ok + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "force_mode_controller", + ], + ).ok + ) + + # With start force mode controller running, passthrough traj controller + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=[ + "force_mode_controller", + ], + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "passthrough_trajectory_controller", + ], + ).ok + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "passthrough_trajectory_controller", + ], + ).ok + ) + + def test_tool_contact_compatibility(self): + # Deactivate all writing controllers + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "joint_trajectory_controller", + "forward_position_controller", + "forward_velocity_controller", + "passthrough_trajectory_controller", + "force_mode_controller", + "tool_contact_controller", + ], + ).ok + ) + + time.sleep(3) + # Start tool contact controller and JTC + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "scaled_joint_trajectory_controller", + "tool_contact_controller", + ], + ).ok + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + deactivate_controllers=[ + "scaled_joint_trajectory_controller", + "tool_contact_controller", + ], + ).ok + ) + + # Start tool contact controller and passthrough trajectory + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "passthrough_trajectory_controller", + "tool_contact_controller", + ], + ).ok + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + deactivate_controllers=[ + "passthrough_trajectory_controller", + "tool_contact_controller", + ], + ).ok + ) + + # tool contact should not start with force_mode + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "force_mode_controller", + "tool_contact_controller", + ], + ).ok + ) + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "tool_contact_controller", + "force_mode_controller", + ], + ).ok + ) + + # tool contact should not start with freedrive + self.assertFalse( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=[ + "tool_contact_controller", + "freedrive_mode_controller", + ], + ).ok + ) From c868514d56bd7d7c31b394854a133ad062887429 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 22 Apr 2025 10:56:18 +0000 Subject: [PATCH 13/22] Remove controller switching tests --- ur_robot_driver/test/test_mock_hardware.py | 412 --------------------- 1 file changed, 412 deletions(-) diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index d16812c70..77b6a640c 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -196,415 +196,3 @@ def test_trajectory_scaled(self, tf_prefix): TIMEOUT_EXECUTE_TRAJECTORY, ) self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - # Controller switching tests - def test_activating_multiple_controllers_same_interface_fails(self): - # Deactivate all writing controllers - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "passthrough_trajectory_controller", - "force_mode_controller", - "freedrive_mode_controller", - ], - ).ok - ) - - # Activating different motion controllers should not be possible - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "forward_position_controller", - ], - ).ok - ) - - def test_activating_multiple_controllers_different_interface_fails(self): - # Deactivate all writing controllers - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "force_mode_controller", - "passthrough_trajectory_controller", - "freedrive_mode_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "forward_velocity_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "passthrough_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_velocity_controller", - "passthrough_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_position_controller", - "forward_velocity_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "force_mode_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "freedrive_mode_controller", - ], - ).ok - ) - - def test_activating_controller_with_running_position_controller_fails(self): - # Having a position-based controller active, no other controller should be able to - # activate. - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - ], - deactivate_controllers=[ - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "force_mode_controller", - "freedrive_mode_controller", - "passthrough_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_position_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_velocity_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "freedrive_mode_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "passthrough_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "force_mode_controller", - ], - ).ok - ) - # Stop controller again - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - ], - ).ok - ) - - def test_activating_controller_with_running_passthrough_trajectory_controller_fails(self): - # Having a position-based controller active, no other controller should be able to - # activate. - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["passthrough_trajectory_controller"], - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "force_mode_controller", # tested in separate test - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_position_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "forward_velocity_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "joint_trajectory_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "freedrive_mode_controller", - ], - ).ok - ) - # Stop the controller again - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - deactivate_controllers=[ - "passthrough_trajectory_controller", - ], - ).ok - ) - - def test_force_mode_and_trajectory_passthrough_controller_are_compatible(self): - # Deactivate all writing controllers - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "passthrough_trajectory_controller", - "force_mode_controller", - ], - ).ok - ) - - time.sleep(3) - - # Start both together - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "passthrough_trajectory_controller", - "force_mode_controller", - ], - ).ok - ) - - # With passthrough traj controller running, start force mode controller - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=[ - "passthrough_trajectory_controller", - ], - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "force_mode_controller", - ], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "force_mode_controller", - ], - ).ok - ) - - # With start force mode controller running, passthrough traj controller - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=[ - "force_mode_controller", - ], - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "passthrough_trajectory_controller", - ], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "passthrough_trajectory_controller", - ], - ).ok - ) - - def test_tool_contact_compatibility(self): - # Deactivate all writing controllers - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "joint_trajectory_controller", - "forward_position_controller", - "forward_velocity_controller", - "passthrough_trajectory_controller", - "force_mode_controller", - "tool_contact_controller", - ], - ).ok - ) - - time.sleep(3) - # Start tool contact controller and JTC - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "scaled_joint_trajectory_controller", - "tool_contact_controller", - ], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - deactivate_controllers=[ - "scaled_joint_trajectory_controller", - "tool_contact_controller", - ], - ).ok - ) - - # Start tool contact controller and passthrough trajectory - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "passthrough_trajectory_controller", - "tool_contact_controller", - ], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - deactivate_controllers=[ - "passthrough_trajectory_controller", - "tool_contact_controller", - ], - ).ok - ) - - # tool contact should not start with force_mode - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "force_mode_controller", - "tool_contact_controller", - ], - ).ok - ) - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "tool_contact_controller", - "force_mode_controller", - ], - ).ok - ) - - # tool contact should not start with freedrive - self.assertFalse( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.STRICT, - activate_controllers=[ - "tool_contact_controller", - "freedrive_mode_controller", - ], - ).ok - ) From 6aa4b59f926d655087e85274e7190793f1c87b55 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 22 Apr 2025 11:00:24 +0000 Subject: [PATCH 14/22] add mock hardware test to build tests --- ur_robot_driver/CMakeLists.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ur_robot_driver/CMakeLists.txt b/ur_robot_driver/CMakeLists.txt index 1f35f4631..156d1c358 100644 --- a/ur_robot_driver/CMakeLists.txt +++ b/ur_robot_driver/CMakeLists.txt @@ -230,6 +230,11 @@ if(BUILD_TESTING) find_package(ur_msgs REQUIRED) find_package(launch_testing_ament_cmake) + add_launch_test(test/test_mock_hardware.py + TIMEOUT + 800 + ) + if(${UR_ROBOT_DRIVER_BUILD_INTEGRATION_TESTS}) add_launch_test(test/launch_args.py TIMEOUT From d1de97bf71aa0ee1e45f0e4828fdbfeb8bff54f3 Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 07:58:46 +0000 Subject: [PATCH 15/22] Remove passthrough test from sjtc test script --- ...ntegration_test_scaled_joint_controller.py | 125 ------------------ 1 file changed, 125 deletions(-) diff --git a/ur_robot_driver/test/integration_test_scaled_joint_controller.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py index 7ef2e529b..9e311a079 100755 --- a/ur_robot_driver/test/integration_test_scaled_joint_controller.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -295,128 +295,3 @@ def js_cb(msg): # result = self.get_result("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal_response, TIMEOUT_EXECUTE_TRAJECTORY) # self.assertEqual(result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED) # self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED") - - def test_passthrough_trajectory(self, tf_prefix): - if self.mock_hardware: - return True - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["passthrough_trajectory_controller"], - deactivate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - waypts = [ - [-1.58, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - [-2.5, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - [-1.58, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - ] - time_vec = [ - Duration(sec=4, nanosec=0), - Duration(sec=8, nanosec=0), - Duration(sec=12, nanosec=0), - ] - goal_tolerance = [ - JointTolerance(position=0.01, velocity=5e-5, name=tf_prefix + ROBOT_JOINTS[i]) - for i in range(len(ROBOT_JOINTS)) - ] - goal_time_tolerance = Duration(sec=1, nanosec=0) - test_trajectory = zip(time_vec, waypts) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in test_trajectory - ], - joint_names=[tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], - ) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - # Full quintic trajectory - test_trajectory = zip(time_vec, waypts) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint( - positions=pos, - time_from_start=times, - velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - ) - for (times, pos) in test_trajectory - ], - joint_names=[tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], - ) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - # Test impossible goal tolerance, should fail. - test_trajectory = zip(time_vec, waypts) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in test_trajectory - ], - joint_names=[tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], - ) - goal_tolerance = [ - JointTolerance(position=0.000000001, name=tf_prefix + ROBOT_JOINTS[i]) - for i in range(len(ROBOT_JOINTS)) - ] - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) - - # Test impossible goal time - goal_tolerance = [ - JointTolerance(position=0.01, name=tf_prefix + ROBOT_JOINTS[i]) for i in range(6) - ] - goal_time_tolerance.sec = 0 - goal_time_tolerance.nanosec = 10 - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) From f0721c019e81933c9876461469f5866825ce896d Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 08:10:24 +0000 Subject: [PATCH 16/22] Clean up test description generators --- ur_robot_driver/test/test_common.py | 8 ++------ ur_robot_driver/test/test_mock_hardware.py | 9 +++------ 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 573a29b17..860621967 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -376,7 +376,6 @@ def generate_mock_hardware_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, - use_mock_hardware="false", ): ur_type = LaunchConfiguration("ur_type") @@ -390,8 +389,8 @@ def generate_mock_hardware_test_description( "headless_mode": "true", "launch_dashboard_client": "true", "start_joint_controller": "false", - "use_mock_hardware": use_mock_hardware, - "mock_sensor_commands": use_mock_hardware, + "use_mock_hardware": "true", + "mock_sensor_commands": "true", } if tf_prefix: launch_arguments["tf_prefix"] = tf_prefix @@ -412,7 +411,6 @@ def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, - use_mock_hardware="false", ): ur_type = LaunchConfiguration("ur_type") @@ -425,8 +423,6 @@ def generate_driver_test_description( "headless_mode": "true", "launch_dashboard_client": "true", "start_joint_controller": "false", - "use_mock_hardware": use_mock_hardware, - "mock_sensor_commands": use_mock_hardware, } if tf_prefix: launch_arguments["tf_prefix"] = tf_prefix diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index 77b6a640c..18f95dfe1 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -56,11 +56,9 @@ @pytest.mark.launch_test -@launch_testing.parametrize("tf_prefix, use_mock_hardware", [("", "true"), ("my_ur_", "true")]) -def generate_test_description(tf_prefix, use_mock_hardware): - return generate_mock_hardware_test_description( - tf_prefix=tf_prefix, use_mock_hardware=use_mock_hardware - ) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_mock_hardware_test_description(tf_prefix=tf_prefix) class RobotDriverTest(unittest.TestCase): @@ -70,7 +68,6 @@ def setUpClass(cls, use_mock_hardware): rclpy.init() cls.node = Node("mock_hardware_test") time.sleep(1) - cls.mock_hardware = use_mock_hardware == "true" cls.init_robot(cls) @classmethod From f2179844841ff5e0056383f15b9a1f93fac34312 Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 08:56:47 +0000 Subject: [PATCH 17/22] fix config controller test --- ur_robot_driver/test/integration_test_config_controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ur_robot_driver/test/integration_test_config_controller.py b/ur_robot_driver/test/integration_test_config_controller.py index 1e29de8e4..9e7e6f12b 100644 --- a/ur_robot_driver/test/integration_test_config_controller.py +++ b/ur_robot_driver/test/integration_test_config_controller.py @@ -97,6 +97,6 @@ def setUp(self): # def test_get_robot_software_version(self): - self.assertNotEqual( - self._configuration_controller_interface.get_robot_software_version().major, 0 + self.assertGreater( + self._configuration_controller_interface.get_robot_software_version().major, 1 ) From 08af3b10cbbe363d1b4ab47e212061d332e4b43d Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 09:01:58 +0000 Subject: [PATCH 18/22] Clean up all the test setups and remove irrelevant tests --- .../integration_test_config_controller.py | 13 ------- .../test/integration_test_io_controller.py | 15 -------- ...integration_test_passthrough_controller.py | 10 +---- ...ntegration_test_scaled_joint_controller.py | 7 ---- ur_robot_driver/test/test_mock_hardware.py | 38 +++---------------- 5 files changed, 7 insertions(+), 76 deletions(-) diff --git a/ur_robot_driver/test/integration_test_config_controller.py b/ur_robot_driver/test/integration_test_config_controller.py index 9e7e6f12b..4a52c4cd2 100644 --- a/ur_robot_driver/test/integration_test_config_controller.py +++ b/ur_robot_driver/test/integration_test_config_controller.py @@ -35,12 +35,10 @@ import launch_testing import pytest import rclpy -from control_msgs.action import FollowJointTrajectory from rclpy.node import Node sys.path.append(os.path.dirname(__file__)) from test_common import ( # noqa: E402 - ActionInterface, ControllerManagerInterface, DashboardInterface, IoStatusInterface, @@ -76,17 +74,6 @@ def init_robot(self): self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) - self._scaled_follow_joint_trajectory = ActionInterface( - self.node, - "/scaled_joint_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) - self._passthrough_forward_joint_trajectory = ActionInterface( - self.node, - "/passthrough_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) - def setUp(self): self._dashboard_interface.start_robot() time.sleep(1) diff --git a/ur_robot_driver/test/integration_test_io_controller.py b/ur_robot_driver/test/integration_test_io_controller.py index e1aa83706..e3a54f14e 100644 --- a/ur_robot_driver/test/integration_test_io_controller.py +++ b/ur_robot_driver/test/integration_test_io_controller.py @@ -36,17 +36,14 @@ import launch_testing import pytest import rclpy -from control_msgs.action import FollowJointTrajectory from rclpy.node import Node from ur_msgs.msg import IOStates sys.path.append(os.path.dirname(__file__)) from test_common import ( # noqa: E402 - ActionInterface, ControllerManagerInterface, DashboardInterface, IoStatusInterface, - ConfigurationInterface, generate_driver_test_description, ) @@ -76,18 +73,6 @@ def init_robot(self): self._dashboard_interface = DashboardInterface(self.node) self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) - self._configuration_controller_interface = ConfigurationInterface(self.node) - - self._scaled_follow_joint_trajectory = ActionInterface( - self.node, - "/scaled_joint_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) - self._passthrough_forward_joint_trajectory = ActionInterface( - self.node, - "/passthrough_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) def setUp(self): self._dashboard_interface.start_robot() diff --git a/ur_robot_driver/test/integration_test_passthrough_controller.py b/ur_robot_driver/test/integration_test_passthrough_controller.py index aa72745ad..6b907bcd5 100644 --- a/ur_robot_driver/test/integration_test_passthrough_controller.py +++ b/ur_robot_driver/test/integration_test_passthrough_controller.py @@ -49,7 +49,6 @@ ControllerManagerInterface, DashboardInterface, IoStatusInterface, - ConfigurationInterface, generate_driver_test_description, ROBOT_JOINTS, TIMEOUT_EXECUTE_TRAJECTORY, @@ -101,13 +100,7 @@ def init_robot(self): self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) - self._configuration_controller_interface = ConfigurationInterface(self.node) - self._scaled_follow_joint_trajectory = ActionInterface( - self.node, - "/scaled_joint_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) self._passthrough_forward_joint_trajectory = ActionInterface( self.node, "/passthrough_trajectory_controller/follow_joint_trajectory", @@ -115,8 +108,7 @@ def init_robot(self): ) def setUp(self): - if self._dashboard_interface: - self._dashboard_interface.start_robot() + self._dashboard_interface.start_robot() time.sleep(1) self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) diff --git a/ur_robot_driver/test/integration_test_scaled_joint_controller.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py index 9e311a079..c2ca19f9f 100755 --- a/ur_robot_driver/test/integration_test_scaled_joint_controller.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -48,7 +48,6 @@ ControllerManagerInterface, DashboardInterface, IoStatusInterface, - ConfigurationInterface, generate_driver_test_description, ROBOT_JOINTS, TIMEOUT_EXECUTE_TRAJECTORY, @@ -80,18 +79,12 @@ def init_robot(self): self._dashboard_interface = DashboardInterface(self.node) self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) - self._configuration_controller_interface = ConfigurationInterface(self.node) self._scaled_follow_joint_trajectory = ActionInterface( self.node, "/scaled_joint_trajectory_controller/follow_joint_trajectory", FollowJointTrajectory, ) - self._passthrough_forward_joint_trajectory = ActionInterface( - self.node, - "/passthrough_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) def setUp(self): self._dashboard_interface.start_robot() diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index 18f95dfe1..6292fd6bd 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -76,8 +76,8 @@ def tearDownClass(cls): cls.node.destroy_node() rclpy.shutdown() + # Connect to all interfaces and actions, even ones we know won't work with mock hardware (Except dashboard) def init_robot(self): - self._dashboard_interface = None self._controller_manager_interface = ControllerManagerInterface(self.node) self._io_status_controller_interface = IoStatusInterface(self.node) self._configuration_controller_interface = ConfigurationInterface(self.node) @@ -88,9 +88,11 @@ def init_robot(self): FollowJointTrajectory, ) - def setUp(self): - time.sleep(1) - self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) # # Test functions @@ -165,31 +167,3 @@ def test_illegal_trajectory(self, tf_prefix): # Verify the failure is correctly detected self.assertFalse(goal_handle.accepted) - - def test_trajectory_scaled(self, tf_prefix): - """Test robot movement.""" - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=6, nanosec=500000000), [-1.0 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Execute trajectory - logging.info("Sending goal for robot to follow") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Verify execution - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, - TIMEOUT_EXECUTE_TRAJECTORY, - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) From 782264f33fa95657d2a94b722a840704f99afebd Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 10:50:13 +0000 Subject: [PATCH 19/22] remove mock hardware parameter from setUpClass --- ur_robot_driver/test/test_mock_hardware.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index 6292fd6bd..81dc01797 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -63,7 +63,7 @@ def generate_test_description(tf_prefix): class RobotDriverTest(unittest.TestCase): @classmethod - def setUpClass(cls, use_mock_hardware): + def setUpClass(cls): # Initialize the ROS context rclpy.init() cls.node = Node("mock_hardware_test") From 08c71ed79ca44ede92baf32bb87fabf63ac39cfb Mon Sep 17 00:00:00 2001 From: URJala Date: Mon, 21 Jul 2025 11:34:15 +0000 Subject: [PATCH 20/22] Move sjtc trajectory and illegal trajectory test to test_common, so it can be used by mock hardware test as well. --- ...ntegration_test_scaled_joint_controller.py | 71 +---------------- ur_robot_driver/test/test_common.py | 76 +++++++++++++++++++ ur_robot_driver/test/test_mock_hardware.py | 71 +++-------------- 3 files changed, 92 insertions(+), 126 deletions(-) diff --git a/ur_robot_driver/test/integration_test_scaled_joint_controller.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py index c2ca19f9f..7c651cf30 100755 --- a/ur_robot_driver/test/integration_test_scaled_joint_controller.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -51,6 +51,8 @@ generate_driver_test_description, ROBOT_JOINTS, TIMEOUT_EXECUTE_TRAJECTORY, + sjtc_trajectory_test, + sjtc_illegal_trajectory_test, ) @@ -104,75 +106,10 @@ def test_start_scaled_jtc_controller(self): ) def test_trajectory(self, tf_prefix): - """Test robot movement.""" - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Sending trajectory goal - logging.info("Sending simple goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Verify execution - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + sjtc_trajectory_test(self, tf_prefix) def test_illegal_trajectory(self, tf_prefix): - """ - Test trajectory server. - - This is more of a validation test that the testing suite does the right thing - """ - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - # Construct test trajectory, the second point wrongly starts before the first - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Send illegal goal - logging.info("Sending illegal goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal( - trajectory=trajectory, - ) - - # Verify the failure is correctly detected - self.assertFalse(goal_handle.accepted) + sjtc_illegal_trajectory_test(self, tf_prefix) def test_trajectory_scaled(self, tf_prefix): """Test robot movement.""" diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 860621967..9ecd72157 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -58,6 +58,9 @@ Load, ) from ur_msgs.srv import SetIO, GetRobotSoftwareVersion, SetForceMode +from builtin_interfaces.msg import Duration +from control_msgs.action import FollowJointTrajectory +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint TIMEOUT_WAIT_SERVICE = 10 TIMEOUT_WAIT_SERVICE_INITIAL = 120 # If we download the docker image simultaneously to the tests, it can take quite some time until the dashboard server is reachable and usable. @@ -302,6 +305,79 @@ class ForceModeInterface( pass +def sjtc_trajectory_test(tester, tf_prefix): + """Test robot movement.""" + tester.assertTrue( + tester._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Sending trajectory goal + logging.info("Sending simple goal") + goal_handle = tester._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + tester.assertTrue(goal_handle.accepted) + + # Verify execution + result = tester._scaled_follow_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + tester.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + +def sjtc_illegal_trajectory_test(tester, tf_prefix): + """ + Test trajectory server. + + This is more of a validation test that the testing suite does the right thing + """ + tester.assertTrue( + tester._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + # Construct test trajectory, the second point wrongly starts before the first + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Send illegal goal + logging.info("Sending illegal goal") + goal_handle = tester._scaled_follow_joint_trajectory.send_goal( + trajectory=trajectory, + ) + + # Verify the failure is correctly detected + tester.assertFalse(goal_handle.accepted) + + def _declare_launch_arguments(): declared_arguments = [] diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index 81dc01797..2bf36b6fd 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -31,17 +31,13 @@ import sys import time import unittest -import logging import launch_testing import pytest import rclpy from rclpy.node import Node -from builtin_interfaces.msg import Duration from control_msgs.action import FollowJointTrajectory from controller_manager_msgs.srv import SwitchController -from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint - sys.path.append(os.path.dirname(__file__)) from test_common import ( # noqa: E402 @@ -50,8 +46,8 @@ IoStatusInterface, ConfigurationInterface, generate_mock_hardware_test_description, - ROBOT_JOINTS, - TIMEOUT_EXECUTE_TRAJECTORY, + sjtc_trajectory_test, + sjtc_illegal_trajectory_test, ) @@ -104,66 +100,23 @@ def test_get_robot_software_version(self): ) def test_start_scaled_jtc_controller(self): + # Deactivate controller, if it is not already self.assertTrue( self._controller_manager_interface.switch_controller( strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + # Activate controller + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, activate_controllers=["scaled_joint_trajectory_controller"], ).ok ) def test_trajectory(self, tf_prefix): - """Test robot movement.""" - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Sending trajectory goal - logging.info("Sending simple goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Verify execution - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + sjtc_trajectory_test(self, tf_prefix) def test_illegal_trajectory(self, tf_prefix): - """ - Test trajectory server. - - This is more of a validation test that the testing suite does the right thing - """ - # Construct test trajectory, the second point wrongly starts before the first - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Send illegal goal - logging.info("Sending illegal goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal( - trajectory=trajectory, - ) - - # Verify the failure is correctly detected - self.assertFalse(goal_handle.accepted) + sjtc_illegal_trajectory_test(self, tf_prefix) From 238c04ef3f022f7a96eea523f93d8a27f533c504 Mon Sep 17 00:00:00 2001 From: URJala <159417921+URJala@users.noreply.github.com> Date: Wed, 30 Jul 2025 12:23:20 +0200 Subject: [PATCH 21/22] Apply suggestions from code review Co-authored-by: Felix Exner --- ur_robot_driver/test/integration_test_config_controller.py | 4 ++-- ur_robot_driver/test/integration_test_io_controller.py | 4 ++-- .../test/integration_test_passthrough_controller.py | 4 ++-- .../test/integration_test_scaled_joint_controller.py | 4 ++-- ur_robot_driver/test/test_mock_hardware.py | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ur_robot_driver/test/integration_test_config_controller.py b/ur_robot_driver/test/integration_test_config_controller.py index 4a52c4cd2..ae427d881 100644 --- a/ur_robot_driver/test/integration_test_config_controller.py +++ b/ur_robot_driver/test/integration_test_config_controller.py @@ -53,12 +53,12 @@ def generate_test_description(tf_prefix): return generate_driver_test_description(tf_prefix=tf_prefix) -class RobotDriverTest(unittest.TestCase): +class ConfigControllerTest(unittest.TestCase): @classmethod def setUpClass(cls): # Initialize the ROS context rclpy.init() - cls.node = Node("robot_driver_test") + cls.node = Node("config_controller_test") time.sleep(1) cls.init_robot(cls) diff --git a/ur_robot_driver/test/integration_test_io_controller.py b/ur_robot_driver/test/integration_test_io_controller.py index e3a54f14e..0856fd9fb 100644 --- a/ur_robot_driver/test/integration_test_io_controller.py +++ b/ur_robot_driver/test/integration_test_io_controller.py @@ -54,12 +54,12 @@ def generate_test_description(tf_prefix): return generate_driver_test_description(tf_prefix=tf_prefix) -class RobotDriverTest(unittest.TestCase): +class IOControllerTest(unittest.TestCase): @classmethod def setUpClass(cls): # Initialize the ROS context rclpy.init() - cls.node = Node("robot_driver_test") + cls.node = Node("io_controller_test") time.sleep(1) cls.init_robot(cls) diff --git a/ur_robot_driver/test/integration_test_passthrough_controller.py b/ur_robot_driver/test/integration_test_passthrough_controller.py index 6b907bcd5..edfa3b90e 100644 --- a/ur_robot_driver/test/integration_test_passthrough_controller.py +++ b/ur_robot_driver/test/integration_test_passthrough_controller.py @@ -79,12 +79,12 @@ def generate_test_description(tf_prefix): TEST_TRAJECTORY = [(time_vec[i], waypts[i]) for i in range(len(waypts))] -class RobotDriverTest(unittest.TestCase): +class PassthroughControllerTest(unittest.TestCase): @classmethod def setUpClass(cls): # Initialize the ROS context rclpy.init() - cls.node = Node("robot_driver_test") + cls.node = Node("passthrough_controller_test") time.sleep(1) cls.init_robot(cls) diff --git a/ur_robot_driver/test/integration_test_scaled_joint_controller.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py index 7c651cf30..809465012 100755 --- a/ur_robot_driver/test/integration_test_scaled_joint_controller.py +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -62,12 +62,12 @@ def generate_test_description(tf_prefix): return generate_driver_test_description(tf_prefix=tf_prefix) -class RobotDriverTest(unittest.TestCase): +class SJTCTest(unittest.TestCase): @classmethod def setUpClass(cls): # Initialize the ROS context rclpy.init() - cls.node = Node("robot_driver_test") + cls.node = Node("sjtc_test") time.sleep(1) cls.init_robot(cls) diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py index 2bf36b6fd..0dfebec6a 100644 --- a/ur_robot_driver/test/test_mock_hardware.py +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -57,7 +57,7 @@ def generate_test_description(tf_prefix): return generate_mock_hardware_test_description(tf_prefix=tf_prefix) -class RobotDriverTest(unittest.TestCase): +class MockHWTest(unittest.TestCase): @classmethod def setUpClass(cls): # Initialize the ROS context From 3201f733fcde827cd6d1f62d651a214eeefd0d30 Mon Sep 17 00:00:00 2001 From: Jacob Larsen Date: Wed, 30 Jul 2025 10:32:55 +0000 Subject: [PATCH 22/22] Apply renaming to the last few test classes --- ur_robot_driver/test/integration_test_controller_switch.py | 4 ++-- ur_robot_driver/test/integration_test_force_mode.py | 4 ++-- ur_robot_driver/test/integration_test_tool_contact.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ur_robot_driver/test/integration_test_controller_switch.py b/ur_robot_driver/test/integration_test_controller_switch.py index 2ef097f04..8bfd8ac7b 100644 --- a/ur_robot_driver/test/integration_test_controller_switch.py +++ b/ur_robot_driver/test/integration_test_controller_switch.py @@ -65,12 +65,12 @@ def generate_test_description(tf_prefix): return generate_driver_test_description(tf_prefix=tf_prefix) -class RobotDriverTest(unittest.TestCase): +class ControllerSwitchTest(unittest.TestCase): @classmethod def setUpClass(cls): # Initialize the ROS context rclpy.init() - cls.node = Node("robot_driver_test") + cls.node = Node("controller_switching_test") time.sleep(1) cls.init_robot(cls) diff --git a/ur_robot_driver/test/integration_test_force_mode.py b/ur_robot_driver/test/integration_test_force_mode.py index 02520e0ad..aaa9bb69f 100644 --- a/ur_robot_driver/test/integration_test_force_mode.py +++ b/ur_robot_driver/test/integration_test_force_mode.py @@ -88,12 +88,12 @@ def generate_test_description(tf_prefix): return generate_driver_test_description(tf_prefix=tf_prefix) -class RobotDriverTest(unittest.TestCase): +class ForceModeTest(unittest.TestCase): @classmethod def setUpClass(cls): # Initialize the ROS context rclpy.init() - cls.node = Node("robot_driver_test") + cls.node = Node("force_mode_test") time.sleep(1) cls.init_robot(cls) diff --git a/ur_robot_driver/test/integration_test_tool_contact.py b/ur_robot_driver/test/integration_test_tool_contact.py index db6c60817..2cce70cf4 100644 --- a/ur_robot_driver/test/integration_test_tool_contact.py +++ b/ur_robot_driver/test/integration_test_tool_contact.py @@ -62,12 +62,12 @@ def generate_test_description(tf_prefix): return generate_driver_test_description(tf_prefix=tf_prefix) -class RobotDriverTest(unittest.TestCase): +class ToolContactTest(unittest.TestCase): @classmethod def setUpClass(cls): # Initialize the ROS context rclpy.init() - cls.node = Node("robot_driver_test") + cls.node = Node("tool_contact_test") time.sleep(1) cls.init_robot(cls)