Hướng dẫn tạo một Service Client – Vietnamese ROS2 Tutorial

Hướng dẫn tạo một Service Client – Vietnamese ROS2 Tutorial

This tutorial is created by Robotics Ambassador – Tung

Robotics Ambassador: theconstruct.ai/robotics-ambassador/

Xin chào các bạn!

Hôm nay mình sẽ hướng dẫn các bạn cách tạo một Khách hàng Dịch vụ (Service Client) trong ROS2 sử dụng nền tảng của The Construct.

Nguồn tham khảo

  1. Nền tảng học ROS trực tuyến The Construct
  2. Khóa học ROS2 cơ bản (Python) ROS2 Basics in 5 Days

Trước khi đi vào nội dung chính, nếu bạn nào chưa biết Service trong ROS2 là gì thì mình khuyên các bạn nên ghé qua video mình hướng dẫn các câu lệnh cơ bản với Dịch vụ trong ROS2 trước ở link này nhé:

1. Tạo một package

Đầu tiên, chúng ta sẽ tạo một package chứa code của Client cần tạo.

Tạo một package mới tên là client_pkg trong thư mục ~/ros2_ws/src bằng các lệnh sau:

cd ~/ros2_ws/src
ros2 pkg create client_pkg --build-type ament_python --dependencies rclpy std_srvs

Chúng ta sẽ cần package std_srvs để giao tiếp với các Dịch vụ /moving/stop như trong video trước mình đã hướng dẫn.

2. Tạo một tệp chứa nội dung code của Client

Tiếp theo, trong thư mục client_pkg của package vừa tạo, ta sẽ tạo một tệp mới tên là service_client.py chứa phần code chính của Service Client mà chúng ta muốn tạo. Các bạn hãy copy nội dung phần code sau vào trong file service_client.py vừa tạo:

# import the empty module from std_servs Service interface
from std_srvs.srv import Empty
# import the ROS2 Python client libraries
import rclpy
from rclpy.node import Node

class ClientAsync(Node):

    def __init__(self):
        # Here you have the class constructor

        # call the class constructor to initialize the node as service_client
        super().__init__('service_client')
        # create the Service Client object
        # defines the name and type of the Service Server you will work with.
        self.client = self.create_client(Empty, 'moving')
        # checks once per second if a Service matching the type and name of the Client is available.
        while not self.client.wait_for_service(timeout_sec=1.0):
            # if it is not available, a message is displayed
            self.get_logger().info('service not available, waiting again...')
        
        # create an Empty request
        self.req = Empty.Request()
        

    def send_request(self):
        
        # send the request
        self.future = self.client.call_async(self.req)

def main(args=None):
    # initialize the ROS communication
    rclpy.init(args=args)
    # declare the node constructor
    client = ClientAsync()
    # run the send_request() method
    client.send_request()

    while rclpy.ok():
        # pause the program execution, waits for a request to kill the node (ctrl+c)
        rclpy.spin_once(client)
        if client.future.done():
            try:
                # checks the future for a response from the Service
                # while the system is running. 
                # If the Service has sent a response, the result will be written
                # to a log message.
                response = client.future.result()
            except Exception as e:
                # Display the message on the console
                client.get_logger().info(
                    'Service call failed %r' % (e,))
            else:
                # Display the message on the console
                client.get_logger().info(
                    'the robot is moving' ) 
            break

    client.destroy_node()
    # shutdown the ROS communication
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Mình sẽ giải thích qua đoạn code này nhé. Phần code trong file này cũng đi theo cấu trúc chung của một node trong ROS2. Ở trong hàm main(), sau khi khởi tạo ROS2, chương trình sẽ khai báo object ClientAsync() mà chúng ta đã định nghĩa ở phần trên. Trong phần constructor của class ClientAsync(), lệnh để tạo Client là:

self.client = self.create_client(Empty, 'moving')

Lệnh này sẽ tạo một Client sử dụng kiểu dữ liệu Empty và kết nối tới một Service tên là /moving.

Tiếp theo, vòng lặp while được sử dụng để đảm bảo rằng Service Server (/moving ) đang hoạt động.

while not self.client.wait_for_service(timeout_sec=1.0):

Quay trở lại với hàm main(). Sau khi khởi tạo xong, yêu cầu sẽ được gửi tới server bằng hàmsend_request(). Hàm này rất quan trọng và có nội dung là:

self.future = self.client.call_async(self.req)

Câu lệnh này sẽ gửi một yêu cầu không đồng bộ (asynchronous request) tới Service Server bằng hàm call_async(). Sau đó, lưu phản hồi từ Server về biến self.future. Biến này cũng rất quan trọng. Sau khi tạo yêu cầu, Server sẽ ngay lập tức trả về future cho biết việc gọi và phản hồi được thực hiện (nhưng nó không chứa nội dung của phản hồi).

Cuối cùng, trong hàm main() là vòng lặp while:

while rclpy.ok():
        # pause the program execution, waits for a request to kill the node (ctrl+c)
        rclpy.spin_once(client)
        if client.future.done():
            try:
                # checks the future for a response from the Service
                # while the system is running. 
                # If the Service has sent a response, the result will be written
                # to a log message.
                response = client.future.result()
            except Exception as e:
                # Display the message on the console
                client.get_logger().info(
                    'Service call failed %r' % (e,))
            else:
                # Display the message on the console
                client.get_logger().info(
                    'the robot is moving' ) 
            break

Khi chương trình đang chạy (rclpy.ok()), nó sẽ liên tục kiểm tra xem phản hồi từ dịch vụ đã hoàn thành hay chưa:

if client.future.done():

Nếu nó đã hoàn thành thì chương trình sẽ nhận được giá trị của phản hồi từ Server:

response = client.future.result()

Trong trường hợp này, vì chúng ta đang làm việc với một phản hồi kiểu Empty nên chúng ta sẽ không làm gì cả.

