Speed Up Data Processing with Multi-Threaded Execution

Written by Ernest Cheong

15/04/2024

What you will learn:

How to use multi-threading in ROS2 to execute multiple instances of a callback simultaneously to process received data in parallel and prevent the data queue from overfilling.

Prerequisite:

Basic understanding of publishers, subscribers, and callback functions.

If you want to learn more advanced ROS 2 topics, like callbacks, parameters, QoS in ROS2, DDS in ROS2 and more, in a practical, hands-on way, check out the course Intermediate ROS 2: https://app.theconstruct.ai/courses/113


Disclaimer

Before we start I’d like to clarify that this may not be the best way to handle the problem of when the rate of data received is faster than the time spent in the callback function as it depends on the specifics of your application. It is usually recommended to keep callbacks as fast and simple as possible and to feed your data to your classes rather than do all processing in the callback. However, the concepts in this tutorial are useful to know and worth understanding!

Opening the rosject

In order to follow this tutorial, we need to have ROS2 installed in our system, and ideally a ros2_ws (ROS2 Workspace). To make your life easier, we have already prepared a rosject for that: https://app.theconstruct.ai/l/6111a6de/

Just by copying the rosject (clicking the link above), you will have a setup already prepared for you.

After the rosject has been successfully copied to your own area, you should see a Run button. Just click that button to launch the rosject.

After pressing the Run button, you should have the rosject loaded. Now, let’s head to the next section to get some real practice.

In order to interact with ROS2, we need a terminal.

Let’s open a terminal by clicking the Open a new terminal button.

Create a ROS2 python package with a simple publisher and subscriber

cd ~/ros2_ws/src/
ros2 pkg create multithreaded_py_pkg --build-type ament_python --dependencies rclpy

Specify the directory as a Python package by creating an empty __init__.py file. Then add python subscriber and publisher nodes:

cd multithreaded_py_pkg
mkdir scripts
touch scripts/__init__.py
touch scripts/minimal_sub.py scripts/minimal_pub.py

Here is a simple publisher node (to paste in minimal_pub.py):

  • It publishes “Hello World: 1”, “Hello World: 2” and so on every half a second using a timer callback, incrementing the number by one each time.
  • Here we used the String() data type for simplicity but it could easily be an image from a camera
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node

from std_msgs.msg import String

class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello World: %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)
        self.i += 1

def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    minimal_publisher.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Here is a simple subscriber node (to paste in minimal_sub.py):

  • It subscribes to the messages sent by the publisher we created, but sleeps for 2 seconds in each callback to simulate processing the data inside the callback
  • Can guess as to what will happen when we launch the publisher and subscriber?
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node

from std_msgs.msg import String

import time

class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')

        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)
        time.sleep(2.0)
        

def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()
    
    rclpy.spin(minimal_subscriber)
    
    minimal_subscriber.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Configure the package and test

Modify setup.py

State the entry points:

entry_points={
        'console_scripts': [
            'minimal_pub_executable = scripts.minimal_pub:main',
            'minimal_sub_executable = scripts.minimal_sub:main'
        ],

So your setup.py should look something like this:

from setuptools import find_packages, setup

package_name = 'multithreaded_py_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='user',
    maintainer_email='user@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'minimal_pub_executable = scripts.minimal_pub:main',
            'minimal_sub_executable = scripts.minimal_sub:main'
        ],
    },
)

Compile the package

cd ~/ros2_ws/
colcon build --packages-select multithreaded_py_pkg

Run the node

In one terminal, start the subscriber:

source ~/ros2_ws/install/setup.bash
ros2 run multithreaded_py_pkg minimal_sub_executable

In another terminal, start the publisher:

source ~/ros2_ws/install/setup.bash
ros2 run multithreaded_py_pkg minimal_pub_executable

After awhile you should get something like this on the publisher’s side:

[INFO] [1711201052.213651723] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [1711201052.692100911] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [1711201053.192267050] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [1711201053.692179012] [minimal_publisher]: Publishing: "Hello World: 3"
[INFO] [1711201054.192910371] [minimal_publisher]: Publishing: "Hello World: 4"
[INFO] [1711201054.692213964] [minimal_publisher]: Publishing: "Hello World: 5"
[INFO] [1711201055.192285741] [minimal_publisher]: Publishing: "Hello World: 6"
[INFO] [1711201055.692159941] [minimal_publisher]: Publishing: "Hello World: 7"
[INFO] [1711201056.192324184] [minimal_publisher]: Publishing: "Hello World: 8"
[INFO] [1711201056.692108815] [minimal_publisher]: Publishing: "Hello World: 9"
[INFO] [1711201057.192262238] [minimal_publisher]: Publishing: "Hello World: 10"
[INFO] [1711201057.692280665] [minimal_publisher]: Publishing: "Hello World: 11"
[INFO] [1711201058.192159660] [minimal_publisher]: Publishing: "Hello World: 12"
[INFO] [1711201058.692317751] [minimal_publisher]: Publishing: "Hello World: 13"
[INFO] [1711201059.192241851] [minimal_publisher]: Publishing: "Hello World: 14"
[INFO] [1711201059.692214363] [minimal_publisher]: Publishing: "Hello World: 15"
[INFO] [1711201060.192314571] [minimal_publisher]: Publishing: "Hello World: 16"
[INFO] [1711201060.692367678] [minimal_publisher]: Publishing: "Hello World: 17"
[INFO] [1711201061.192353913] [minimal_publisher]: Publishing: "Hello World: 18"
[INFO] [1711201061.692429832] [minimal_publisher]: Publishing: "Hello World: 19"
[INFO] [1711201062.192177421] [minimal_publisher]: Publishing: "Hello World: 20"

And this on the subscriber’s side:

[INFO] [1711202061.202686531] [minimal_subscriber]: I heard: "Hello World: 0"
[INFO] [1711202063.206075512] [minimal_subscriber]: I heard: "Hello World: 1"
[INFO] [1711202065.209073951] [minimal_subscriber]: I heard: "Hello World: 2"
[INFO] [1711202067.212279267] [minimal_subscriber]: I heard: "Hello World: 3"
[INFO] [1711202069.213966244] [minimal_subscriber]: I heard: "Hello World: 7"
[INFO] [1711202071.216851195] [minimal_subscriber]: I heard: "Hello World: 11"
[INFO] [1711202073.219905422] [minimal_subscriber]: I heard: "Hello World: 12"

So what happened? Not only is the data delayed as we would expect, but because the data queue is overfilling, we lose data as well and not all published messages are shown on the subscriber’s side.

To fix this by way of multi-threading, we need to understand a little bit about Executors and Callback Groups.

Executors and Callback Groups

Executors

An Executor in ROS2 uses one or more threads of the underlying operating system to invoke callbacks on incoming messages and events:

  • A SingleThreadedExecutor executes callbacks in a single thread, one at a time, and thus the previous callback must always finish before a new one can begin execution.
  • A MultiThreadedExecutor, on the other hand, is capable of executing several callbacks simultaneously. You can create a configurable number of threads to allow for processing multiple messages or events in parallel.

ROS2 nodes invoke the SingleThreadedExecutor when using rclpy.spin. Thus if we want to execute multiple callbacks simultaneously, we will need to use the MultiThreadedExecutor.

With a MultiThreadedExecutor you can select how many threads you need, which is typically one per callback to guarantee that each one can be executed simultaneously.

If you want to know how many threads you can have in your system, you can try this in a Python interpreter:

python3
>>> import multiprocessing
>>> multiprocessing.cpu_count()

Callback groups

Topic subscribers, timers, service servers, action servers all have an argument where you can set the callback group their callbacks will be in.

ROS2 nodes have two types of callback groups:

  • ReentrantCallbackGroup: Callbacks of this group may be executed in parallel.
  • MutuallyExclusiveCallbackGroup: Callbacks of this group must not be executed in parallel.

If nothing is specified, their callbacks will be assigned to the default MutuallyExclusiveCallbackGroup from the node.

You can run different callbacks simultaneously, and execute different instances of the same callback simultaneously. In this tutorial, we will be doing the latter.

More information and examples about Executors and Callback Groups can be found on the ROS2 wiki.

Fixing the issue using multi-threading

