diff --git a/ur_robot_driver/CMakeLists.txt b/ur_robot_driver/CMakeLists.txt index 1a76a5cab..156d1c358 100644 --- a/ur_robot_driver/CMakeLists.txt +++ b/ur_robot_driver/CMakeLists.txt @@ -230,6 +230,11 @@ if(BUILD_TESTING) find_package(ur_msgs REQUIRED) find_package(launch_testing_ament_cmake) + add_launch_test(test/test_mock_hardware.py + TIMEOUT + 800 + ) + if(${UR_ROBOT_DRIVER_BUILD_INTEGRATION_TESTS}) add_launch_test(test/launch_args.py TIMEOUT @@ -243,7 +248,7 @@ if(BUILD_TESTING) TIMEOUT 180 ) - add_launch_test(test/robot_driver.py + add_launch_test(test/integration_test_scaled_joint_controller.py TIMEOUT 800 ) @@ -259,6 +264,18 @@ if(BUILD_TESTING) TIMEOUT 500 ) + add_launch_test(test/integration_test_config_controller.py + TIMEOUT + 800 + ) + add_launch_test(test/integration_test_passthrough_controller.py + TIMEOUT + 800 + ) + add_launch_test(test/integration_test_io_controller.py + TIMEOUT + 800 + ) add_launch_test(test/integration_test_tool_contact.py TIMEOUT 800 diff --git a/ur_robot_driver/test/integration_test_config_controller.py b/ur_robot_driver/test/integration_test_config_controller.py new file mode 100644 index 000000000..4a52c4cd2 --- /dev/null +++ b/ur_robot_driver/test/integration_test_config_controller.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import time +import unittest + +import launch_testing +import pytest +import rclpy +from rclpy.node import Node + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ControllerManagerInterface, + DashboardInterface, + IoStatusInterface, + ConfigurationInterface, + generate_driver_test_description, +) + + +@pytest.mark.launch_test +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Initialize the ROS context + rclpy.init() + cls.node = Node("robot_driver_test") + time.sleep(1) + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + self._dashboard_interface = DashboardInterface(self.node) + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + def setUp(self): + self._dashboard_interface.start_robot() + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_get_robot_software_version(self): + self.assertGreater( + self._configuration_controller_interface.get_robot_software_version().major, 1 + ) diff --git a/ur_robot_driver/test/integration_test_controller_switch.py b/ur_robot_driver/test/integration_test_controller_switch.py index 58e11a1e4..2ef097f04 100644 --- a/ur_robot_driver/test/integration_test_controller_switch.py +++ b/ur_robot_driver/test/integration_test_controller_switch.py @@ -60,11 +60,7 @@ @pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix", - [""], - # [(""), ("my_ur_")], -) +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) def generate_test_description(tf_prefix): return generate_driver_test_description(tf_prefix=tf_prefix) diff --git a/ur_robot_driver/test/integration_test_io_controller.py b/ur_robot_driver/test/integration_test_io_controller.py new file mode 100644 index 000000000..e3a54f14e --- /dev/null +++ b/ur_robot_driver/test/integration_test_io_controller.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import logging +import os +import sys +import time +import unittest + +import launch_testing +import pytest +import rclpy +from rclpy.node import Node +from ur_msgs.msg import IOStates + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ControllerManagerInterface, + DashboardInterface, + IoStatusInterface, + generate_driver_test_description, +) + + +@pytest.mark.launch_test +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Initialize the ROS context + rclpy.init() + cls.node = Node("robot_driver_test") + time.sleep(1) + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + self._dashboard_interface = DashboardInterface(self.node) + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + + def setUp(self): + self._dashboard_interface.start_robot() + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_set_io(self): + """Test to set an IO and check whether it has been set.""" + # Create io callback to verify result + io_msg = None + + def io_msg_cb(msg): + nonlocal io_msg + io_msg = msg + + io_states_sub = self.node.create_subscription( + IOStates, + "/io_and_status_controller/io_states", + io_msg_cb, + rclpy.qos.qos_profile_system_default, + ) + + # Set pin 0 to 1.0 + test_pin = 0 + + logging.info("Setting pin %d to 1.0", test_pin) + self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=1.0) + + # Wait until the pin state has changed + pin_state = False + end_time = time.time() + 5 + while not pin_state and time.time() < end_time: + rclpy.spin_once(self.node, timeout_sec=0.1) + if io_msg is not None: + pin_state = io_msg.digital_out_states[test_pin].state + + self.assertEqual(pin_state, 1.0) + + # Set pin 0 to 0.0 + logging.info("Setting pin %d to 0.0", test_pin) + self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=0.0) + + # Wait until the pin state has changed back + end_time = time.time() + 5 + while pin_state and time.time() < end_time: + rclpy.spin_once(self.node, timeout_sec=0.1) + if io_msg is not None: + pin_state = io_msg.digital_out_states[test_pin].state + + self.assertEqual(pin_state, 0.0) + + # Clean up io subscription + self.node.destroy_subscription(io_states_sub) diff --git a/ur_robot_driver/test/integration_test_passthrough_controller.py b/ur_robot_driver/test/integration_test_passthrough_controller.py new file mode 100644 index 000000000..6b907bcd5 --- /dev/null +++ b/ur_robot_driver/test/integration_test_passthrough_controller.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import time +import unittest + +from math import pi +import launch_testing +import pytest +import rclpy +from builtin_interfaces.msg import Duration +from control_msgs.action import FollowJointTrajectory +from control_msgs.msg import JointTolerance +from controller_manager_msgs.srv import SwitchController +from rclpy.node import Node +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + DashboardInterface, + IoStatusInterface, + generate_driver_test_description, + ROBOT_JOINTS, + TIMEOUT_EXECUTE_TRAJECTORY, +) + + +# Mock hardware does not work with passthrough controller, so dont test with it +@pytest.mark.launch_test +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) + + +HOME = { + "elbow_joint": 0.0, + "shoulder_lift_joint": -1.5708, + "shoulder_pan_joint": 0.0, + "wrist_1_joint": -1.5708, + "wrist_2_joint": 0.0, + "wrist_3_joint": 0.0, +} +waypts = [[HOME[joint] + i * pi / 4 for joint in ROBOT_JOINTS] for i in [0, -1, 1]] +time_vec = [ + Duration(sec=4, nanosec=0), + Duration(sec=8, nanosec=0), + Duration(sec=12, nanosec=0), +] +TEST_TRAJECTORY = [(time_vec[i], waypts[i]) for i in range(len(waypts))] + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Initialize the ROS context + rclpy.init() + cls.node = Node("robot_driver_test") + time.sleep(1) + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + + self._dashboard_interface = DashboardInterface(self.node) + + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def setUp(self): + self._dashboard_interface.start_robot() + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_start_passthrough_controller(self): + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + def test_passthrough_trajectory(self, tf_prefix): + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=1, nanosec=0) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + def test_quintic_trajectory(self, tf_prefix): + # Full quintic trajectory + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint( + positions=pos, + time_from_start=times, + velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + ) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_time_tolerance = Duration(sec=1, nanosec=0) + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + def test_impossible_goal_tolerance_fails(self, tf_prefix): + # Test impossible goal tolerance, should fail. + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_tolerance = [ + JointTolerance(position=0.000000001, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=1, nanosec=0) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual( + result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + ) + + def test_impossible_goal_time_tolerance_fails(self, tf_prefix): + # Test impossible goal time + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + goal_tolerance = [ + JointTolerance(position=0.01, name=tf_prefix + joint) for joint in ROBOT_JOINTS + ] + goal_time_tolerance = Duration(sec=0, nanosec=10) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in TEST_TRAJECTORY + ], + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + ) + goal_handle = self._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + self.assertTrue(goal_handle.accepted) + if goal_handle.accepted: + result = self._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + self.assertEqual( + result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + ) + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) diff --git a/ur_robot_driver/test/integration_test_scaled_joint_controller.py b/ur_robot_driver/test/integration_test_scaled_joint_controller.py new file mode 100755 index 000000000..7c651cf30 --- /dev/null +++ b/ur_robot_driver/test/integration_test_scaled_joint_controller.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 +# Copyright 2019, FZI Forschungszentrum Informatik +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +import logging +import os +import sys +import time +import unittest + +import launch_testing +import pytest +import rclpy +from builtin_interfaces.msg import Duration +from control_msgs.action import FollowJointTrajectory +from controller_manager_msgs.srv import SwitchController +from rclpy.node import Node +from sensor_msgs.msg import JointState +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + DashboardInterface, + IoStatusInterface, + generate_driver_test_description, + ROBOT_JOINTS, + TIMEOUT_EXECUTE_TRAJECTORY, + sjtc_trajectory_test, + sjtc_illegal_trajectory_test, +) + + +@pytest.mark.launch_test +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_driver_test_description(tf_prefix=tf_prefix) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Initialize the ROS context + rclpy.init() + cls.node = Node("robot_driver_test") + time.sleep(1) + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + def init_robot(self): + self._dashboard_interface = DashboardInterface(self.node) + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def setUp(self): + self._dashboard_interface.start_robot() + time.sleep(1) + self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) + + # + # Test functions + # + + def test_start_scaled_jtc_controller(self): + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + def test_trajectory(self, tf_prefix): + sjtc_trajectory_test(self, tf_prefix) + + def test_illegal_trajectory(self, tf_prefix): + sjtc_illegal_trajectory_test(self, tf_prefix) + + def test_trajectory_scaled(self, tf_prefix): + """Test robot movement.""" + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=6, nanosec=500000000), [-1.0 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Execute trajectory + logging.info("Sending goal for robot to follow") + goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + self.assertTrue(goal_handle.accepted) + + # Verify execution + result = self._scaled_follow_joint_trajectory.get_result( + goal_handle, + TIMEOUT_EXECUTE_TRAJECTORY, + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + def test_trajectory_scaled_aborts_on_violation(self, tf_prefix): + """Test that the robot correctly aborts the trajectory when the constraints are violated.""" + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + ( + Duration(sec=6, nanosec=50000000), + [-1.0 for j in ROBOT_JOINTS], + ), # physically unfeasible + ( + Duration(sec=8, nanosec=0), + [-1.5 for j in ROBOT_JOINTS], + ), # physically unfeasible + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + last_joint_state = None + + def js_cb(msg): + nonlocal last_joint_state + last_joint_state = msg + + joint_state_sub = self.node.create_subscription(JointState, "/joint_states", js_cb, 1) + joint_state_sub # prevent warning about unused variable + + # Send goal + logging.info("Sending goal for robot to follow") + goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + self.assertTrue(goal_handle.accepted) + + # Get result + result = self._scaled_follow_joint_trajectory.get_result( + goal_handle, + TIMEOUT_EXECUTE_TRAJECTORY, + ) + self.assertEqual(result.error_code, FollowJointTrajectory.Result.PATH_TOLERANCE_VIOLATED) + + state_when_aborted = last_joint_state + + # This section is to make sure the robot stopped moving once the trajectory was aborted + time.sleep(2.0) + # Ugly workaround since we want to wait for a joint state in the same thread + while last_joint_state == state_when_aborted: + rclpy.spin_once(self.node) + state_after_sleep = last_joint_state + + logging.info("Joint states before sleep:\t %s", state_when_aborted.position.tolist()) + logging.info("Joint states after sleep:\t %s", state_after_sleep.position.tolist()) + + self.assertTrue( + all( + [ + abs(a - b) < 0.01 + for a, b in zip(state_after_sleep.position, state_when_aborted.position) + ] + ) + ) + + # TODO: uncomment when JTC starts taking into account goal_time_tolerance from goal message + # see https://github.com/ros-controls/ros2_controllers/issues/249 + # Now do the same again, but with a goal time constraint + # self.node.get_logger().info("Sending scaled goal with time restrictions") + # + # goal.goal_time_tolerance = Duration(nanosec=10000000) + # goal_response = self.call_action("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal) + # + # self.assertEqual(goal_response.accepted, True) + # + # if goal_response.accepted: + # result = self.get_result("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal_response, TIMEOUT_EXECUTE_TRAJECTORY) + # self.assertEqual(result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED) + # self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED") diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/robot_driver.py deleted file mode 100755 index e07462772..000000000 --- a/ur_robot_driver/test/robot_driver.py +++ /dev/null @@ -1,495 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2019, FZI Forschungszentrum Informatik -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# -# * Neither the name of the {copyright_holder} nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -import logging -import os -import sys -import time -import unittest - -import launch_testing -import pytest -import rclpy -from builtin_interfaces.msg import Duration -from control_msgs.action import FollowJointTrajectory -from control_msgs.msg import JointTolerance -from controller_manager_msgs.srv import SwitchController -from rclpy.node import Node -from sensor_msgs.msg import JointState -from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint -from ur_msgs.msg import IOStates - -sys.path.append(os.path.dirname(__file__)) -from test_common import ( # noqa: E402 - ActionInterface, - ControllerManagerInterface, - DashboardInterface, - IoStatusInterface, - ConfigurationInterface, - generate_driver_test_description, - ROBOT_JOINTS, -) - -TIMEOUT_EXECUTE_TRAJECTORY = 30 - - -@pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix", - [(""), ("my_ur_")], -) -def generate_test_description(tf_prefix): - return generate_driver_test_description(tf_prefix=tf_prefix) - - -class RobotDriverTest(unittest.TestCase): - @classmethod - def setUpClass(cls): - # Initialize the ROS context - rclpy.init() - cls.node = Node("robot_driver_test") - time.sleep(1) - cls.init_robot(cls) - - @classmethod - def tearDownClass(cls): - # Shutdown the ROS context - cls.node.destroy_node() - rclpy.shutdown() - - def init_robot(self): - self._dashboard_interface = DashboardInterface(self.node) - self._controller_manager_interface = ControllerManagerInterface(self.node) - self._io_status_controller_interface = IoStatusInterface(self.node) - self._configuration_controller_interface = ConfigurationInterface(self.node) - - self._scaled_follow_joint_trajectory = ActionInterface( - self.node, - "/scaled_joint_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) - self._passthrough_forward_joint_trajectory = ActionInterface( - self.node, - "/passthrough_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) - - def setUp(self): - self._dashboard_interface.start_robot() - time.sleep(1) - self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) - - # - # Test functions - # - - def test_get_robot_software_version(self): - self.assertNotEqual( - self._configuration_controller_interface.get_robot_software_version().major, 0 - ) - - def test_start_scaled_jtc_controller(self): - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - - def test_start_passthrough_controller(self): - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["passthrough_trajectory_controller"], - deactivate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - - def test_set_io(self): - """Test to set an IO and check whether it has been set.""" - # Create io callback to verify result - io_msg = None - - def io_msg_cb(msg): - nonlocal io_msg - io_msg = msg - - io_states_sub = self.node.create_subscription( - IOStates, - "/io_and_status_controller/io_states", - io_msg_cb, - rclpy.qos.qos_profile_system_default, - ) - - # Set pin 0 to 1.0 - test_pin = 0 - - logging.info("Setting pin %d to 1.0", test_pin) - self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=1.0) - - # Wait until the pin state has changed - pin_state = False - end_time = time.time() + 5 - while not pin_state and time.time() < end_time: - rclpy.spin_once(self.node, timeout_sec=0.1) - if io_msg is not None: - pin_state = io_msg.digital_out_states[test_pin].state - - self.assertEqual(pin_state, 1.0) - - # Set pin 0 to 0.0 - logging.info("Setting pin %d to 0.0", test_pin) - self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=0.0) - - # Wait until the pin state has changed back - end_time = time.time() + 5 - while pin_state and time.time() < end_time: - rclpy.spin_once(self.node, timeout_sec=0.1) - if io_msg is not None: - pin_state = io_msg.digital_out_states[test_pin].state - - self.assertEqual(pin_state, 0.0) - - # Clean up io subscription - self.node.destroy_subscription(io_states_sub) - - def test_trajectory(self, tf_prefix): - """Test robot movement.""" - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Sending trajectory goal - logging.info("Sending simple goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Verify execution - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - def test_illegal_trajectory(self, tf_prefix): - """ - Test trajectory server. - - This is more of a validation test that the testing suite does the right thing - """ - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - # Construct test trajectory, the second point wrongly starts before the first - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Send illegal goal - logging.info("Sending illegal goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal( - trajectory=trajectory, - ) - - # Verify the failure is correctly detected - self.assertFalse(goal_handle.accepted) - - def test_trajectory_scaled(self, tf_prefix): - """Test robot movement.""" - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=6, nanosec=500000000), [-1.0 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Execute trajectory - logging.info("Sending goal for robot to follow") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Verify execution - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, - TIMEOUT_EXECUTE_TRAJECTORY, - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - def test_trajectory_scaled_aborts_on_violation(self, tf_prefix): - """Test that the robot correctly aborts the trajectory when the constraints are violated.""" - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - ( - Duration(sec=6, nanosec=50000000), - [-1.0 for j in ROBOT_JOINTS], - ), # physically unfeasible - ( - Duration(sec=8, nanosec=0), - [-1.5 for j in ROBOT_JOINTS], - ), # physically unfeasible - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - last_joint_state = None - - def js_cb(msg): - nonlocal last_joint_state - last_joint_state = msg - - joint_state_sub = self.node.create_subscription(JointState, "/joint_states", js_cb, 1) - joint_state_sub # prevent warning about unused variable - - # Send goal - logging.info("Sending goal for robot to follow") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Get result - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, - TIMEOUT_EXECUTE_TRAJECTORY, - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.PATH_TOLERANCE_VIOLATED) - - state_when_aborted = last_joint_state - - # This section is to make sure the robot stopped moving once the trajectory was aborted - time.sleep(2.0) - # Ugly workaround since we want to wait for a joint state in the same thread - while last_joint_state == state_when_aborted: - rclpy.spin_once(self.node) - state_after_sleep = last_joint_state - - logging.info("Joint states before sleep:\t %s", state_when_aborted.position.tolist()) - logging.info("Joint states after sleep:\t %s", state_after_sleep.position.tolist()) - - self.assertTrue( - all( - [ - abs(a - b) < 0.01 - for a, b in zip(state_after_sleep.position, state_when_aborted.position) - ] - ) - ) - - # TODO: uncomment when JTC starts taking into account goal_time_tolerance from goal message - # see https://github.com/ros-controls/ros2_controllers/issues/249 - # Now do the same again, but with a goal time constraint - # self.node.get_logger().info("Sending scaled goal with time restrictions") - # - # goal.goal_time_tolerance = Duration(nanosec=10000000) - # goal_response = self.call_action("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal) - # - # self.assertEqual(goal_response.accepted, True) - # - # if goal_response.accepted: - # result = self.get_result("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal_response, TIMEOUT_EXECUTE_TRAJECTORY) - # self.assertEqual(result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED) - # self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED") - - def test_passthrough_trajectory(self, tf_prefix): - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["passthrough_trajectory_controller"], - deactivate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - waypts = [ - [-1.58, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - [-2.5, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - [-1.58, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - ] - time_vec = [ - Duration(sec=4, nanosec=0), - Duration(sec=8, nanosec=0), - Duration(sec=12, nanosec=0), - ] - goal_tolerance = [ - JointTolerance(position=0.01, velocity=5e-5, name=tf_prefix + ROBOT_JOINTS[i]) - for i in range(len(ROBOT_JOINTS)) - ] - goal_time_tolerance = Duration(sec=1, nanosec=0) - test_trajectory = zip(time_vec, waypts) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in test_trajectory - ], - joint_names=[tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], - ) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - # Full quintic trajectory - test_trajectory = zip(time_vec, waypts) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint( - positions=pos, - time_from_start=times, - velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - accelerations=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - ) - for (times, pos) in test_trajectory - ], - joint_names=[tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], - ) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - # Test impossible goal tolerance, should fail. - test_trajectory = zip(time_vec, waypts) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in test_trajectory - ], - joint_names=[tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], - ) - goal_tolerance = [ - JointTolerance(position=0.000000001, name=tf_prefix + ROBOT_JOINTS[i]) - for i in range(len(ROBOT_JOINTS)) - ] - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) - - # Test impossible goal time - goal_tolerance = [ - JointTolerance(position=0.01, name=tf_prefix + ROBOT_JOINTS[i]) for i in range(6) - ] - goal_time_tolerance.sec = 0 - goal_time_tolerance.nanosec = 10 - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 215f8ff55..9ecd72157 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -58,11 +58,16 @@ Load, ) from ur_msgs.srv import SetIO, GetRobotSoftwareVersion, SetForceMode +from builtin_interfaces.msg import Duration +from control_msgs.action import FollowJointTrajectory +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint TIMEOUT_WAIT_SERVICE = 10 TIMEOUT_WAIT_SERVICE_INITIAL = 120 # If we download the docker image simultaneously to the tests, it can take quite some time until the dashboard server is reachable and usable. TIMEOUT_WAIT_ACTION = 10 +TIMEOUT_EXECUTE_TRAJECTORY = 30 + ROBOT_JOINTS = [ "elbow_joint", "shoulder_lift_joint", @@ -300,6 +305,79 @@ class ForceModeInterface( pass +def sjtc_trajectory_test(tester, tf_prefix): + """Test robot movement.""" + tester.assertTrue( + tester._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Sending trajectory goal + logging.info("Sending simple goal") + goal_handle = tester._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + tester.assertTrue(goal_handle.accepted) + + # Verify execution + result = tester._scaled_follow_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + tester.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) + + +def sjtc_illegal_trajectory_test(tester, tf_prefix): + """ + Test trajectory server. + + This is more of a validation test that the testing suite does the right thing + """ + tester.assertTrue( + tester._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + # Construct test trajectory, the second point wrongly starts before the first + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Send illegal goal + logging.info("Sending illegal goal") + goal_handle = tester._scaled_follow_joint_trajectory.send_goal( + trajectory=trajectory, + ) + + # Verify the failure is correctly detected + tester.assertFalse(goal_handle.accepted) + + def _declare_launch_arguments(): declared_arguments = [] @@ -370,6 +448,41 @@ def generate_dashboard_test_description(): ) +def generate_mock_hardware_test_description( + tf_prefix="", + initial_joint_controller="scaled_joint_trajectory_controller", + controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, +): + + ur_type = LaunchConfiguration("ur_type") + + launch_arguments = { + "robot_ip": "0.0.0.0", + "ur_type": ur_type, + "launch_rviz": "false", + "controller_spawner_timeout": str(controller_spawner_timeout), + "initial_joint_controller": initial_joint_controller, + "headless_mode": "true", + "launch_dashboard_client": "true", + "start_joint_controller": "false", + "use_mock_hardware": "true", + "mock_sensor_commands": "true", + } + if tf_prefix: + launch_arguments["tf_prefix"] = tf_prefix + + robot_driver = IncludeLaunchDescription( + PythonLaunchDescriptionSource( + PathJoinSubstitution( + [FindPackageShare("ur_robot_driver"), "launch", "ur_control.launch.py"] + ) + ), + launch_arguments=launch_arguments.items(), + ) + + return LaunchDescription(_declare_launch_arguments() + [ReadyToTest(), robot_driver]) + + def generate_driver_test_description( tf_prefix="", initial_joint_controller="scaled_joint_trajectory_controller", diff --git a/ur_robot_driver/test/test_mock_hardware.py b/ur_robot_driver/test/test_mock_hardware.py new file mode 100644 index 000000000..2bf36b6fd --- /dev/null +++ b/ur_robot_driver/test/test_mock_hardware.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import time +import unittest + +import launch_testing +import pytest +import rclpy +from rclpy.node import Node +from control_msgs.action import FollowJointTrajectory +from controller_manager_msgs.srv import SwitchController + +sys.path.append(os.path.dirname(__file__)) +from test_common import ( # noqa: E402 + ActionInterface, + ControllerManagerInterface, + IoStatusInterface, + ConfigurationInterface, + generate_mock_hardware_test_description, + sjtc_trajectory_test, + sjtc_illegal_trajectory_test, +) + + +@pytest.mark.launch_test +@launch_testing.parametrize("tf_prefix", [(""), ("my_ur_")]) +def generate_test_description(tf_prefix): + return generate_mock_hardware_test_description(tf_prefix=tf_prefix) + + +class RobotDriverTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Initialize the ROS context + rclpy.init() + cls.node = Node("mock_hardware_test") + time.sleep(1) + cls.init_robot(cls) + + @classmethod + def tearDownClass(cls): + # Shutdown the ROS context + cls.node.destroy_node() + rclpy.shutdown() + + # Connect to all interfaces and actions, even ones we know won't work with mock hardware (Except dashboard) + def init_robot(self): + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + # + # Test functions + # + + def test_get_robot_software_version(self): + self.assertEqual( + self._configuration_controller_interface.get_robot_software_version().major, 1 + ) + + def test_start_scaled_jtc_controller(self): + # Deactivate controller, if it is not already + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + # Activate controller + self.assertTrue( + self._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.STRICT, + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + ) + + def test_trajectory(self, tf_prefix): + sjtc_trajectory_test(self, tf_prefix) + + def test_illegal_trajectory(self, tf_prefix): + sjtc_illegal_trajectory_test(self, tf_prefix) diff --git a/ur_robot_driver/urdf/ur.ros2_control.xacro b/ur_robot_driver/urdf/ur.ros2_control.xacro index f32805273..b480434b0 100644 --- a/ur_robot_driver/urdf/ur.ros2_control.xacro +++ b/ur_robot_driver/urdf/ur.ros2_control.xacro @@ -300,7 +300,9 @@ - + + 1 +