Trong khi đợi phản hồi hoàn thành của Dịch vụ, chương trình sẽ in ra các thông tin vào nhật ký node bằng lệnh:

client.get_logger().info('the robot is moving' ) 

Và cuối cùng phải kể đến dòng lệnh sau:

rclpy.spin_once(client)

Dòng lệnh này tương tự như lệnh rclpy.spin() nhưng chỉ quay vòng node đó một lần.

3. Tạo một tệp launch

Tiếp theo, chúng ta sẽ tạo một file launch để khởi tạo client này.

Tạo một file launch tên là service_client_launch_file.launch.py bằng các lệnh sau:

cd ~/ros2_ws/src/client_pkg
mkdir launch
cd launch
touch service_client_launch_file.launch.py
chmod +x service_client_launch_file.launch.py

Mở file launch vừa tạo và thêm những dòng sau vào:

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='client_pkg',
            executable='service_client',
            output='screen'),
    ])

4. Sửa tệp setup.py

Sau khi tạo xong file launch, chúng ta sẽ thêm một số dòng code vào file setup.py để cài đặt tệp launch mà chúng ta vừa mới tạo và thêm entry points vào executable cho script service_client.py

from setuptools import setup
import os
from glob import glob

package_name = 'client_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='user',
    maintainer_email='user@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
        'service_client = client_pkg.service_client:main',
        ],
    },
)

5. Biên dịch gói vừa tạo

Sau khi code xong, chúng ta sẽ đến với bước biên dịch chương trình. Ta thực hiện các lệnh sau:

cd ~/ros2_ws
colcon build --packages-select client_pkg
source ~/ros2_ws/install/setup.bash

6. Cuối cùng khởi động node Service Server trên Terminal

Node Client vừa tạo được khởi động bằng lệnh sau:

ros2 launch client_pkg service_client_launch_file.launch.py

Sau khi dòng code này chạy, chúng ta sẽ thấy robot bắt đầu di chuyển ngẫu nhiên. Điều này chứng tỏ Service Server /moving đã nhận được yêu cầu từ Client và thực hiện yêu cầu đó.

Chúc các bạn thực hành thành công.

Video Tutorial


Criação de Serviços Personalizados no ROS 2 e Controle de Movimento do Robô em Python – Portuguese ROS Tutorial

Criação de Serviços Personalizados no ROS 2 e Controle de Movimento do Robô em Python – Portuguese ROS Tutorial

This tutorial is created by Robotics Ambassador – Anderson

Robotics Ambassador: theconstruct.ai/robotics-ambassador/

 

O que nós vamos aprender:

  1. Criação de um serviço personalizado no ROS2.
  2. Configuração do ambiente ROS2 para garantir que todas as dependências estão corretamente instaladas e atualizadas.
  3. Criação de um pacote ROS2 incluindo a estrutura de diretórios e os arquivos necessários para o projeto.
  4. Implementação de um servidor e cliente de serviço para controle de movimento do robô.
  5. Criação e edição de arquivos de lançamento (launch files) no ROS2.
  6. Compilação e execução dos pacotes ROS2.
  7. Teste e verificação do serviço para controlar o movimento do robô em diferentes direções.

 

Lista de recursos usados neste post:

  1. Use este rosject: https://app.theconstruct.ai/l/639df241/
  2. The Construct: https://app.theconstructsim.com/
  3. ROS Courses: ROS Basics in 5 Days (C++) https://app.theconstructsim.com/courses/132

 

Panorama

Neste tutorial, vamos aprender a criar serviços personalizados no ROS2 e desenvolver um pacote de controle de movimento para um robô. Vamos passar pelos passos de criação e configuração de serviços, desenvolvimento de um pacote de movimento, implementação do servidor e cliente de serviço, e a criação de arquivos de lançamento. Este tutorial é ideal para quem deseja aprimorar suas habilidades em ROS2 e programação robótica.

 

Abrindo o rosject

Clique no link abaixo para ter acesso ao rosject deste tutorial. Em seguida, clique no botão “RUN”.

https://app.theconstruct.ai/l/639df241/

<figure1>

Criando o Serviço Personalizado

Criação do Pacote de Interfaces Personalizadas

Primeiro, vamos criar um pacote de interfaces personalizadas que conterá nosso serviço. Este pacote será responsável por definir e gerenciar a interface do serviço que iremos usar para controlar o robô.

Copie este bloco de comandos no primeiro terminal:

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_cmake custom_interfaces --dependencies rclcpp std_msgs

Esses comandos criam um novo pacote chamado custom_interfaces com as dependências rclcpp e std_msgs.

Definição do Serviço

Em seguida, vamos definir o serviço que será utilizado para controlar o robô. Para isso, crie uma pasta chamada srv dentro do pacote custom_interfaces e um arquivo Move.srv. Use os comandos:

cd ~/ros2_ws/src/custom_interfaces
mkdir srv
cd srv
touch Move.srv

Edite o arquivo Move.srv com o seguinte conteúdo:

string direction               # Direção para girar (direita ou esquerda)
float64 velocity               # Velocidade angular (em rad/s) ou linear (em m/s)
int32 time                     # Duração do giro (em segundos)
---
bool success                   # O comando foi bem-sucedido?

Esse arquivo define a estrutura do serviço. Ele recebe a direção, velocidade e tempo como entrada, e retorna um booleano indicando se a operação foi bem-sucedida.

Atualizando os Arquivos CMakeLists.txt e package.xml

Agora, precisamos configurar os arquivos CMakeLists.txt e package.xml para que o ROS2 possa construir e utilizar nosso serviço.

No arquivo CMakeLists.txt, adicione:

find_package(rosidl_default_generators REQUIRED)
.
.
.
rosidl_generate_interfaces(${PROJECT_NAME}
"srv/Move.srv"
)

