Speed Up Data Processing with Multi-Threaded Execution
What you will learn:
How to use multi-threading in ROS2 to execute multiple instances of a callback simultaneously to process received data in parallel and prevent the data queue from overfilling.
Prerequisite:
Basic understanding of publishers, subscribers, and callback functions.
If you want to learn more advanced ROS 2 topics, like callbacks, parameters, QoS in ROS2, DDS in ROS2 and more, in a practical, hands-on way, check out the course Intermediate ROS 2: https://app.theconstruct.ai/courses/113
Disclaimer
Before we start I’d like to clarify that this may not be the best way to handle the problem of when the rate of data received is faster than the time spent in the callback function as it depends on the specifics of your application. It is usually recommended to keep callbacks as fast and simple as possible and to feed your data to your classes rather than do all processing in the callback. However, the concepts in this tutorial are useful to know and worth understanding!
Opening the rosject
In order to follow this tutorial, we need to have ROS2 installed in our system, and ideally a ros2_ws (ROS2 Workspace). To make your life easier, we have already prepared a rosject for that: https://app.theconstruct.ai/l/6111a6de/
Just by copying the rosject (clicking the link above), you will have a setup already prepared for you.
After the rosject has been successfully copied to your own area, you should see a Run button. Just click that button to launch the rosject.
After pressing the Run button, you should have the rosject loaded. Now, let’s head to the next section to get some real practice.
In order to interact with ROS2, we need a terminal.
Let’s open a terminal by clicking the Open a new terminal button.
Create a ROS2 python package with a simple publisher and subscriber
cd ~/ros2_ws/src/ ros2 pkg create multithreaded_py_pkg --build-type ament_python --dependencies rclpy
Specify the directory as a Python package by creating an empty __init__.py
file. Then add python subscriber and publisher nodes:
cd multithreaded_py_pkg mkdir scripts touch scripts/__init__.py touch scripts/minimal_sub.py scripts/minimal_pub.py
Here is a simple publisher node (to paste in minimal_pub.py
):
- It publishes “
Hello World: 1
”, “Hello World: 2
” and so on every half a second using a timer callback, incrementing the number by one each time. - Here we used the
String()
data type for simplicity but it could easily be an image from a camera
#!/usr/bin/env python3 import rclpy from rclpy.node import Node from std_msgs.msg import String class MinimalPublisher(Node): def __init__(self): super().__init__('minimal_publisher') self.publisher_ = self.create_publisher(String, 'topic', 10) timer_period = 0.5 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) self.i = 0 def timer_callback(self): msg = String() msg.data = 'Hello World: %d' % self.i self.publisher_.publish(msg) self.get_logger().info('Publishing: "%s"' % msg.data) self.i += 1 def main(args=None): rclpy.init(args=args) minimal_publisher = MinimalPublisher() rclpy.spin(minimal_publisher) minimal_publisher.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
Here is a simple subscriber node (to paste in minimal_sub.py
):
- It subscribes to the messages sent by the publisher we created, but sleeps for 2 seconds in each callback to simulate processing the data inside the callback
- Can guess as to what will happen when we launch the publisher and subscriber?
#!/usr/bin/env python3 import rclpy from rclpy.node import Node from std_msgs.msg import String import time class MinimalSubscriber(Node): def __init__(self): super().__init__('minimal_subscriber') self.subscription = self.create_subscription( String, 'topic', self.listener_callback, 10) self.subscription # prevent unused variable warning def listener_callback(self, msg): self.get_logger().info('I heard: "%s"' % msg.data) time.sleep(2.0) def main(args=None): rclpy.init(args=args) minimal_subscriber = MinimalSubscriber() rclpy.spin(minimal_subscriber) minimal_subscriber.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
Configure the package and test
Modify setup.py
State the entry points:
entry_points={ 'console_scripts': [ 'minimal_pub_executable = scripts.minimal_pub:main', 'minimal_sub_executable = scripts.minimal_sub:main' ],
So your setup.py
should look something like this:
from setuptools import find_packages, setup package_name = 'multithreaded_py_pkg' setup( name=package_name, version='0.0.0', packages=find_packages(exclude=['test']), data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='user', maintainer_email='user@todo.todo', description='TODO: Package description', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ 'minimal_pub_executable = scripts.minimal_pub:main', 'minimal_sub_executable = scripts.minimal_sub:main' ], }, )
Compile the package
cd ~/ros2_ws/ colcon build --packages-select multithreaded_py_pkg
Run the node
In one terminal, start the subscriber:
source ~/ros2_ws/install/setup.bash ros2 run multithreaded_py_pkg minimal_sub_executable
In another terminal, start the publisher:
source ~/ros2_ws/install/setup.bash ros2 run multithreaded_py_pkg minimal_pub_executable
After awhile you should get something like this on the publisher’s side:
[INFO] [1711201052.213651723] [minimal_publisher]: Publishing: "Hello World: 0" [INFO] [1711201052.692100911] [minimal_publisher]: Publishing: "Hello World: 1" [INFO] [1711201053.192267050] [minimal_publisher]: Publishing: "Hello World: 2" [INFO] [1711201053.692179012] [minimal_publisher]: Publishing: "Hello World: 3" [INFO] [1711201054.192910371] [minimal_publisher]: Publishing: "Hello World: 4" [INFO] [1711201054.692213964] [minimal_publisher]: Publishing: "Hello World: 5" [INFO] [1711201055.192285741] [minimal_publisher]: Publishing: "Hello World: 6" [INFO] [1711201055.692159941] [minimal_publisher]: Publishing: "Hello World: 7" [INFO] [1711201056.192324184] [minimal_publisher]: Publishing: "Hello World: 8" [INFO] [1711201056.692108815] [minimal_publisher]: Publishing: "Hello World: 9" [INFO] [1711201057.192262238] [minimal_publisher]: Publishing: "Hello World: 10" [INFO] [1711201057.692280665] [minimal_publisher]: Publishing: "Hello World: 11" [INFO] [1711201058.192159660] [minimal_publisher]: Publishing: "Hello World: 12" [INFO] [1711201058.692317751] [minimal_publisher]: Publishing: "Hello World: 13" [INFO] [1711201059.192241851] [minimal_publisher]: Publishing: "Hello World: 14" [INFO] [1711201059.692214363] [minimal_publisher]: Publishing: "Hello World: 15" [INFO] [1711201060.192314571] [minimal_publisher]: Publishing: "Hello World: 16" [INFO] [1711201060.692367678] [minimal_publisher]: Publishing: "Hello World: 17" [INFO] [1711201061.192353913] [minimal_publisher]: Publishing: "Hello World: 18" [INFO] [1711201061.692429832] [minimal_publisher]: Publishing: "Hello World: 19" [INFO] [1711201062.192177421] [minimal_publisher]: Publishing: "Hello World: 20"
And this on the subscriber’s side:
[INFO] [1711202061.202686531] [minimal_subscriber]: I heard: "Hello World: 0" [INFO] [1711202063.206075512] [minimal_subscriber]: I heard: "Hello World: 1" [INFO] [1711202065.209073951] [minimal_subscriber]: I heard: "Hello World: 2" [INFO] [1711202067.212279267] [minimal_subscriber]: I heard: "Hello World: 3" [INFO] [1711202069.213966244] [minimal_subscriber]: I heard: "Hello World: 7" [INFO] [1711202071.216851195] [minimal_subscriber]: I heard: "Hello World: 11" [INFO] [1711202073.219905422] [minimal_subscriber]: I heard: "Hello World: 12"
So what happened? Not only is the data delayed as we would expect, but because the data queue is overfilling, we lose data as well and not all published messages are shown on the subscriber’s side.
To fix this by way of multi-threading, we need to understand a little bit about Executors and Callback Groups.
Executors and Callback Groups
Executors
An Executor
in ROS2 uses one or more threads of the underlying operating system to invoke callbacks on incoming messages and events:
- A
SingleThreadedExecutor
executes callbacks in a single thread, one at a time, and thus the previous callback must always finish before a new one can begin execution. - A
MultiThreadedExecutor
, on the other hand, is capable of executing several callbacks simultaneously. You can create a configurable number of threads to allow for processing multiple messages or events in parallel.
ROS2 nodes invoke the SingleThreadedExecutor
when using rclpy.spin
. Thus if we want to execute multiple callbacks simultaneously, we will need to use the MultiThreadedExecutor
.
With a MultiThreadedExecutor
you can select how many threads you need, which is typically one per callback to guarantee that each one can be executed simultaneously.
If you want to know how many threads you can have in your system, you can try this in a Python interpreter:
python3 >>> import multiprocessing >>> multiprocessing.cpu_count()
Callback groups
Topic subscribers, timers, service servers, action servers all have an argument where you can set the callback group their callbacks will be in.
ROS2 nodes have two types of callback groups:
ReentrantCallbackGroup
: Callbacks of this group may be executed in parallel.MutuallyExclusiveCallbackGroup
: Callbacks of this group must not be executed in parallel.
If nothing is specified, their callbacks will be assigned to the default MutuallyExclusiveCallbackGroup
from the node.
You can run different callbacks simultaneously, and execute different instances of the same callback simultaneously. In this tutorial, we will be doing the latter.
More information and examples about Executors and Callback Groups can be found on the ROS2 wiki.
Fixing the issue using multi-threading
Thus we need a MultiThreadedExecutor
with the subscription callback in a ReentrantCallbackGroup
.
Replace the code in minimal_sub.py
with the following (the highlighted lines are areas of change or addition):
- Note that we have not specified the number of threads e.g.
executor = MultiThreadedExecutor(num_threads=4)
so it defaults to the CPU count. - Note that the node is not managed directly by the
rclpy.spin
, but by theexecutor
object.
#!/usr/bin/env python3 import rclpy from rclpy.node import Node from std_msgs.msg import String import time from rclpy.executors import MultiThreadedExecutor from rclpy.callback_groups import ReentrantCallbackGroup class MinimalSubscriber(Node): def __init__(self): super().__init__('minimal_subscriber') self.reentrant_callback_group = ReentrantCallbackGroup() self.subscription = self.create_subscription( String, 'topic', self.listener_callback, 10, callback_group=self.reentrant_callback_group) self.subscription # prevent unused variable warning def listener_callback(self, msg): self.get_logger().info('I heard: "%s"' % msg.data) time.sleep(2.0) def main(args=None): rclpy.init(args=args) minimal_subscriber = MinimalSubscriber() # Use MultiThreadedExecutor executor = MultiThreadedExecutor() executor.add_node(minimal_subscriber) try: executor.spin() except KeyboardInterrupt: minimal_subscriber.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
Compile and run the subscriber then publisher.
You should get something like this on the publisher’s side:
[INFO] [1711208685.497376531] [minimal_publisher]: Publishing: "Hello World: 0" [INFO] [1711208685.979489812] [minimal_publisher]: Publishing: "Hello World: 1" [INFO] [1711208686.479769383] [minimal_publisher]: Publishing: "Hello World: 2" [INFO] [1711208686.979284800] [minimal_publisher]: Publishing: "Hello World: 3" [INFO] [1711208687.479271344] [minimal_publisher]: Publishing: "Hello World: 4" [INFO] [1711208687.979414332] [minimal_publisher]: Publishing: "Hello World: 5" [INFO] [1711208688.479397837] [minimal_publisher]: Publishing: "Hello World: 6" [INFO] [1711208688.979379973] [minimal_publisher]: Publishing: "Hello World: 7" [INFO] [1711208689.479369576] [minimal_publisher]: Publishing: "Hello World: 8" [INFO] [1711208689.979201123] [minimal_publisher]: Publishing: "Hello World: 9" [INFO] [1711208690.479440648] [minimal_publisher]: Publishing: "Hello World: 10" [INFO] [1711208690.979274539] [minimal_publisher]: Publishing: "Hello World: 11" [INFO] [1711208691.479435731] [minimal_publisher]: Publishing: "Hello World: 12" [INFO] [1711208691.979353466] [minimal_publisher]: Publishing: "Hello World: 13"
And this on the subscriber’s side:
[INFO] [1711208685.498698985] [minimal_subscriber]: I heard: "Hello World: 0" [INFO] [1711208685.980588672] [minimal_subscriber]: I heard: "Hello World: 1" [INFO] [1711208686.480816203] [minimal_subscriber]: I heard: "Hello World: 2" [INFO] [1711208686.980613803] [minimal_subscriber]: I heard: "Hello World: 3" [INFO] [1711208687.480707978] [minimal_subscriber]: I heard: "Hello World: 4" [INFO] [1711208687.980819846] [minimal_subscriber]: I heard: "Hello World: 5" [INFO] [1711208688.480733017] [minimal_subscriber]: I heard: "Hello World: 6" [INFO] [1711208688.980382320] [minimal_subscriber]: I heard: "Hello World: 7" [INFO] [1711208689.480969703] [minimal_subscriber]: I heard: "Hello World: 8" [INFO] [1711208689.980889272] [minimal_subscriber]: I heard: "Hello World: 9" [INFO] [1711208690.480743475] [minimal_subscriber]: I heard: "Hello World: 10" [INFO] [1711208690.980555923] [minimal_subscriber]: I heard: "Hello World: 11" [INFO] [1711208691.481019225] [minimal_subscriber]: I heard: "Hello World: 12" [INFO] [1711208691.980809084] [minimal_subscriber]: I heard: "Hello World: 13"
This means that it is working successfully and you are running multiple instances of the subscriber callback in parallel!
Congratulations. You now have a basic understanding of callback.
To learn more advanced topics about ROS 2, have a look at the course below:
- Intermediate ROS 2: https://app.theconstruct.ai/courses/113
We hope this post was really helpful to you.
This tutorial is created by Robotics Ambassador Ernest.