Apollo 笔记(03)— Cyber RT Python 接口(channel 读和写、server/client 通信、record 文件读写信息查询、timer 时间定时器操作)

2023-11-03

https://cyber-rt.readthedocs.io/en/latest/CyberRT_Python_API.html
https://cyber-rt.readthedocs.io/en/latest/api/pythonapi.html

1. 背景

Cyber RT 核心代码是由 C++ 开发,同时为了方便开发者,提供了 Python 接口。

2. Cyber RT Python 接口实现思路

Cyber RT Python 接口的实现思路是在 Cyber RT C++ 实现的基础上,做了一层 Python 的封装,由 Python 来调用 C++ 的实现函数。Cyber RTPython Wrapper 的实现没有使用 swig 等第三方工具,完全自主实现,以此保证代码的高可维护性和可读性。

3. 主要接口

目前提供的主要接口包括:

  • channel 读、写
  • server/client 通信
  • record 信息查询
  • record 文件读、写
  • Time/Duration/Rate 时间操作
  • Timer 定时器

3.1 Channel 读写接口

使用步骤是:

  1. 首先创建 Node;
  2. 创建对应的 readerwriter
  3. 如果是向 channel 写数据,调用 writerwrite 接口;
  4. 如果是从 channel 读数据,调用 nodespin,并在回调函数 callback 中对消息进行处理;

接口定义如下:

class Node:
    """
    Class for Cyber RT Node wrapper.
    """

    def create_writer(self, name, data_type, qos_depth=1):
        """
        create a topic writer for send message to topic.
        @param self
        @param name str: topic name
        @param data_type proto: message class for serialization
        """

   def create_reader(self, name, data_type, callback, args=None):
        """
        create a topic reader for receive message from topic.
        @param self
        @param name str: topic name
        @param data_type proto: message class for serialization
        @callback fn: function to call (fn(data)) when data is
                   received. If args is set, the function must
                   accept the args as a second argument,
                   i.e. fn(data, args)
        @args any: additional arguments to pass to the callback
        """
   def create_client(self, name, request_data_type, response_data_type):
   """
   """
   def create_service(self, name, req_data_type, res_data_type, callback, args=None):

    def spin(self):
        """
        spin in wait and process message.
        @param self
        """

class Writer(object):
    """
    Class for Cyber RT writer wrapper.
    """
    def write(self, data):
        """
        writer msg string
        """

3.2 Record 接口

Record 读的操作是:

  1. 创建一个 RecordReader;
  2. Record 进行迭代读;

Record 写的操作是:

  1. 创建一个 RecordReader;
  2. Record 进行写消息;

接口定义如下:

class RecordReader(object):
    """
    Class for Cyber RT RecordReader wrapper.
    """
    def read_messages(self, start_time=0, end_time=18446744073709551615):
        """
        read message from bag file.
        @param self
        @param start_time:
        @param end_time:
        @return: generator of (message, data_type, timestamp)
        """
	def get_messagenumber(self, channel_name):
        """
        return message count.
        """
	def get_messagetype(self, channel_name):
        """
        return message type.
        """
	def get_protodesc(self, channel_name):
        """
        return message protodesc.
        """
	def get_headerstring(self):
        """
        return message header string.
        """
	def reset(self):
        """
        return reset.
        """
        return _Cyber_RECORD.PyRecordReader_Reset(self.record_reader)

     def get_channellist(self):
        """
        return channel list.
        """
        return _Cyber_RECORD.PyRecordReader_GetChannelList(self.record_reader)


class RecordWriter(object):
    """
    Class for Cyber RT RecordWriter wrapper.
    """
	def open(self, path):
        """
        open record file for write.
        """
	def write_channel(self, channel_name, type_name, proto_desc):
        """
        writer channel by channelname,typename,protodesc
        """
	def write_message(self, channel_name, data, time, raw = True):
        """
        writer msg:channelname,data,time,is data raw
        """

	def set_size_fileseg(self, size_kilobytes):
        """
        return filesegment size.
        """

    def set_intervaltime_fileseg(self, time_sec):
        """
        return file interval time.
        """

    def get_messagenumber(self, channel_name):
        """
        return message count.
        """

    def get_messagetype(self, channel_name):
        """
        return message type.
        """

    def get_protodesc(self, channel_name):
        """
        return message protodesc.
        """