Essas linhas garantem que o gerador de interfaces do ROS2 processe nosso arquivo Move.srv.

No arquivo package.xml, adicione:

<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

Essas dependências são necessárias para a construção e execução do nosso serviço.

Compilação e Verificação

Agora, compile o pacote e verifique se o serviço foi criado corretamente.

cd ~/ros2_ws
colcon build --packages-select custom_interfaces
source ~/ros2_ws/install/setup.bash

Para verificar se a mensagem de serviço foi criada:

ros2 interface show custom_interfaces/srv/Move

Criando o Pacote de Movimento

Vamos criar um pacote que implementará a lógica de movimento do robô, incluindo um servidor e um cliente de serviço.

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python movement_pkg --dependencies rclpy custom_interfaces std_msgs geometry_msgs sensor_msgs

Este comando cria um pacote chamado movement_pkg com as dependências necessárias.

Crie os arquivos necessários com os comandos:
cd ~/ros2_ws/src/movement_pkg/movement_pkg
touch movement_server.py movement_client.py

Implementação do Servidor de Serviço

Crie e edite o arquivo movement_server.py com o seguinte conteúdo:


from geometry_msgs.msg import Twist
from custom_interfaces.srv import Move
import rclpy
from rclpy.node import Node
import time

class Service(Node):
    def __init__(self):
        super().__init__('move_server')
        self.srv = self.create_service(Move, 'move', self.custom_service_callback)
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)

    def custom_service_callback(self, request, response):
        msg = Twist()
        if request.direction == "right":
            msg.angular.z = -request.velocity
        elif request.direction == "left":
            msg.angular.z = request.velocity
        elif request.direction == "forward":
            msg.linear.x = request.velocity
        elif request.direction == "backward":
            msg.linear.x = -request.velocity
        elif request.direction == "stop":
            msg.linear.x = 0.0
            msg.angular.z = 0.0
            self.publisher_.publish(msg)
            response.success = True
            return response
        else:
            response.success = False
            return response

        self.publisher_.publish(msg)
        time.sleep(request.time)
        msg.linear.x = 0.0
        msg.angular.z = 0.0
        self.publisher_.publish(msg)
        response.success = True
        return response

def main(args=None):
    rclpy.init(args=args)
    service = Service()
    rclpy.spin(service)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Explicação do código:

  • Importamos as bibliotecas necessárias.
  • Definimos a classe Service, que herda de Node.
  • No método __init__, criamos o serviço e o publicador.
  • O método custom_service_callback define a lógica de controle do robô com base nos parâmetros recebidos.
  • No final, iniciamos e rodamos o nó ROS2.

Implementação do Cliente de Serviço

Crie e edite o arquivo movement_client.py com o seguinte conteúdo:


from custom_interfaces.srv import Move
import rclpy
from rclpy.node import Node
import sys

class ClientAsync(Node):
    def __init__(self):
        super().__init__('move_client')
        self.client = self.create_client(Move, 'move')
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        self.req = Move.Request()

    def send_request(self):
        self.req.direction = str(sys.argv[1])
        self.req.velocity = float(sys.argv[2])
        self.req.time = int(sys.argv[3])
        self.future = self.client.call_async(self.req)

def main(args=None):
    rclpy.init(args=args)
    client = ClientAsync()
    client.send_request()

    while rclpy.ok():
        rclpy.spin_once(client)
        if client.future.done():
            try:
                response = client.future.result()
            except Exception as e:
                client.get_logger().info('Service call failed %r' % (e,))
            else:
                client.get_logger().info('Response state %r' % (response.success,))
            break

    client.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Explicação do código:

  • Importamos as bibliotecas necessárias.
  • Definimos a classe ClientAsync, que herda de Node.
  • No método __init__, criamos o cliente de serviço e esperamos que o serviço esteja disponível.
  • O método send_request envia a solicitação ao serviço com os parâmetros fornecidos.
  • No final, iniciamos e rodamos o nó ROS2.

Criando o Arquivo de Lançamento

cd ~/ros2_ws/src/movement_pkg
mkdir launch
cd launch
touch movement_server_launch_file.launch.py
chmod +x movement_server_launch_file.launch.py

Edite o arquivo movement_server_launch_file.launch.py com o seguinte conteúdo:


from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='movement_pkg',
            executable='movement_server',
            output='screen'),
    ])

Este arquivo de lançamento define um nó que executa o servidor de movimento.

Então sua estrutura de pastas e arquivos deve ficar assim:

Modificação do arquivo setup.py

Atualize o arquivo setup.py com o seguinte conteúdo:


from setuptools import setup
import os
from glob import glob

package_name = 'movement_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='user',
    maintainer_email='user@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
        'movement_server = movement_pkg.movement_server:main',
        'movement_client = movement_pkg.movement_client:main'
        ],
    },
)

Este arquivo define a configuração do pacote, incluindo as entradas do console que apontam para nossos scripts Python.

Compilação e Execução

Compile o pacote movement_pkg:

cd ~/ros2_ws
colcon build --packages-select movement_pkg
source ~/ros2_ws/install/setup.bash

Lançando o Servidor

No primeiro terminal, execute:

ros2 launch movement_pkg movement_server_launch_file.launch.py

Executando o Cliente

No segundo terminal, execute:

source ~/ros2_ws/install/setup.bash
ros2 run movement_pkg movement_client left 0.2 5

No segundo terminal você verá:

Enquanto no primeiro terminal aparecerá:

Isso indica que o serviço foi realizado corretamente.

Na janela do Gazebo, você pode ver o robô se movimentando pelo ambiente.

<figure7>

Outras opções, além de left, são right, forward, backward. Experimente mudar os parametros do comando anterior variando também a velocidade e a duraçao do movimento do robô (respectivamente segundo e terceiro parametros do comando).

