# ROS2基础教程 - 4 话题通信
# 4.1 话题简介
在搭建 ROS2 环境的时候,演示了小海龟案例,其中启动了两个节点,一个是窗口显示小海龟的节点,一个是键盘控制的节点。
键盘为什么能控制小海龟窗口的运动呢,这里就涉及到节点之间的通信。在机器人的组件中,有摄像头、雷达等传感器,这些传感器都是一个个节点,他们会采集数据,并将数据传递给其他一个或多个节点进行显示或处理。
ROS 中节点间的通信有很多方式,例如话题、服务、参数服务器等,其中话题是用的最多的一种通信方式,像摄像头数、雷达等数据都是话题在节点之间进行传输的。
在 ROS2 中,话题(Topics)是一种用于不同节点之间进行异步通信的机制,话题通信的实现依赖于发布/订阅模式,通过底层的 DDS(数据分发服务)来确保消息的可靠传递。在话题通信中,发送数据的对象称为发布者(Publisher),接收数据的对象称为订阅者(Subscriber)。发布者将数据发送到指定的 Topic ,订阅 Topic 的订阅者都可以获取到数据,同时传输的数据也有固定的数据类型。
下面通过一个简单的例子来演示话题的使用。
- 创建一个发布者,定时发布 Hello World!到指定的 Topic;
- 创建一个订阅者,订阅该Topic,接收Hello World!
# 4.2 Python实现话题通信
首先在 工作空间/src 创建一个功能包,我这里叫 topic_test_python:
ros2 pkg create --build-type ament_python topic_test_python
在 topic_test_python/topic_test_python 创建两个节点,一个是发布消息的节点,一个是订阅消息的节点。
# 1 发布消息节点
首先创建一个节点,用来发布消息。
创建节点文件 hello_publisher.py,内容如下:
import rclpy  # 导入 ROS2 Python 客户端库
from rclpy.node import Node  # 从 rclpy 导入 Node 类
from std_msgs.msg import String  # 导入标准消息类型 String
class PublisherNode(Node):
    def __init__(self):
        super().__init__('hello_publisher')  # 初始化节点,节点名为 my_publisher
        self.publish()     # 发布消息
        
    def publish(self):
        self.publisher_ = self.create_publisher(String, 'my_topic', 10)  # 创建一个发布者,发布 String 类型消息到 my_topic,队列长度为 10
        timer_period = 1  # 设置定时器频率为 1 秒,可以为小数
        self.timer = self.create_timer(timer_period, self.timer_callback)  # 创建定时器,每 1 秒调用一次 timer_callback 方法
    def timer_callback(self):
        msg = String()  # 创建一个 String 类型的消息对象
        msg.data = 'Hello World!'  # 设置消息内容
        self.publisher_.publish(msg)  # 发布消息到 my_topic
        self.get_logger().info(f'Published: {msg.data}')  # 打印发布的消息内容到日志
def main(args=None):
    rclpy.init(args=args)  # 初始化 ROS2
    publisher = PublisherNode()  # 创建节点
    rclpy.spin(publisher)  # 持续运行节点,处理回调
    publisher.destroy_node()  # 销毁节点
    rclpy.shutdown()  # 关闭 ROS2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
在上面的代码中,使用 create_publisher 创建了一个发布者,指定了发布消息的类型、topic、队列的长度。同时创建了一个定时器,每秒执行一次,封装了发布消息的对象,使用发布者发布消息。
# 2 订阅消息节点
上面发布消息,下面创建一个节点用来订阅消息。
创建节点文件 hello_subscriber.py,内容如下:
import rclpy  # 导入 ROS2 Python 客户端库
from rclpy.node import Node  # 从 rclpy 导入 Node 类
from std_msgs.msg import String  # 导入标准消息类型 String
class SubscriberNode(Node):
    def __init__(self):
        super().__init__('hello_subscriber')  # 初始化节点,节点名为 my_subscriber
        self.subscribe()  # 调用订阅
    def subscribe(self):
        self.subscription = self.create_subscription(String, 'my_topic', self.listener_callback, 10)  # 创建订阅者,订阅 my_topic,接收 String 类型消息,队列长度为 10
    # 接收消息的回调
    def listener_callback(self, msg):
        self.get_logger().info(f'Received: {msg.data}')  # 打印接收到的消息内容到日志