3.3 Time 接口

class Time(object):
	@staticmethod
    def now():
        time_now = Time(_CYBER_TIME.PyTime_now())
        return time_now

    @staticmethod
    def mono_time():
        mono_time = Time(_CYBER_TIME.PyTime_mono_time())
        return mono_time

    def to_sec(self):
        return _CYBER_TIME.PyTime_to_sec(self.time)

    def to_nsec(self):
        return _CYBER_TIME.PyTime_to_nsec(self.time)

    def sleep_until(self, nanoseconds):
        return _CYBER_TIME.PyTime_sleep_until(self.time, nanoseconds)

3.4 Timer 接口


class Timer(object):
    
    def set_option(self, period, callback, oneshot=0):
        '''
        period The period of the timer, unit is ms
        callback The tasks that the timer needs to perform
        oneshot 1: perform the callback only after the first timing cycle
        0:perform the callback every timed period
        '''
        
        
    def start(self):
            
            
    def stop(self):

4. 例子

4.1 读 channel (参见 cyber/python/cyber_py3/examples/listener.py)

"""Module for example of listener."""
from cyber.python.cyber_py3 import cyber
from cyber.proto.unit_test_pb2 import ChatterBenchmark


def callback(data):
    """
    Reader message callback.
    """
    print("=" * 80)
    print("py:reader callback msg->:")
    print(data)
    print("=" * 80)


def test_listener_class():
    """
    Reader message.
    """
    print("=" * 120)
    test_node = cyber.Node("listener")
    test_node.create_reader("channel/chatter", ChatterBenchmark, callback)
    test_node.spin()


if __name__ == '__main__':
    cyber.init()
    test_listener_class()
    cyber.shutdown()

apollo 镜像中执行命令

export PYTHONPATH=/apollo/bazel-bin/cyber/python/internal:/apollo:/apollo/cyber/proto

/apollo/cyber 目录下执行

python -m grpc_tools.protoc -I=./proto --python_out=./proto --grpc_python_out=./proto  ./proto/unit_test.proto

用于生成以下两个文件 unit_test_pb2.pyunit_test_pb2_grpc.py

4.2 写 channel(参见 cyber/python/cyber_py3/examples/talker.py)

"""Module for example of talker."""
import time

from cyber.python.cyber_py3 import cyber
from cyber.proto.unit_test_pb2 import ChatterBenchmark


def test_talker_class():
    """
    Test talker.
    """
    msg = ChatterBenchmark()
    msg.content = "py:talker:send Alex!"
    msg.stamp = 9999
    msg.seq = 0
    print(msg)
    test_node = cyber.Node("node_name1")
    g_count = 1

    writer = test_node.create_writer("channel/chatter", ChatterBenchmark, 6)
    while not cyber.is_shutdown():
        time.sleep(1)
        g_count = g_count + 1
        msg.seq = g_count
        msg.content = "I am python talker."
        print("=" * 80)
        print("write msg -> %s" % msg)
        writer.write(msg)


if __name__ == '__main__':
    cyber.init("talker_sample")
    test_talker_class()
    cyber.shutdown()

分别执行

python cyber/python/cyber_py3/examples/listener.py

python cyber/python/cyber_py3/examples/talker.py

listener.py 打印如下:

========================================================================================================================
================================================================================
py:reader callback msg->:
stamp: 9999
seq: 2
content: "I am python talker."

================================================================================
================================================================================
py:reader callback msg->:
stamp: 9999
seq: 3
content: "I am python talker."

================================================================================

talker.py 打印如下:

stamp: 9999
seq: 0
content: "py:talker:send Alex!"

================================================================================
write msg -> stamp: 9999
seq: 2
content: "I am python talker."

================================================================================
write msg -> stamp: 9999
seq: 3
content: "I am python talker."

================================================================================

4.3 读写消息到 Record 文件(参见 cyber/python/cyber_py3/examples/record.py)

"""
Module for example of record.
Run with:
    bazel run //cyber/python/cyber_py3/examples:record
"""

import time

from google.protobuf.descriptor_pb2 import FileDescriptorProto

from cyber.proto.unit_test_pb2 import Chatter
from cyber.python.cyber_py3 import record
from modules.common.util.testdata.simple_pb2 import SimpleMessage