Conclusão

Esperamos que este tutorial tenha sido útil e que você tenha aprendido como criar serviços personalizados no ROS2 e controlar o movimento de um robô. Se você gostou deste conteúdo, considere explorar mais recursos do ROS2 e experimentar novas possibilidades para expandir suas habilidades em robótica.

 Vídeo no YouTube

Este foi o post de hoje. Lembre-se que nós temos uma versão deste post em vídeo no YouTube.

Se você gostou do conteúdo, por favor, considere se inscrever no nosso canal do YouTube. Estamos publicando novos conteúdos quase todos os dias.

Continue avançando no seu aprendizado de ROS.

 

How to set/get parameters from another node | ROS 2 Humble Python Tutorial

How to set/get parameters from another node | ROS 2 Humble Python Tutorial

What you will learn:

Parameters in ROS 2 are associated with individual nodes, it is not possible to work with another node’s parameters simply by using the Node API. For example, a node calling get_parameter() can only get its own parameters.

In this tutorial, you’ll learn how to get and set another node’s parameters by using a service call in code. All other parameter services should be straightforward as they are similar.

If you want to learn more advanced ROS 2 topics, including parameters and more, in a practical, hands-on way, check out the course Intermediate ROS 2: https://app.theconstruct.ai/courses/113

Opening the rosject

In order to follow this tutorial, we need to have ROS 2 installed in our system, and ideally a ros2_ws (ROS2 Workspace). To make your life easier, we have already prepared a rosject for that: https://app.theconstruct.ai/l/639b9a55/

Just by copying the rosject (clicking the link above), you will have a setup already prepared for you.

After the rosject has been successfully copied to your own area, you should see a Run button. Just click that button to launch the rosject.

After pressing the Run button, you should have the rosject loaded. Now, let’s head to the next section to get some real practice.

In order to interact with ROS2, we need a terminal.

Let’s open a terminal by clicking the Open a new terminal button.

Open a new Terminal

Create a ROS2 python package with a simple node that has two parameters

In a terminal, create a python package called python_parameters:

cd ~/ros2_ws/src/
ros2 pkg create --build-type ament_python python_parameters --dependencies rclpy

In the same terminal, create a python node called python_parameters_node.py:

touch python_parameters/python_parameters/python_parameters_node.py

Here is a simple node with two parameters of type string (to paste in python_parameters_node.py):

  • The parameters are:
    • my_parameter with value me and description This parameter is mine!
    • your_parameter with value you and description This parameter is yours!

    You can also have other types as listed in the documentation.

  • The node has a timer callback with a period of 1 second just to log what the current parameters are.
import rclpy
import rclpy.node
from rcl_interfaces.msg import ParameterDescriptor

class MinimalParam(rclpy.node.Node):
    def __init__(self):
        super().__init__('minimal_param_node')
        
        my_parameter_descriptor = ParameterDescriptor(description='This parameter is mine!')
        self.declare_parameter('my_parameter', 'me', my_parameter_descriptor)

        your_parameter_descriptor = ParameterDescriptor(description='This parameter is yours!')
        self.declare_parameter('your_parameter', 'you', your_parameter_descriptor)

        self.timer = self.create_timer(1, self.timer_callback)

    def timer_callback(self):
        my_param = self.get_parameter('my_parameter').get_parameter_value().string_value

        self.get_logger().info('I am %s!' % my_param)

        your_param = self.get_parameter('your_parameter').get_parameter_value().string_value

        self.get_logger().info('You are %s!' % your_param)

def main():
    rclpy.init()
    node = MinimalParam()
    rclpy.spin(node)

if __name__ == '__main__':
    main()

Add an entry point to the setup.py file so that it looks like this:

    entry_points={
        'console_scripts': [
            'minimal_param_node = python_parameters.python_parameters_node:main',
        ],
    },

In a terminal, compile the package:

cd ~/ros2_ws/
colcon build --packages-select python_parameters

Then start the node:

source ~/ros2_ws/install/setup.bash
ros2 run python_parameters minimal_param_node

You should get something like this:

[INFO] [1713695285.349594469] [minimal_param_node]: I am me!
[INFO] [1713695285.349875641] [minimal_param_node]: You are you!
[INFO] [1713695286.337758113] [minimal_param_node]: I am me!
[INFO] [1713695286.338776447] [minimal_param_node]: You are you!
[INFO] [1713695287.337323765] [minimal_param_node]: I am me!
[INFO] [1713695287.338010397] [minimal_param_node]: You are you!

Create a simple client node that retrieves another node’s parameters

The goal is to get the parameter values of the node that we just created in the previous section (python_parameters_node.py) from within a new client node.

Create a new Python file for the client node called get_parameters_client_node.py:

touch ~/ros2_ws/src/python_parameters/python_parameters/get_parameters_client_node.py

Here is a simple asynchronous client (to paste in get_parameters_client_node.py):

  • It creates a client with service type GetParameters and service name /minimal_param_node/get_parameters
  • The request message needs to be a list of names of parameters whose values we want to get. In this case, we chose to retrieve the values of both my_parameter and your_parameter
    • Remember that we can use the terminal command ros2 interface show rcl_interfaces/srv/GetParameters to determine the request (and response) fields
  • The send_request method is passed the request message and handles the service call
import rclpy
from rclpy.node import Node
from rcl_interfaces.srv import GetParameters

class MinimalGetParamClientAsync(Node):

    def __init__(self):
        super().__init__('minimal_get_param_client_async')

        self.cli = self.create_client(GetParameters, '/minimal_param_node/get_parameters')

        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        
        self.req = GetParameters.Request()

    def send_request(self, params_name_list):
        self.req.names = params_name_list

        self.future = self.cli.call_async(self.req)
        rclpy.spin_until_future_complete(self, self.future)
        return self.future.result()

