0 目的
介绍ROS的实现原理,从最底层分析ROS代码是如何实现的。
1 序列化
1.1 什么是序列化?
尽管笔者从事机器人研发很长时间了,但是在研究ROS的过程中,“序列化”这个词还是这辈子第一次听到。所以可想而知很多人在看到“把一个消息序列化”这样的描述时是如何一脸懵逼。但其实序列化是一个比较常见和常用的东西,你虽然不知道它但一定接触过它。所以,下面我们先介绍“序列化”的一些常识,然后解释ROS里的序列化是怎么做的?
“序列化”(Serialization )的意思是将一个对象转化为字节流。这里说的对象可以理解为“面向对象”里的那个对象,具体的就是存储在内存中的对象数据。与之相反的过程是“反序列化”(Deserialization )。可见序列化是一个计算机概念,它本来跟机器人一毛钱关系也没有。形象的描述,数据对象是一团面,序列化就是将面团挤成一根面条,反序列化就将面条捏回面团。另一个形象的类比是我们在对话或者打电话时,一个人的思想转换成一维的语音,然后在另一个的头脑里重新变成结构化的思想,这也是一种序列化。

面对序列化,很多人心中可能会有很多疑问。
首先,为什么要序列化?或者更具体的说,既然对象的信息本来就是以字节的形式储存在内存中,那为什么要多此一举把一些内存中的字节数据转换成另一种形式的、一维的、连续的字节数据呢?如果我们的程序在内存中存储了一个数字,比如25。那要怎么传递25这个数字给别的程序节点或者把这个数字永久存储起来呢?很简单,直接传递25这个数字(的字节表示,即0X19,当然最终会变成二进制表示11001以高低电平传输存储)或者直接把这个数字(的字节表示)写进硬盘里即可。所以,对于本来就是连续的、一维的、一连串的数据(例如字符串),序列化并不需要做太多东西。可是实际程序操作的对象很少是这么简单的形式,大多数时候我们面对的是包含不同数据类型(int、double、string)的复杂数据结构(比如array、list),它们很可能在内存中是不连续的而是分散在各处。比如ROS的很多消息都包含向量。数据中还有各种指针和引用。而且,如果数据要在运行于不同架构的计算机之上的、由不同编程语言所编写的节点程序之间传递,那问题就更复杂了,它们的字节顺序endianness规定有可能不一样,基本数据类型(比如int)的长度也不一样(有的int是4个字节、有的是8个字节)。这些都不是通过简单地、原封不动地复制粘贴原始数据就能解决的。这时候就需要序列化和反序列化了。所以在程序之间需要通信时(ROS恰好就是这种情况),或者希望保存程序的中间运算结果时,序列化就登场了。在某种程度上,序列化还起到统一标准的作用。

我们把被序列化的东西叫object,它可以是任意的数据结构或者对象:结构体、数组、类的实例等等。把序列化后得到的东西叫archive,它既可以是人类可读的文本形式,也可以是二进制形式。前者比如JSON和XML,这两个是网络应用里最常用的序列化格式,通过记事本就能打开阅读;后者就是原始的二进制文件,比如后缀名是bin的文件,人类是没办法直接阅读一堆的0101或者0XC9D23E72的。
由于序列化是一个比较常用的功能,所以大多数编程语言都会附带用于序列化的库。比如C++、Python、Java等。以C++为例,标准的STL库没有这样的序列化库,但是第三方库Boost提供了
[
2
]
^{[2]}
[2]。Java自带序列化函数,谷歌的protobuf也是一个序列化库,还有Fast-CDR,还有以及不太知名的Cereal,python可以使用第三方的pickle模块实现序列化。
序列化没有什么神秘的,用户可以看看这些开源的序列化库代码,或者自己写个小程序试试简单数据的序列化,例如这个例子,或者这个,有助于更好地理解ROS中的实现。
1.2 ROS中的序列化实现
理解了序列化,再回到ROS。我们发现,ROS没有采用第三方的序列化工具,而是选择自己实现,代码在roscpp_core项目下的roscpp_serialization中,见下图。这个功能涉及的代码量不多,让人吃惊如此重要的功能居然用如此少的代码实现了。


