How to know actual result of `proc_output.waitFor` if `expected_output` is wrong?


I am trying to follow a enter image description here

I have a node target_control_node which subscribes the topic turtle1/pose and then move the turtle to a new random pose.

import math
import random

import rclpy
from geometry_msgs.msg import Twist
from rclpy.node import Node
from turtlesim.msg import Pose

class TargetControlNode(Node):
    def __init__(self):
        self._target_pose = None
        self._cmd_vel_publisher = self.create_publisher(Twist, "turtle1/cmd_vel", 10)
        self.create_subscription(Pose, "turtle1/pose", self.subscribe_target_pose, 10)
        self.create_timer(1.0, self.control_loop)

    def subscribe_target_pose(self, msg):
        self._target_pose = msg

    def control_loop(self):
        if self._target_pose is None:

        target_x = random.uniform(0.0, 10.0)
        target_y = random.uniform(0.0, 10.0)

        dist_x = target_x - self._target_pose.x
        dist_y = target_y - self._target_pose.y
        distance = math.sqrt(dist_x**2   dist_y**2)

        msg = Twist()

        # position
        msg.linear.x = 1.0 * distance

        # orientation
        goal_theta = math.atan2(dist_y, dist_x)
        diff = goal_theta - self._target_pose.theta
        if diff > math.pi:
            diff -= 2 * math.pi
        elif diff < -math.pi:
            diff  = 2 * math.pi
        msg.angular.z = 2 * diff


def main(args=None):
    node = TargetControlNode()

if __name__ == "__main__":

I am trying to write a simple test for the subscription part based on the tutorial above to understand how it works.

Here is my initial test code. Note inside I am using expected_output=str(msg), however, it is wrong, and I am not sure what to put there.

import pathlib
import random
import sys
import time
import unittest
import uuid

import launch
import launch_ros
import launch_testing
import pytest
import rclpy
import std_msgs.msg
from geometry_msgs.msg import Twist
from turtlesim.msg import Pose

def generate_test_description():
    src_path = pathlib.Path(__file__).parent.parent

    target_control_node = launch_ros.actions.Node(
        additional_env={"PYTHONUNBUFFERED": "1"},

    return (
            "target_control_node": target_control_node,

class TestTargetControlNodeLink(unittest.TestCase):
    def setUpClass(cls):

    def tearDownClass(cls):

    def setUp(self):
        self.node = rclpy.create_node("target_control_test_node")

    def tearDown(self):

    def test_target_control_node(self, target_control_node, proc_output):
        pose_pub = self.node.create_publisher(Pose, "turtle1/pose", 10)

            msg = Pose()
            msg.x = random.uniform(0.0, 10.0)
            msg.y = random.uniform(0.0, 10.0)
            msg.theta = 0.0
            msg.linear_velocity = 0.0
            msg.angular_velocity = 0.0

            success = proc_output.waitFor(
                # `str(msg)` is wrong, however, I am not sure what to put here.
                expected_output=str(msg), process=target_control_node, timeout=1.0
            assert success


When I run launch_test src/turtle_robot/test/test_target_control_node.py, it only prints this without telling me what is actual output:

[INFO] [launch]: All log files can be found below /home/parallels/.ros/log/2023-01-02-16-37-27-631032-ubuntu-linux-22-04-desktop-1439830
[INFO] [launch]: Default logging verbosity is set to INFO
test_target_control_node (test_target_control_node.TestTargetControlNodeLink) ... [INFO] [python3-1]: process started with pid [1439833]
[python3-1] [INFO] [1672706247.877402445] [target_control_node]: target_control_node

FAIL: test_target_control_node (test_target_control_node.TestTargetControlNodeLink)
Traceback (most recent call last):
  File "/my-ros/src/turtle_robot/test/test_target_control_node.py", line 91, in test_target_control_node
    assert success

Ran 1 test in 1.061s

FAILED (failures=1)
[INFO] [python3-1]: sending signal 'SIGINT' to process[python3-1]
[python3-1] Traceback (most recent call last):
[python3-1]   File "/my-ros/src/turtle_robot/turtle_robot/target_control_node.py", line 59, in <module>
[python3-1]     main()
[python3-1]   File "/my-ros/src/turtle_robot/turtle_robot/target_control_node.py", line 53, in main
[python3-1]     rclpy.spin(node)
[python3-1]   File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/__init__.py", line 222, in spin
[python3-1]     executor.spin_once()
[python3-1]   File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 705, in spin_once
[python3-1]     handler, entity, node = self.wait_for_ready_callbacks(timeout_sec=timeout_sec)
[python3-1]   File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 691, in wait_for_ready_callbacks
[python3-1]     return next(self._cb_iter)
[python3-1]   File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 588, in _wait_for_ready_callbacks
[python3-1]     wait_set.wait(timeout_nsec)
[python3-1] KeyboardInterrupt
[ERROR] [python3-1]: process has died [pid 1439833, exit code -2, cmd '/usr/bin/python3 /my-ros/src/turtle_robot/turtle_robot/target_control_node.py --ros-args'].

Ran 0 tests in 0.000s


I checked the source code of waitFor, but still no clue.

Is there a way to print the actual output so that I can give correct expected_output? Thanks!

To answer your question(and give some more general tips): You can always print out the msg inside the node. That said, the reason you're getting an error is because msg is a ros message type, meaning it's an object. So by doing str(msg) your expected output will be something like <object of type Pose at (some_address)>; and not the actual message values. Also since it appears you're just testing a subscriber, it doesn't usually make sense to have a unit test that's expecting stdout. I would also note that you're publishing a message before actually waiting for the result, this should always fail. The reason is because by the time your subscriber is started, the message has already been published and is gone.

Finally, you shouldn't have a publisher included in any unit tests. It adds an extra dependency and since it's a prepackaged transport layer testing if it works is irrelevant. Instead to fix your problem you should simply be calling the individual methods of your node directly. Basically import the script, instantiate the object, and don't try to deal with the whole node lifecycle.