def main():
    rclpy.init()

    minimal_get_param_client = MinimalGetParamClientAsync()

    list_of_params_to_get = ['my_parameter', 'your_parameter']
    response = minimal_get_param_client.send_request(list_of_params_to_get)
    minimal_get_param_client.get_logger().info('First value: %s, Second value: %s'  % (response.values[0].string_value, response.values[1].string_value))

    minimal_get_param_client.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Add the entry point to the setup.py file so that it looks like this:

...

    entry_points={
        'console_scripts': [
            'minimal_param_node = python_parameters.python_parameters_node:main',
            'minimal_get_param_client_async = python_parameters.get_parameters_client_node:main',
        ],
    },
)

Compile the package:

cd ~/ros2_ws/
colcon build --packages-select python_parameters

Run the client node in one terminal:

source ~/ros2_ws/install/setup.bash
ros2 run python_parameters minimal_get_param_client_async

And the service node in a different terminal:

source ~/ros2_ws/install/setup.bash
ros2 run python_parameters minimal_param_node

You should get something like this in the terminal of the client:

[INFO] [1719039475.331557220] [minimal_get_param_client_async]: service not available, waiting again...
[INFO] [1719039476.333952497] [minimal_get_param_client_async]: service not available, waiting again...
[INFO] [1719039477.336702012] [minimal_get_param_client_async]: service not available, waiting again...
[INFO] [1719039478.338849982] [minimal_get_param_client_async]: service not available, waiting again...
[INFO] [1719039478.591154332] [minimal_get_param_client_async]: First value: me, Second value: you
            
...

Since the client was started first, it was initially waiting for the GetParameters service to be available. After the service started, the client successfully retrieved the parameter values before shutting down.

End all terminal processes before continuing.

Create a simple client node that modifies another node’s parameters

The process here is similar to the previous section, only this time we are setting the parameter values.

Create a new Python file for the client node called set_parameters_client_node.py:

touch ~/ros2_ws/src/python_parameters/python_parameters/set_parameters_client_node.py

Here is a simple asynchronous client (to paste in set_parameters_client_node.py):

  • It creates a client with service type SetParameters and service name /minimal_param_node/set_parameters
  • The request message needs to be a list of the Parameter msg type which has sub-fields to store the parameter name, data type and value
    • Remember that we can use the terminal command ros2 interface show rcl_interfaces/srv/SetParameters to determine the request (and response) fields
  • The send_request method is passed the new parameter values to set and handles the service call
import rclpy
from rclpy.node import Node
from rcl_interfaces.srv import SetParameters
from rcl_interfaces.msg import Parameter, ParameterType

class MinimalSetParamClientAsync(Node):

    def __init__(self):
        super().__init__('minimal_set_param_client_async')

        self.cli = self.create_client(SetParameters, '/minimal_param_node/set_parameters')

        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        
        self.req = SetParameters.Request()

    def send_request(self, my_parameter_value, your_parameter_value):
        param = Parameter()
        param.name = "my_parameter"
        param.value.type = ParameterType.PARAMETER_STRING
        param.value.string_value = my_parameter_value
        self.req.parameters.append(param)

        param = Parameter()
        param.name = "your_parameter"
        param.value.type = ParameterType.PARAMETER_STRING
        param.value.string_value = your_parameter_value
        self.req.parameters.append(param)

        self.future = self.cli.call_async(self.req)
        rclpy.spin_until_future_complete(self, self.future)
        return self.future.result()

def main():
    rclpy.init()

    minimal_set_param_client = MinimalSetParamClientAsync()

    my_parameter_value = 'a teacher'
    your_parameter_value = 'a student'
    response = minimal_set_param_client.send_request(my_parameter_value, your_parameter_value)
    minimal_set_param_client.get_logger().info('Results: %s'  % (response))

    minimal_set_param_client.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Add the entry point to the setup.py file so that it looks like this:

...

    entry_points={
        'console_scripts': [
            'minimal_param_node = python_parameters.python_parameters_node:main',
            'minimal_get_param_client_async = python_parameters.get_parameters_client_node:main',
            'minimal_set_param_client_async = python_parameters.set_parameters_client_node:main',
        ],
    },
)

Compile the package:

cd ~/ros2_ws/
colcon build --packages-select python_parameters

Run the service node first:

source ~/ros2_ws/install/setup.bash
ros2 run python_parameters minimal_param_node

Then the client node in another terminal:

source ~/ros2_ws/install/setup.bash
ros2 run python_parameters minimal_set_param_client_async

You should get something like this in the terminal of the client:

[INFO] [1719045088.071267960] [minimal_set_param_client_async]: Results: rcl_interfaces.srv.SetParameters_Response(results=[rcl_interfaces.msg.SetParametersResult(successful=True, reason=''), rcl_interfaces.msg.SetParametersResult(successful=True, reason='')])

And this in the terminal of the service node:

[INFO] [1719045085.623969364] [minimal_param_node]: You are you!
[INFO] [1719045086.613542065] [minimal_param_node]: I am me!
[INFO] [1719045086.614106714] [minimal_param_node]: You are you!
[INFO] [1719045087.613402451] [minimal_param_node]: I am me!
[INFO] [1719045087.613898409] [minimal_param_node]: You are you!
[INFO] [1719045088.613406942] [minimal_param_node]: I am a teacher!
[INFO] [1719045088.613964772] [minimal_param_node]: You are a student!
[INFO] [1719045088.613406942] [minimal_param_node]: I am a teacher!
[INFO] [1719045088.613964772] [minimal_param_node]: You are a student!
    
...

Just like in my previous tutorial (How to manipulate multiple parameters at once from the command line using parameter services), you managed to set multiple parameters at once- only this time it was from within another node! This opens up the possibility of scripted interaction with external parameters rather than being restricted to just the CLI!