MSG_TYPE = "apollo.common.util.test.SimpleMessage"
MSG_TYPE_CHATTER = "apollo.cyber.proto.Chatter"


def test_record_writer(writer_path):
    """
    Record writer.
    """
    fwriter = record.RecordWriter()
    fwriter.set_size_fileseg(0)
    fwriter.set_intervaltime_fileseg(0)

    if not fwriter.open(writer_path):
        print('Failed to open record writer!')
        return
    print('+++ Begin to writer +++')

    # Writer 2 SimpleMessage
    msg = SimpleMessage()
    msg.text = "AAAAAA"

    file_desc = msg.DESCRIPTOR.file
    proto = FileDescriptorProto()
    file_desc.CopyToProto(proto)
    proto.name = file_desc.name
    desc_str = proto.SerializeToString()
    print(msg.DESCRIPTOR.full_name)
    fwriter.write_channel(
        'simplemsg_channel', msg.DESCRIPTOR.full_name, desc_str)
    fwriter.write_message('simplemsg_channel', msg, 990, False)
    fwriter.write_message('simplemsg_channel', msg.SerializeToString(), 991)

    # Writer 2 Chatter
    msg = Chatter()
    msg.timestamp = 99999
    msg.lidar_timestamp = 100
    msg.seq = 1

    file_desc = msg.DESCRIPTOR.file
    proto = FileDescriptorProto()
    file_desc.CopyToProto(proto)
    proto.name = file_desc.name
    desc_str = proto.SerializeToString()
    print(msg.DESCRIPTOR.full_name)
    fwriter.write_channel('chatter_a', msg.DESCRIPTOR.full_name, desc_str)
    fwriter.write_message('chatter_a', msg, 992, False)
    msg.seq = 2
    fwriter.write_message("chatter_a", msg.SerializeToString(), 993)

    fwriter.close()


def test_record_reader(reader_path):
    """
    Record reader.
    """
    freader = record.RecordReader(reader_path)
    time.sleep(1)
    print('+' * 80)
    print('+++ Begin to read +++')
    count = 0
    for channel_name, msg, datatype, timestamp in freader.read_messages():
        count += 1
        print('=' * 80)
        print('read [%d] messages' % count)
        print('channel_name -> %s' % channel_name)
        print('msgtime -> %d' % timestamp)
        print('msgnum -> %d' % freader.get_messagenumber(channel_name))
        print('msgtype -> %s' % datatype)
        print('message is -> %s' % msg)
        print('***After parse(if needed),the message is ->')
        if datatype == MSG_TYPE:
            msg_new = SimpleMessage()
            msg_new.ParseFromString(msg)
            print(msg_new)
        elif datatype == MSG_TYPE_CHATTER:
            msg_new = Chatter()
            msg_new.ParseFromString(msg)
            print(msg_new)