def main(args=None):
    rclpy.init(args=args)  # 初始化 ROS2
    subscriber = SubscriberNode()  # 创建节点
    rclpy.spin(subscriber)  # 持续运行节点,处理回调
    subscriber.destroy_node()  # 销毁节点
    rclpy.shutdown()  # 关闭 ROS2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
在上面的代码中,使用 create_subscription 创建一个订阅者,指定了消息的类型、topic、回调方法、队列长度,接收到消息后,会调用回调方法。
# 3 配置节点
在功能包下的 setup.py 中配置两个节点,在 entry_points 中配置:
entry_points={
    'console_scripts': [
        'hello_publisher = topic_test_python.hello_publisher:main',
        'hello_subscriber= topic_test_python.hello_subscriber:main',
    ],
},
2
3
4
5
6
# 4 构建项目
现在代码已经编写完成了,需要构建项目。
在工作空间下执行如下命令:
colcon build
构建完成,会在 install 目录下生成文件。
# 5 运行节点
首先执行 source 命令,在工作空间下执行:
source install/local_setup.sh
因为是刚构建,打开终端虽然已经source了,但是还没有包括新建的功能包,所以这里还需要 source 一下,后面就不需要了。
然后打开一个终端,启动发布者节点:
ros2 run topic_test_python hello_publisher
此时发布节点就已经在发布消息了:

我们可以打开一个终端,使用如下命令查看发布的消息:
# 查看有哪些topic,可以看到/my_topic
ros2 topic list
# 可以使用echo打印topic中发布的内容
ros2 topic echo /my_topic   # 查看/my_topic的内容
2
3
4
5
上面命令会显示 topic 中的内容。
打开另一个终端,运行订阅者节点:
ros2 run topic_test_python hello_subscriber
可以看到,终端不停的打印接收到的消息:

# 4.3 C++实现话题通信
首先在 工作空间/src 创建一个功能包,我这里叫 topic_test_cpp:
ros2 pkg create --build-type ament_cmake topic_test_cpp
在 topic_test_cpp/src 创建两个节点,一个是发布消息的节点,一个是订阅消息的节点。
# 1 发布消息节点
创建节点文件 hello_publisher.cpp,内容如下:
#include <rclcpp/rclcpp.hpp>  // 导入 ROS2 C++ 客户端库
#include <std_msgs/msg/string.hpp>  // 导入标准消息类型 String
class PublisherNode : public rclcpp::Node {  // 创建一个继承自 rclcpp::Node 的类
public:
    PublisherNode() : Node("hello_publisher") {  // 构造函数,初始化节点名称为 hello_publisher
        publish();  // 调用 publish 方法
    }
    void publish() {  // 定义发布方法
        publisher_ = this->create_publisher<std_msgs::msg::String>("my_topic", 10);  // 创建发布者,发布 String 类型消息到 my_topic,队列长度为 10
        timer_ = this->create_wall_timer(  // 创建定时器,每 1 秒调用一次 timer_callback 方法
            std::chrono::seconds(1),
            std::bind(&PublisherNode::timer_callback, this)  // 绑定成员函数
        );
    }
private:
    void timer_callback() {  // 定时器回调函数
        auto msg = std_msgs::msg::String();  // 创建一个 String 类型的消息对象
        msg.data = "Hello World!";  // 设置消息内容
        publisher_->publish(msg);  // 发布消息到 my_topic
        RCLCPP_INFO(this->get_logger(), "Published: '%s'", msg.data.c_str());  // 打印发布的消息内容到日志
    }
    rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;  // 定义发布者的共享指针
    rclcpp::TimerBase::SharedPtr timer_;  // 定义定时器的共享指针
};
int main(int argc, char * argv[]) {
    rclcpp::init(argc, argv);  // 初始化 ROS2
    auto publisher_node = std::make_shared<PublisherNode>();  // 创建 PublisherNode 实例
    rclcpp::spin(publisher_node);  // 持续运行节点,处理回调
    rclcpp::shutdown();  // 关闭 ROS2
    return 0;  // 返回 0,表示程序正常结束
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
在上面的代码中,使用 create_publisher 创建了一个发布者,指定了发布消息的类型、topic、队列的长度。同时创建了一个定时器,每秒执行一次,封装了发布消息的对象,使用发布者发布消息。
# 2 订阅消息节点
创建节点文件 hello_subscriber.cpp,内容如下:
#include <rclcpp/rclcpp.hpp>  // 导入 ROS2 C++ 客户端库
#include <std_msgs/msg/string.hpp>  // 导入标准消息类型 String
class SubscriberNode : public rclcpp::Node {  // 创建一个继承自 rclcpp::Node 的类
public:
    SubscriberNode() : Node("hello_subscriber") {  // 构造函数,初始化节点名称为 hello_subscriber
        subscribe();  // 调用 subscribe 方法
    }
    void subscribe() {  // 定义订阅方法
        subscription_ = this->create_subscription<std_msgs::msg::String>(  // 创建订阅者,订阅 my_topic,接收 String 类型消息,队列长度为 10
            "my_topic",
            10,
            std::bind(&SubscriberNode::listener_callback, this, std::placeholders::_1)  // 指定回调函数
        );
    }
private:
    void listener_callback(const std_msgs::msg::String::SharedPtr msg) {  // 订阅回调函数
        RCLCPP_INFO(this->get_logger(), "Received: '%s'", msg->data.c_str());  // 打印接收到的消息内容到日志
    }
    rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;  // 定义订阅者的共享指针
};
int main(int argc, char * argv[]) {
    rclcpp::init(argc, argv);  // 初始化 ROS2
    auto subscriber_node = std::make_shared<SubscriberNode>();  // 创建 SubscriberNode 实例
    rclcpp::spin(subscriber_node);  // 持续运行节点,处理回调
    rclcpp::shutdown();  // 关闭 ROS2
    return 0;  // 返回 0,表示程序正常结束
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
在上面的代码中,使用 create_subscription 创建一个订阅者,指定了消息的类型、topic、回调方法、队列长度,接收到消息后,会调用回调方法。
# 3 配置CMakeLists.txt
在功能包下的 CMakeLists.txt 中,添加节点配置,可以添加在 find_package(ament_cmake REQUIRED) 下面。
因为用到了 String 类型,所以需要配置。
# 找到此包需要的其他包
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)   # 还需要std_msgs
# 添加可执行文件,指向 hello_publisher.cpp
add_executable(hello_publisher src/hello_publisher.cpp)
ament_target_dependencies(hello_publisher rclcpp std_msgs)   # 需要rclcpp std_msgs
# 添加可执行文件,指向 hello_subscriber.cpp
add_executable(hello_subscriber src/hello_subscriber.cpp)
ament_target_dependencies(hello_subscriber rclcpp std_msgs)   # 需要rclcpp std_msgs
# 安装可执行文件
install(TARGETS
  hello_publisher
  hello_subscriber
  DESTINATION lib/${PROJECT_NAME}
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
上面配置了 hello_publisher 和 hello_subscriber 两个节点。
# 4 构建项目
现在代码已经编写完成了,需要构建项目。
在工作空间下执行如下命令:
colcon build
构建完成,会在 install 目录下生成文件。
# 5 运行节点
首先执行 source 命令,在工作空间下执行:
source install/local_setup.sh
因为是刚构建,打开终端虽然已经source了,但是还没有包括新建的功能包,所以这里还需要 source 一下,后面就不需要了。
然后打开一个终端,启动发布者节点:
ros2 run topic_test_cpp hello_publisher
打开另一个终端,运行订阅者节点:
ros2 run topic_test_cpp hello_subscriber
# 4.3 控制小海龟运动
小海龟窗口接收键盘控制节点发送的速度消息,然后进行显示处理。也就是说小海龟窗口订阅了某个话题,键盘控制节点发送速度消息到这个话题,从而实现控制。同样,我们也可以发布消息到这个话题,来实现小海龟的运动。
下面就使用 Python 来实现,发布消息给小海龟,控制小海龟的运动。
# 1 查看节点信息
首先把小海龟窗口节点运行起来:
ros2 run turtlesim turtlesim_node 
然后可以使用命令查看有哪些运动的节点:
# 查看所有运行的节点
ros2 node list
2
可以使用下面的命令查看 turtlesim_node 节点的详细信息:
ros2 node info /turtlesim
显示如下,包括了节点订阅、发布和服务等信息:

可以看到节点订阅了一个 /turtle1/cmd_vel 的 topic,消息类型为 geometry_msgs/msg/Twist ,我们只需要按照格式发布消息即可。
# 2 使用节点控制小海龟
首先在 工作空间/src 创建一个功能包,我这里叫 control_turtle_python:
ros2 pkg create --build-type ament_python control_turtle_python
在 control_turtle_python/control_turtle_python 创建一个节点,发布 geometry_msgs/msg/Twist 消息,控制乌龟的运动。
control_turtle.py,内容如下:
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
class ControllerTurtle(Node):
    def __init__(self):
        super().__init__('turtle_controller')
        # 创建发布者,发布到 /turtle1/cmd_vel 话题
        self.publisher_ = self.create_publisher(Twist, '/turtle1/cmd_vel', 10)
        # 定时器,用于不停的发布速度消息
        self.timer = self.create_timer(0.1, self.publish_velocity_command)
    # 发布速度消息
    def publish_velocity_command(self):
        # 创建并设置速度消息
        twist = Twist()
        twist.linear.x = 1.0  # 设置线速度,向前运动
        twist.angular.z = 0.5  # 设置角速度,逆时针旋转
        # 发布消息
        self.publisher_.publish(twist)
        self.get_logger().info('Publishing velocity command: linear.x=1.0, angular.z=0.5')
def main(args=None):
    rclpy.init(args=args)
    controller_turtle = ControllerTurtle()
    try:
        rclpy.spin(controller_turtle)
    except KeyboardInterrupt:
        pass
    # 销毁节点并关闭 rclpy
    controller_turtle.destroy_node()
    rclpy.shutdown()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
在上面的节点中,通过定时器,不停的向 /turtle1/cmd_vel 话题,不停的发布速度信息。
上面发布的速度信息包括线速度和角速度,线速度包括 X/Y/Z 轴上的速度,角速度包括 X/Y/Z 轴上的旋转速度,坐标系参见 TF坐标变换 章节。
# 3 配置节点
在功能包下的 setup.py 中配置两个节点,在 entry_points 中配置:
entry_points={
    'console_scripts': [
        'control_turtle = control_turtle_python.control_turtle:main'
    ],
},
2
3
4
5
# 4 编译和运行
编译节点后,首先运行小海龟的窗口节点,然后在运行上面的节点。
# 构建项目
colcon build
# 运行小海龟节点
ros2 run turtlesim turtlesim_node
# 运行发布消息节点
ros2 run control_turtle_python control_turtle
2
3
4
5
6
7
8
发布消息节点不停的向 /turtle1/cmd_vel 话题发布速度信息:

小海龟窗口接收到速度消息,就会不停的运动:
# 4.4 topic常用的终端命令
上面用到了一些命令,如下:
查看有哪些 topic:
ros2 topic list
查看所有运行的节点:
ros2 node list
查看节点信息:
ros2 node info 节点名称
可以使用 ros2 topic echo 打印指定 topic 中发布的内容:
# 查看/turtle1/cmd_vel话题中发布的内容
ros2 topic echo /turtle1/cmd_vel
2
通过上面的的命令,可以查看 /turtle1/cmd_vel 话题中发布的消息(如果有人往这个话题发布信息的话,就可以看到)。
同样,我们也可以使用终端命令,向指定的话题发布消息:
例如我们可以使用终端命令向 /turtle1/cmd_vel 话题中发布速度消息,控制小海龟的运动。
ros2 topic pub -r 10 /turtle1/cmd_vel geometry_msgs/msg/Twist "{\"linear\": {\"x\": 1.0, \"y\": 0.0, \"z\": 0.0}, \"angular\": {\"x\": 0.0, \"y\": 0.0, \"z\": 0.5}}"
# 4.5 ROS开发思路
通过话题通信,这里简单介绍一下 ROS 中实现功能的逻辑。
例如想通过摄像头实现图像识别,那么我们首先问一下摄像头的厂商有没有提供 ROS 的依赖包,如果有,那么就可以集成到我们的工作空间中,然后编译运行,这样就可以通过硬件厂商提供的 ROS 包获取到硬件的数据,一般都是以话题的形式发布数据,这样我们就可以通过订阅硬件发布的话题来获取到数据。
如果硬件没有提供 ROS 的依赖包,那么就看一下如何获取到硬件数据,例如如何获取到笔记本的摄像头数据,知道如何获取硬件数据后,我们可以自己封装一个节点,来获取硬件的数据,然后将数据通过话题发布出来。
然后我们就可以再编写另外一个节点,用来实现我们具体想实现的功能,在实现功能的节点中,订阅话题,来获取硬件的数据,例如获取摄像头数据、雷达数据等,获取到数据后,如何实现想要的功能,就和在不在 ROS 中实现区别不大了,你想实现人脸识别或图像识别,获取到摄像头数据,直接去做就可以了。
← 03-HelloWorld 05-服务通信 →