Now you know how to interact with parameter services in code.

Congratulations. You now have a basic understanding of parameters and are familiar with creating and using services.

To learn more advanced topics about ROS 2, have a look at the course below:

We hope this post was really helpful to you.

This tutorial is created by Robotics Ambassador Ernest.

Video Tutorial

 

Combine Publisher, Subscriber & Service in ROS2 Single Node | ROS2 Tutorial

Combine Publisher, Subscriber & Service in ROS2 Single Node | ROS2 Tutorial

What we are going to learn

  1. Learn how to combine a Publisher, a Subscriber, and a Service in the same ROS2 Node

List of resources used in this post

  1. Use this rosject: https://app.theconstructsim.com/l/5c13606c/
  2. The Construct: https://app.theconstructsim.com/
  3. ROS2 Courses –▸
    1. ROS2 Basics in 5 Days (Python): https://app.theconstructsim.com/Course/132
    2. ROS2 Basics in 5 Days (C++): https://app.theconstructsim.com/Course/133
  4. Apple Detector: https://shrishailsgajbhar.github.io/post/OpenCV-Apple-detection-counting
  5. Banana Detector: https://github.com/noorkhokhar99/Open-CV-Banana-Detection

Overview

ROS2 (Robot Operating System version 2) is becoming the de facto standard “framework” for programming robots.

In this post, we are going to learn how to combine a Publisher, a Subscriber, and a Service in the same ROS2 Node.

ROS Inside!

ROS Inside

ROS Inside

Before anything else, if you want to use the logo above on your own robot or computer, feel free to download it and attach it to your robot. It is really free. Find it in the link below:

ROS Inside logo

Opening the rosject

In order to follow this tutorial, we need to have ROS2 installed in our system, and ideally a ros2_ws (ROS2 Workspace). To make your life easier, we have already prepared a rosject for that: https://app.theconstructsim.com/l/5c13606c/.

Just by copying the rosject (clicking the link above), you will have a setup already prepared for you.

After the rosject has been successfully copied to your own area, you should see a Run button. Just click that button to launch the rosject (below you have a rosject example).

Learn ROS2 Parameters - Run rosject

Combining Publisher, Subscriber & Service in ROS2 Single Node – Run rosject (example of the RUN button)

 

After pressing the Run button, you should have the rosject loaded. Now, let’s head to the next section to get some real practice.

Creating the required files

In order to interact with ROS2, we need a terminal.

Let’s open a terminal by clicking the Open a new terminal button.

 

Open a new Terminal

Open a new Terminal

 

In this rosject we cloned the https://bitbucket.org/theconstructcore/fruit_detector and placed it inside the ~/ros2_ws/src folder. You can see its content with the following command in the terminal:

ls ~/ros2_ws/src/fruit_detector/
the following output should be produced:
custom_interfaces  pub_sub_srv_ros2_pkg_example
A new file called pubsubserv_example.py was created inside the fruit_detector/pub_sub_srv_ros2_pkg_example/scripts folder.
The command for creating that file was:
touch  ~/ros2_ws/src/fruit_detector/pub_sub_srv_ros2_pkg_example/scripts/pubsubserv_example.py
You could have created that file also using the Code Editor.

If you want to use the Code Editor, also known as IDE (Integrated Development Environment), you just have to open it as indicated in the image below:

Open the IDE - Code Editor

Open the IDE – Code Editor

 

The following content was pasted to that file:
#! /usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from custom_interfaces.srv import StringServiceMessage
import os
import cv2
from cv_bridge import CvBridge
import ament_index_python.packages as ament_index