为什么ROS不使用现成的序列化工具或者库呢?可能ROS诞生的时候,有些序列化库可能还不存在,更有可能是现有的工具不是很合适。
1.2.1 serialization.h
核心的函数都在serialization.h里,简而言之,里面使用了C语言标准库的memcpy
函数把消息拷贝到流中。下面来看一下具体的代码。
序列化功能的特点是要处理很多种数据类型,针对每种具体的类型都要实现相应的序列化函数。为了尽量减少代码量,ROS使用了模板的概念,所以代码里有一堆的template<typename T>
。
从后往前梳理,先看Stream
这个结构体吧。在C++里结构体和类基本没什么区别,结构体里也可以定义函数。Stream翻译为流,流是一个计算机中的抽象概念,前面我们提到过字节流,它是什么意思呢?在需要传输数据的时候,我们可以把数据想象成传送带上连续排列的一个个被传送的物体,它们就是一个流。更形象的,可以想象磁带或者图灵机里连续的纸带。在文件读写、使用串口、网络Socket通信等领域,流经常被使用。由于使用很多,流地概念也在演变。想了解更多可以看这里。
struct Stream
{
// Returns a pointer to the current position of the stream
inline uint8_t* getData() { return data_; }
// Advances the stream, checking bounds, and returns a pointer to the position before it was advanced.
// \throws StreamOverrunException if len would take this stream past the end of its buffer
ROS_FORCE_INLINE uint8_t* advance(uint32_t len)
{
uint8_t* old_data = data_;
data_ += len;
if (data_ > end_)
{
// Throwing directly here causes a significant speed hit due to the extra code generated for the throw statement
throwStreamOverrun();
}
return old_data;
}
// Returns the amount of space left in the stream
inline uint32_t getLength() { return static_cast<uint32_t>(end_ - data_); }
protected:
Stream(uint8_t* _data, uint32_t _count) : data_(_data), end_(_data + _count) {}
private:
uint8_t* data_;
uint8_t* end_;
};
注释表明Stream
是个基类,输入输出流IStream
和OStream
都继承自它。Stream
的成员变量data_
是个指针,指向序列化的字节流开始的位置,它的类型是uint8_t
。在Ubuntu系统中,uint8_t
的定义是typedef unsigned char uint8_t;
,所以uint8_t
就是一个字节,可以用size_of()函数检验。data_
指向的空间就是保存字节流的。
输出流类OStream
用来序列化一个对象,它引用了serialize
函数,如下。
struct OStream : public Stream
{
static const StreamType stream_type = stream_types::Output;
OStream(uint8_t* data, uint32_t count) : Stream(data, count) {}
/* Serialize an item to this output stream*/
template<typename T>
ROS_FORCE_INLINE void next(const T& t)
{
serialize(*this, t);
}
template<typename T>
ROS_FORCE_INLINE OStream& operator<<(const T& t)
{
serialize(*this, t);
return *this;
}
};
输入流类IStream
用来反序列化一个字节流,它引用了deserialize
函数,如下。
struct ROSCPP_SERIALIZATION_DECL IStream : public Stream
{
static const StreamType stream_type = stream_types::Input;
IStream(uint8_t* data, uint32_t count) : Stream(data, count) {}
/* Deserialize an item from this input stream */
template<typename T>
ROS_FORCE_INLINE void next(T& t)
{
deserialize(*this, t);
}
template<typename T>
ROS_FORCE_INLINE IStream& operator>>(T& t)
{
deserialize(*this, t);
return *this;
}
};
自然,serialize
函数和deserialize
函数就是改变数据形式的地方,它们的定义在比较靠前的地方。它们都接收两个模板,都是内联函数,然后里面没什么东西,只是又调用了Serializer
类的成员函数write
和read
。所以,serialize
和deserialize
函数就是个二道贩子。
// Serialize an object. Stream here should normally be a ros::serialization::OStream
template<typename T, typename Stream>
inline void serialize(Stream& stream, const T& t)
{
Serializer<T>::write(stream, t);
}
// Deserialize an object. Stream here should normally be a ros::serialization::IStream
template<typename T, typename Stream>
inline void deserialize(Stream& stream, T& t)
{
Serializer<T>::read(stream, t);
}
所以,我们来分析Serializer
类,如下。我们发现,write
和read
函数又调用了类型里的serialize
函数和deserialize
函数。头别晕,这里的serialize
和deserialize
函数跟上面的同名函数不是一回事。注释中说:“Specializing the Serializer class is the only thing you need to do to get the ROS serialization system to work with a type”(要想让ROS的序列化功能适用于其它的某个类型,你唯一需要做的就是特化这个Serializer
类)。这就涉及到的另一个知识点——模板特化(template specialization)。
template<typename T> struct Serializer
{
// Write an object to the stream. Normally the stream passed in here will be a ros::serialization::OStream
template<typename Stream>
inline static void write(Stream& stream, typename boost::call_traits<T>::param_type t)
{
t.serialize(stream.getData(), 0);
}
// Read an object from the stream. Normally the stream passed in here will be a ros::serialization::IStream
template<typename Stream>
inline static void read(Stream& stream, typename boost::call_traits<T>::reference t)
{
t.deserialize(stream.getData());
}
// Determine the serialized length of an object.
inline static uint32_t serializedLength(typename boost::call_traits<T>::param_type t)
{
return t.serializationLength();
}
};
接着又定义了一个带参数的宏函数ROS_CREATE_SIMPLE_SERIALIZER(Type)
,然后把这个宏作用到了ROS中的10种基本数据类型,分别是:uint8_t, int8_t, uint16_t, int16_t, uint32_t, int32_t, uint64_t, int64_t, float, double
。说明这10种数据类型的处理方式都是类似的。看到这里大家应该明白了,write
和read
函数都使用了memcpy
函数进行数据的移动。注意宏定义中的template<>
,这正是模板特化的标志,关键词template
后面跟一对尖括号。关于模板特化可以看这里。
#define ROS_CREATE_SIMPLE_SERIALIZER(Type) \
template<> struct Serializer<Type> \
{ \
template<typename Stream> inline static void write(Stream& stream, const Type v) \
{ \
memcpy(stream.advance(sizeof(v)), &v, sizeof(v) ); \
} \
template<typename Stream> inline static void read(Stream& stream, Type& v) \
{ \
memcpy(&v, stream.advance(sizeof(v)), sizeof(v) ); \
} \
inline static uint32_t serializedLength(const Type&) \
{ \
return sizeof(Type); \
} \
};
ROS_CREATE_SIMPLE_SERIALIZER(uint8_t)
ROS_CREATE_SIMPLE_SERIALIZER(int8_t)
ROS_CREATE_SIMPLE_SERIALIZER(uint16_t)
ROS_CREATE_SIMPLE_SERIALIZER(int16_t)
ROS_CREATE_SIMPLE_SERIALIZER(uint32_t)
ROS_CREATE_SIMPLE_SERIALIZER(int32_t)
ROS_CREATE_SIMPLE_SERIALIZER(uint64_t)
ROS_CREATE_SIMPLE_SERIALIZER(int64_t)
ROS_CREATE_SIMPLE_SERIALIZER(float)
ROS_CREATE_SIMPLE_SERIALIZER(double)
对于其它类型的数据,例如bool
、std::string
、std::vector
、ros::Time
、ros::Duration
、boost::array
等等,它们各自的处理方式有细微的不同,所以不再用上面的宏函数,而是用模板特化的方式每种单独定义,这也是为什么serialization.h这个文件这么冗长。
对于int、double这种单个元素的数据,直接用上面特化的Serializer
类中的memcpy
函数实现序列化。对于vector、array这种多个元素的数据类型怎么办呢?方法是分成几种情况,对于固定长度简单类型的(fixed-size simple types),还是用各自特化的Serializer
类中的memcpy
函数实现,没啥太大区别。对于固定但是类型不简单的(fixed-size non-simple types)或者既不固定也不简单的(non-fixed-size, non-simple types)或者固定但是不简单的(fixed-size, non-simple types),用for循环遍历,一个元素一个元素的单独处理。那怎么判断一个数据是不是固定是不是简单呢?这是在roscpp_traits文件夹中的message_traits.h完成的。其中采用了Type Traits,这是相对高级一点的编程技巧了,笔者也不太懂。
对序列化的介绍暂时就到这里了,有一些细节还没讲,等笔者看懂了再补。
2 消息订阅发布
2.1 ROS的本质
我们来到了ROS最核心的地带。如果问ROS的本质是什么,或者用一句话概括ROS的核心功能。那么,笔者认为ROS就是个通信库,让不同的程序节点能够相互对话。所以,我们就来看看ROS是如何实现通信的。实现通信的代码在ros_comm包中,如下。其中clients文件夹一共有127个文件,看来是最大的包了。