Thus we need a MultiThreadedExecutor with the subscription callback in a ReentrantCallbackGroup.

Replace the code in minimal_sub.py with the following (the highlighted lines are areas of change or addition):

  • Note that we have not specified the number of threads e.g. executor = MultiThreadedExecutor(num_threads=4) so it defaults to the CPU count.
  • Note that the node is not managed directly by the rclpy.spin, but by the executor object.
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node

from std_msgs.msg import String

import time

from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup

class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')

        self.reentrant_callback_group = ReentrantCallbackGroup()

        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10,
            callback_group=self.reentrant_callback_group)
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)
        time.sleep(2.0)
        

def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()

    # Use MultiThreadedExecutor
    executor = MultiThreadedExecutor()
    executor.add_node(minimal_subscriber)
    
    try:
        executor.spin()
    except KeyboardInterrupt:
        minimal_subscriber.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Compile and run the subscriber then publisher.

You should get something like this on the publisher’s side:

[INFO] [1711208685.497376531] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [1711208685.979489812] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [1711208686.479769383] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [1711208686.979284800] [minimal_publisher]: Publishing: "Hello World: 3"
[INFO] [1711208687.479271344] [minimal_publisher]: Publishing: "Hello World: 4"
[INFO] [1711208687.979414332] [minimal_publisher]: Publishing: "Hello World: 5"
[INFO] [1711208688.479397837] [minimal_publisher]: Publishing: "Hello World: 6"
[INFO] [1711208688.979379973] [minimal_publisher]: Publishing: "Hello World: 7"
[INFO] [1711208689.479369576] [minimal_publisher]: Publishing: "Hello World: 8"
[INFO] [1711208689.979201123] [minimal_publisher]: Publishing: "Hello World: 9"
[INFO] [1711208690.479440648] [minimal_publisher]: Publishing: "Hello World: 10"
[INFO] [1711208690.979274539] [minimal_publisher]: Publishing: "Hello World: 11"
[INFO] [1711208691.479435731] [minimal_publisher]: Publishing: "Hello World: 12"
[INFO] [1711208691.979353466] [minimal_publisher]: Publishing: "Hello World: 13"

And this on the subscriber’s side:

[INFO] [1711208685.498698985] [minimal_subscriber]: I heard: "Hello World: 0"
[INFO] [1711208685.980588672] [minimal_subscriber]: I heard: "Hello World: 1"
[INFO] [1711208686.480816203] [minimal_subscriber]: I heard: "Hello World: 2"
[INFO] [1711208686.980613803] [minimal_subscriber]: I heard: "Hello World: 3"
[INFO] [1711208687.480707978] [minimal_subscriber]: I heard: "Hello World: 4"
[INFO] [1711208687.980819846] [minimal_subscriber]: I heard: "Hello World: 5"
[INFO] [1711208688.480733017] [minimal_subscriber]: I heard: "Hello World: 6"
[INFO] [1711208688.980382320] [minimal_subscriber]: I heard: "Hello World: 7"
[INFO] [1711208689.480969703] [minimal_subscriber]: I heard: "Hello World: 8"
[INFO] [1711208689.980889272] [minimal_subscriber]: I heard: "Hello World: 9"
[INFO] [1711208690.480743475] [minimal_subscriber]: I heard: "Hello World: 10"
[INFO] [1711208690.980555923] [minimal_subscriber]: I heard: "Hello World: 11"
[INFO] [1711208691.481019225] [minimal_subscriber]: I heard: "Hello World: 12"
[INFO] [1711208691.980809084] [minimal_subscriber]: I heard: "Hello World: 13"

This means that it is working successfully and you are running multiple instances of the subscriber callback in parallel!

Congratulations. You now have a basic understanding of callback.

To learn more advanced topics about ROS 2, have a look at the course below:

We hope this post was really helpful to you.

This tutorial is created by Robotics Ambassador Ernest.

Video Tutorial

Topics: callbacks | ros2
Masterclass 2023 batch2 blog banner

Check Out These Related Posts

129. ros2ai

129. ros2ai

I would like to dedicate this episode to all the ROS Developers who believe that ChatGPT or...

read more

0 Comments

Pin It on Pinterest

Share This