class CombineNode(Node):

    def __init__(self, dummy=True):
        super().__init__('combine_node')

        self._dummy= dummy

        self.pkg_path = self.get_package_path("pub_sub_srv_ros2_pkg_example")
        self.scripts_path = os.path.join(self.pkg_path,"scripts")
        cascade_file_path = os.path.join(self.scripts_path,'haarbanana.xml')

        self.banana_cascade = cv2.CascadeClassifier(cascade_file_path)


        self.bridge = CvBridge()

        self.publisher = self.create_publisher(Image, 'image_detected_fruit', 10)
        self.subscription = self.create_subscription(
            Image, '/box_bot_1/box_bot_1_camera/image_raw', self.image_callback, 10)

        self.string_service = self.create_service(
            StringServiceMessage, 'detect_fruit_service', self.string_service_callback)
        

        self.get_logger().info(f'READY CombineNode')



    def get_package_path(self, package_name):
        try:
            package_share_directory = ament_index.get_package_share_directory(package_name)
            return package_share_directory
        except Exception as e:
            print(f"Error: {e}")
            return None


    def image_callback(self, msg):
        self.get_logger().info('Received an image.')
        self.current_image = msg


    def string_service_callback(self, request, response):
        # Handle the string service request
        self.get_logger().info(f'Received string service request: {request.detect}')
        
        
        if request.detect == "apple":
            if self._dummy:
                # Generate and publish an image related to apple detections
                apple_image = self.generate_apple_detection_image()
                self.publish_image(apple_image)
            else:
                self.detect_and_publish_apple()
        elif request.detect == "banana":
            if self._dummy:
                # Generate and publish an image related to banana detections
                banana_image = self.generate_banana_detection_image()
                self.publish_image(banana_image)
            else:
                self.detect_and_publish_banana()
        else:
            # If no specific request           
            unknown_image = self.generate_unknown_detection_image()
            self.publish_image(unknown_image)

        # Respond with success and a message
        response.success = True
        response.message = f'Received and processed: {request.detect}'
        return response

    def generate_unknown_detection_image(self):

        self.apple_img_path = os.path.join(self.scripts_path,'unknown.png')
        self.get_logger().warning("Unknown path="+str(self.apple_img_path))
        # Replace this with your actual image processing logic
        # In this example, we create a simple red circle on a black image
        image = cv2.imread(self.apple_img_path)  # Replace with your image path
        if image is None:
            self.get_logger().error("Failed to load the unknown image.")
        else:
            self.get_logger().warning("SUCCESS to load the unknown image.")
        return self.bridge.cv2_to_imgmsg(image, encoding="bgr8")


    def generate_apple_detection_image(self):

        self.apple_img_path = os.path.join(self.scripts_path,'apple.png')
        self.get_logger().warning("Apple path="+str(self.apple_img_path))
        # Replace this with your actual image processing logic
        # In this example, we create a simple red circle on a black image
        image = cv2.imread(self.apple_img_path)  # Replace with your image path
        if image is None:
            self.get_logger().error("Failed to load the apple image.")
        else:
            self.get_logger().warning("SUCCESS to load the apple image.")
        return self.bridge.cv2_to_imgmsg(image, encoding="bgr8")

    def generate_banana_detection_image(self):
        self.banana_img_path = os.path.join(self.scripts_path,'banana.png')
        self.get_logger().warning("Banana path="+str(self.banana_img_path))
        # Replace this with your actual image processing logic
        # In this example, we create a simple yellow circle on a black image
        image = cv2.imread(self.banana_img_path)  # Replace with your image path
        if image is None:
            self.get_logger().error("Failed to load the banana image.")
        else:
            self.get_logger().warning("SUCCESS to load the banana image.")
        return self.bridge.cv2_to_imgmsg(image, encoding="bgr8")

    def publish_image(self, image_msg):
        if image_msg is not None:
            self.publisher.publish(image_msg)


    def detect_and_publish_apple(self):
        if self.current_image is not None:
            frame = self.bridge.imgmsg_to_cv2(self.current_image, desired_encoding="bgr8")

            # Your apple detection code here (from approach_2.py)
            # HIGH=====
            # (0.0, 238.935, 255.0)
            # LOW=====
            # (1.8, 255.0, 66.045)


            low_apple_raw = (0.0, 80.0, 80.0)
            high_apple_raw = (20.0, 255.0, 255.0)

            image_hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

            mask = cv2.inRange(image_hsv, low_apple_raw, high_apple_raw)

            cnts, _ = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL,
                                    cv2.CHAIN_APPROX_SIMPLE)
            c_num = 0
            radius = 10
            for i, c in enumerate(cnts):
                ((x, y), r) = cv2.minEnclosingCircle(c)
                if r > radius:
                    print("OK="+str(r))
                    c_num += 1
                    cv2.circle(frame, (int(x), int(y)), int(r), (0, 255, 0), 2)
                    cv2.putText(frame, "#{}".format(c_num), (int(x) - 10, int(y)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)
                else:
                    print(r)

            

            # Publish the detected image as a ROS 2 Image message
            image_msg = self.bridge.cv2_to_imgmsg(frame, encoding="bgr8")
            self.publish_image(image_msg)

        else:
            self.get_logger().error("Image NOT found")


    def detect_and_publish_banana(self):
        self.get_logger().warning("detect_and_publish_banana Start")
        if self.current_image is not None:
            self.get_logger().warning("Image found")
            frame = self.bridge.imgmsg_to_cv2(self.current_image, desired_encoding="bgr8")
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            bananas = self.banana_cascade.detectMultiScale(
                gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30), flags=cv2.CASCADE_SCALE_IMAGE)

            for (x, y, w, h) in bananas:
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 3)
                cv2.putText(frame, 'Banana', (x-10, y-10),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0))

            # Publish the detected image as a ROS 2 Image message
            self.get_logger().warning("BananaDetection Image Publishing...")
            image_msg = self.bridge.cv2_to_imgmsg(frame, encoding="bgr8")
            self.publish_image(image_msg)
            self.get_logger().warning("BananaDetection Image Publishing...DONE")
        else:
            self.get_logger().error("Image NOT found")
    


def main(args=None):
    rclpy.init(args=args)
    node = CombineNode(dummy=False)
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
After creating that Python file, we also modified the CMakeLists.txt file of the pub_sub_srv_ros2_pkg_example package:
~/ros2_ws/src/fruit_detector/pub_sub_srv_ros2_pkg_example/CMakeLists.txt
We basically added ‘scripts/pubsubserv_example.py‘ to the list of files to be installed when we build our ros2 workspace.
In the end, the content of ~/ros2_ws/src/fruit_detector/pub_sub_srv_ros2_pkg_example/CMakeLists.txt is like this:
cmake_minimum_required(VERSION 3.8)
project(pub_sub_srv_ros2_pkg_example)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