2.2 客户端库
客户端库(client libraries)是干什么的?
3 时间
不只是机器人,在任何一个系统里,时间都是一个绕不开的重要话题。下面我们就从万物的起点——时间开始吧。
ROS中定义时间的程序都在roscpp_core项目下的rostime中,见下图。如果细分一下,时间其实有两种,一种叫“时刻”,也就是某个时间点;一种叫“时段”或者时间间隔,也就是两个时刻之间的部分。这两者的代码是分开实现的,时刻是time,时间间隔是duration。在Ubuntu中把rostime文件夹中的文件打印出来,发现确实有time和duration两类文件,但是还多了个rate,如下图所示。


我们还注意到,里面有 CMakeLists.txt 和 package.xml 两个文件,那说明rostime本身也是个ROS的package,可以单独编译。
3.1 时刻time
先看下文件间的依赖关系。跟时刻有关的文件是两个time.h文件和一个time.cpp文件。time.h给出了类的声明,而impl\time.h给出了类运算符重载的实现,time.cpp给出了其它函数的实现。
3.1.1 TimeBase基类
首先看time.h
文件,它定义了一个叫TimeBase
的类。注释中说,TimeBase
是个基类,定义了两个成员变量uint32_t sec, nsec
,还重载了
+
+
+、
−
-
−、
<
<
<、
>
>
>、
=
=
=等运算符。
成员变量uint32_t sec, nsec
其实就是时间的秒和纳秒两部分,它们合起来构成一个完整的时刻。至于为啥要分成两部分而不是用一个来定义,可能是考虑到整数表示精度的问题。因为32位整数最大表示的数字是2147483647。如果我们要用纳秒这个范围估计是不够的。你可能会问,机器人系统怎么会使用到纳秒这么高精度的时间分辨率,毕竟控制器的定时器最高精度可能也只能到微秒?如果你做过自动驾驶项目,有使用过GPS和激光雷达传感器的经验,就会发现GPS的时钟精度就是纳秒级的,它可以同步激光雷达每个激光点的时间戳。
还有,为什么定义TimeBase
这个基类,原因大家很容易就能猜到。因为在程序里,时间本质上就是一个数字而已,数字系统的序关系(能比较大小)和运算(加减乘除)也同样适用于时间这个东西。当然,这里只有加减没有乘除,因为时间的乘除没有意义。
3.1.2 Time类
紧接着TimeBase
类的是Time
类,它是TimeBase
的子类。我们做机器人应用程序开发时用不到TimeBase
基类,但是Time
类会经常使用。
3.1.3 now()函数
Time
类比TimeBase
类多了now()
函数,它是我们的老熟人了。在开发应用的时候,我们直接用下面的代码就能得到当前的时间戳:
ros::Time begin = ros::Time::now(); //获取当前时间
now()
函数的定义在rostime\src\time.cpp
里,因为它很常用很重要,笔者就把代码粘贴在这里,如下。函数很简单,可以看到,如果定义了使用仿真时间(g_use_sim_time
为true
),那就使用仿真时间,否则就使用墙上时间。
Time Time::now()
{
if (!g_initialized)
throw TimeNotInitializedException();
if (g_use_sim_time)
{
boost::mutex::scoped_lock lock(g_sim_time_mutex);
Time t = g_sim_time;
return t;
}
Time t;
ros_walltime(t.sec, t.nsec);
return t;
}
在ROS里,时间分成两类,一种叫仿真时间,一种叫墙上时间。顾名思义,墙上时间就是实际的客观世界的时间,它一秒一秒地流逝,谁都不能改变它,让它快一点慢一点都不可能,除非你有超能力。仿真时间则是可以由你控制的,让它快它就快。之所以多了一个仿真时间,是因为有时我们在仿真机器人希望可以自己控制时间,例如为了提高验证算法的效率,让它按我们的期望速度推进。
在使用墙上时间的情况下,now()
函数调用了ros_walltime
函数,这个函数也在rostime\src\time.cpp
里。剥开层层洋葱皮,最后发现,这个ros_walltime
函数才是真正调用操作系统时间函数的地方,而且它还是个跨平台的实现(Windows和Linux)。
■ 如果操作系统是Linux,那它会使用clock_gettime
函数,在笔者使用的Ubuntu 18.04系统中这个函数在usr\include
路径下。但是万一缺少这个函数,那么ROS会使用gettimeofday
函数,但是gettimeofday
没有clock_gettime
精确,clock_gettime
能提供纳秒的精确度。
■ 如果操作系统是Windows,那它会使用标准库STL的chrono库获取当前的时刻,要用这个库只需要引用它的头文件,所以在time.cpp
中引用了#include <chrono>
。具体使用的函数就是std::chrono::system_clock::now().time_since_epoch()
。当然,时间得是秒和纳秒的形式,所以用了count
方法:
uint64_t now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>
(std::chrono::system_clock::now().time_since_epoch()).count();
3.1.4 WallTime类
后面又接着声明了WallTime
类和SteadyTime
类。
3.2 时间间隔duration
时间间隔duration的定义与实现跟time时刻差不多,但是Duration类里的sleep()
延时函数是在time.cpp里定义的,其中使用了Linux操作系统的nanosleep
系统调用。这个系统调用虽然叫纳秒,但实际能实现的精度也就是几十毫秒,即便这样也比C语言提供的sleep
函数的精度好多了。如果是Windows系统则调用STL chrono函数:
std::this_thread::sleep_for(std::chrono::nanoseconds(static_cast<int64_t>(sec * 1e9 + nsec)));
3.3 频率rate
关于Rate类,声明注释中是这么解释的“Class to help run loops at a desired frequency”,也就是帮助(程序)按照期望的频率循环执行。
我们看看一个ROS节点是怎么使用Rate类的,如下图所示的例子。

首先用Rate的构造函数实例化一个对象loop_rate
。调用的构造函数如下。可见,构造函数使用输入完成了对三个参数的初始化。然后在While循环里调用sleep()函数实现一段时间的延迟。既然用到延迟,所以就使用了前面的time类。
Rate::Rate(double frequency)
: start_(Time::now())
, expected_cycle_time_(1.0 / frequency)
, actual_cycle_time_(0.0)
{ }
3.4 总结
至此rostime就解释清楚了。可以看到,这部分实现主要是依靠STL标准库函数(比如chrono)、BOOST库(date_time)、Linux操作系统的系统调用以及标准的C函数。这就是ROS的底层了。说句题外话,在百度Apollo自动驾驶项目中也受到了rostime的影响,其cyber\time
中的Time、Duration、Rate类的定义方式与rostime中的类似,但是没有定义基类,实现更加简单了。
参考资料
[1] A Gentle Introduction to Serialization for Python.
[2] An Introduction to Object Serialization in C++.
[3] Concept of serialization.
[4] Data Serialization.
[5] Serialization
[6] Why do we need to convert structured data first to bits or bytes by serialization before we can send it to other servers or programs? How is structured data stored? Is it not stored as bits and bytes?
[7] Understanding serialization