if __name__ == '__main__':
    test_record_file = "/tmp/test_writer.record"

    print('Begin to write record file: {}'.format(test_record_file))
    test_record_writer(test_record_file)

    print('Begin to read record file: {}'.format(test_record_file))
    test_record_reader(test_record_file)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apollo 笔记(03)— Cyber RT Python 接口(channel 读和写、server/client 通信、record 文件读写信息查询、timer 时间定时器操作) 的相关文章

  • 使用 pythonbrew 编译 Python 3.2 和 2.7 时出现问题

    我正在尝试使用构建多个版本的 python蟒蛇酿造 http pypi python org pypi pythonbrew 0 7 3 但我遇到了一些测试失败 这是在运行的虚拟机上 Ubuntu 8 04 32 位 当我使用时会发生这种情
  • 没有名为 crypto.cipher 的模块

    我现在正在尝试加密一段时间 我最近得到了这个基于 python 的密码器 名为PythonCrypter https github com jbertman PythonCrypter 我对 Python 相当陌生 当我尝试通过终端打开 C
  • Django 代理模型的继承和多态性

    我正在开发一个我没有启动的 Django 项目 我面临着一个问题遗产 我有一个大模型 在示例中简化 称为MyModel这应该代表不同种类的物品 的所有实例对象MyModel应该具有相同的字段 但方法的行为根据项目类型的不同而有很大差异 到目
  • SQLAlchemy 通过关联对象声明式多对多自连接

    我有一个用户表和一个朋友表 它将用户映射到其他用户 因为每个用户可以有很多朋友 这个关系显然是对称的 如果用户A是用户B的朋友 那么用户B也是用户A的朋友 我只存储这个关系一次 除了两个用户 ID 之外 Friends 表还有其他字段 因此
  • Python - StatsModels、OLS 置信区间

    在 Statsmodels 中 我可以使用以下方法拟合我的模型 import statsmodels api as sm X np array 22000 13400 47600 7400 12000 32000 28000 31000 6
  • Flask 会话变量

    我正在用 Flask 编写一个小型网络应用程序 当两个用户 在同一网络下 尝试使用应用程序时 我遇到会话变量问题 这是代码 import os from flask import Flask request render template
  • 如何在 Python 中检索 for 循环中的剩余项目?

    我有一个简单的 for 循环迭代项目列表 在某些时候 我知道它会破裂 我该如何退回剩余的物品 for i in a b c d e f g try some func i except return remaining items if s
  • 如何使用 Ansible playbook 中的 service_facts 模块检查服务是否存在且未安装在服务器中?

    我用过service facts检查服务是否正在运行并启用 在某些服务器中 未安装特定的软件包 现在 我如何知道这个特定的软件包没有安装在该特定的服务器上service facts module 在 Ansible 剧本中 它显示以下错误
  • 根据列值突出显示数据框中的行?

    假设我有这样的数据框 col1 col2 col3 col4 0 A A 1 pass 2 1 A A 2 pass 4 2 A A 1 fail 4 3 A A 1 fail 5 4 A A 1 pass 3 5 A A 2 fail 2
  • Spark KMeans 无法处理大数据吗?

    KMeans 有几个参数training http spark apache org docs latest api python pyspark mllib html highlight kmeans pyspark mllib clus
  • 以编程方式停止Python脚本的执行? [复制]

    这个问题在这里已经有答案了 是否可以使用命令在任意行停止执行 python 脚本 Like some code quit quit at this point some more code that s not executed sys e
  • 绘制方程

    我正在尝试创建一个函数 它将绘制我告诉它的任何公式 import numpy as np import matplotlib pyplot as plt def graph formula x range x np array x rang
  • BeautifulSoup 中的嵌套标签 - Python

    我在网站和 stackoverflow 上查看了许多示例 但找不到解决我的问题的通用解决方案 我正在处理一个非常混乱的网站 我想抓取一些数据 标记看起来像这样 table tbody tr tr tr td td td table tr t
  • 如何使用Python创建历史时间线

    So I ve seen a few answers on here that helped a bit but my dataset is larger than the ones that have been answered prev
  • IO 密集型任务中的 Python 多线程

    建议仅在 IO 密集型任务中使用 Python 多线程 因为 Python 有一个全局解释器锁 GIL 只允许一个线程持有 Python 解释器的控制权 然而 多线程对于 IO 密集型操作有意义吗 https stackoverflow c
  • 在f字符串中转义字符[重复]

    这个问题在这里已经有答案了 我遇到了以下问题f string gt gt gt a hello how to print hello gt gt gt f a a gt gt gt f a File
  • 使用 \r 并打印一些文本后如何清除控制台中的一行?

    对于我当前的项目 有一些代码很慢并且我无法使其更快 为了获得一些关于已完成 必须完成多少的反馈 我创建了一个进度片段 您可以在下面看到 当你看到最后一行时 sys stdout write r100 80 n I use 80覆盖最终剩余的
  • Scrapy:如何使用元在方法之间传递项目

    我是 scrapy 和 python 的新手 我试图将 parse quotes 中的项目 item author 传递给下一个解析方法 parse bio 我尝试了 request meta 和 response meta 方法 如 sc
  • 在 Qt 中自动调整标签文本大小 - 奇怪的行为

    在 Qt 中 我有一个复合小部件 它由排列在 QBoxLayouts 内的多个 QLabels 组成 当小部件调整大小时 我希望标签文本缩放以填充标签区域 并且我已经在 resizeEvent 中实现了文本大小的调整 这可行 但似乎发生了某
  • Statsmodels.formula.api OLS不显示截距的统计值

    我正在运行以下源代码 import statsmodels formula api as sm Add one column of ones for the intercept term X np append arr np ones 50

随机推荐