find_package(ament_cmake REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(std_srvs REQUIRED)
find_package(custom_interfaces REQUIRED)


if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  set(ament_cmake_copyright_FOUND TRUE)
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

# We add it to be able to use other modules of the scripts folder
install(DIRECTORY
  scripts
  rviz
  DESTINATION share/${PROJECT_NAME}
)

install(PROGRAMS
	scripts/example1_dummy.py
  	scripts/example1.py
	scripts/example1_main.py
	scripts/pubsubserv_example.py
  DESTINATION lib/${PROJECT_NAME}
)

ament_package()

We then compiled specifically the pub_sub_srv_ros2_pkg_example package using the following command:
cd ~/ros2_ws/

source install/setup.bash
colcon build --packages-select pub_sub_srv_ros2_pkg_example

After the package is compiled, we could run that python script using the following command:

cd ~/ros2_ws

source install/setup.bash

ros2 run pub_sub_srv_ros2_pkg_example pubsubserv_example.py
After running that script you are not going to see any output because we are not printing anything.
But, let’s try to list the services in a second terminal by typing ros2 node list. If everything goes well, we should be able to see the combine_node node:
$ ros2 node list

/combine_node

Launching the simulation

So far we can’t see what our node is capable of.
Let’s launch a simulation so that we can understand our node better.
For that, let’s run the following command in a third terminal:
ros2 launch box_bot_gazebo garden_main.launch.xml
A simulation similar to the following should appear in a few seconds:
Combine Publisher, Subscriber & Service in ROS2 Single Node - Simulation

Combine Publisher, Subscriber & Service in ROS2 Single Node – Simulation

After launching the simulation, in the first terminal where we launched our node, we should start seeing messages like the following:
...
[INFO] [1699306370.709477898] [combine_node]: Received an image.
[INFO] [1699306373.374917545] [combine_node]: Received an image.
[INFO] [1699306376.390623360] [combine_node]: Received an image.
[INFO] [1699306379.277906884] [combine_node]: Received an image
...
We will soon understand what these messages mean.

See what the robot sees through rviz2

Now that the simulation is running, we can open rviz2 (ROS Visualization version 2).
To make it easier for you to see the robot model, and the robot camera, a fruit.rviz file was created at ~/ros2_ws/src/fruit_detector/pub_sub_srv_ros2_pkg_example/rviz/fruit.rviz.
You can tell rviz2 to load that config file using the following command:
rviz2 --display-config ~/ros2_ws/src/fruit_detector/pub_sub_srv_ros2_pkg_example/rviz/fruit.rviz
A new screen should pop up in a few seconds, and you should be able to see what the robot camera sees, as well as the robot model.
The ROS2 Topic that we set for the camera was /box_bot_1/box_bot_1_camera/image_raw. You can find this topic if you list the topics in another terminal using ros2 topic list.
If you look at the topic that we subscribe to at the __init__ method of the CombineNode class, it is exactly this topic:
self.subscription = self.create_subscription( Image, '/box_bot_1/box_bot_1_camera/image_raw', self.image_callback, 10)
When a new Image message comes, the image_callback method is called. It essentially saves the image in an internal variable called current_image:
def image_callback(self, msg): 
    self.get_logger().info('Received an image.') 
    self.current_image = msg
At the __init__ method we also created a service for analyzing an image and detecting whether or not it contains a banana:
    def __init__(self, dummy=True):
        super().__init__('combine_node')
       
        # ...
        self.string_service = self.create_service(
            StringServiceMessage, 'detect_fruit_service', self.string_service_callback)

    def string_service_callback(self, request, response):
        # Handle the string service request
        self.get_logger().info(f'Received string service request: {request.detect}')
        
        
        if request.detect == "apple":
            if self._dummy:
                # Generate and publish an image related to apple detections
                apple_image = self.generate_apple_detection_image()
                self.publish_image(apple_image)
            else:
                self.detect_and_publish_apple()
        elif request.detect == "banana":
            if self._dummy:
                # Generate and publish an image related to banana detections
                banana_image = self.generate_banana_detection_image()
                self.publish_image(banana_image)
            else:
                self.detect_and_publish_banana()
        else:
            # If no specific request           
            unknown_image = self.generate_unknown_detection_image()
            self.publish_image(unknown_image)

        # Respond with success and a message
        response.success = True
        response.message = f'Received and processed: {request.detect}'
        return response

By analyzing the code above, we see that when the detect_fruit_service service that we created is called, it calls the string_service_callback method that is responsible for detecting bananas and apples.
Now, going back to the messages we see in the first terminal:
...
[INFO] [1699306370.709477898] [combine_node]: Received an image.
[INFO] [1699306373.374917545] [combine_node]: Received an image.
[INFO] [1699306376.390623360] [combine_node]: Received an image.
[INFO] [1699306379.277906884] [combine_node]: Received an image
...
These messages basically say that we are correctly receiving the Image messages from the /box_bot_1/box_bot_1_camera/image_raw topic mentioned earlier.
If we list the services, we should find a service named . Let’s try it, by running the following command in a free terminal:
ros2 service list

You should see a huge number of services, and among them, you should be able to find the following one, that we created:

/detect_fruit_service
By the way, the reason why we have so many services is that the Gazebo simulator generates a lot of services, making it easier to interact with Gazebo using ROS2.
Now, let’s call that service. In order to detect an apple, a banana, and a strawberry, we could run the following commands respectively:
ros2 service call /detect_fruit_service custom_interfaces/srv/StringServiceMessage "detect: 'apple'"
ros2 service call /detect_fruit_service custom_interfaces/srv/StringServiceMessage "detect: 'banana'"
ros2 service call /detect_fruit_service custom_interfaces/srv/StringServiceMessage "detect: 'strawberry'"
If you don’t understand the commands we have been using so far, I highly recommend you take the ROS2 Basics course: https://app.theconstructsim.com/courses/132.
Alright. After calling the service to detect a banana, we should have an output similar to the following:
requester: making request: custom_interfaces.srv.StringServiceMessage_Request(detect='banana') 

response: custom_interfaces.srv.StringServiceMessage_Response(success=True, message='Received and processed: banana')
Indicating that the service correctly detected a banana.
If you check the logs in the first terminal where we launched our node, you will also see a message similar to the following:
BananaDetection Image Publishing...
So, as you can see, we have in the same ROS2 Node a Publisher, a Subscriber, and a Service.

Congratulations. Now you know how to combine different ROS2 pieces in a single node.

We hope this post was really helpful to you. If you want a live version of this post with more details, please check the video in the next section.

Youtube video

So this is the post for today. Remember that we have the live version of this post on YouTube. If you liked the content, please consider subscribing to our YouTube channel. We are publishing new content ~every day.

Keep pushing your ROS Learning.

Related Courses & Training

If you want to learn more about ROS and ROS2, we recommend the following courses:

Get ROS2 Industrial Ready- Hands-On Training by The Construct cover.png

Get ROS2 Industrial Ready- Hands-On Training by The Construct cover.png

Pin It on Pinterest