performance_test に学ぶ DDS API

はじめに

  • ROS2 は FastRTPS や CycloneDDS など通信レイヤの実装を取り替えることができます。
  • ROS2 を詳しく知るためこれらのミドルウェアの API の使い方を知りたいという場面があるかもしれません。
  • performance_test が良いサンプルになるので少し解説します。

performance_test パッケージ

概要

  • performance_test はROS2/FastDDS/CycloneDDS等の各種通信方式の性能やレイテンシを計測するためのテストツールです。ROS2のCI buildfarm_perf_tests でも使われているパッケージです。オリジナルはApex社の https://gitlab.com/ApexAI/performance_test でros2レポジトリにforkされています。
  • デフォルトでは 1 プロセス内に publisher と subscriber を一つずつ持ち 1000 Hz で pub -> sub を行ない送受信数やリソースの利用状況をレポートします。
  • 通信方式を -c オプションにより指定できます。オプション値としてROS2, FastRTPS CycloneDDS等が指定できます。それぞれ以下のような意味になります。
    • ROS2オプション指定時は create_publisher などROS2のrclcpp層のAPIを使った通信
    • FastRTPSやCycloneDDSオプション指定時は該当する通信ミドルウェアのAPIを直接呼び出した通信
  • その為、この通信の実装を見ると通信ミドルウェアのAPIの使い方を知ることができます。

通信部の実装

  • 通信部の実装は GitHubのros2/performance_test/communication_abstractions/ にあります。
  • Communicatorクラスを実装する形でROS2Communicator, FastRTPSCommunicator, CycloneDDSCommunicator等が定義されています。
  • これらのソースコードを抜粋しながらAPIを簡単に見てみたいと思います。ここではCycloneDDSのAPIを見てみます。なおソースコードは見やすいように一部改変しています。今回参考にしたコードは以下のものです。
    ros2/performance_test/cyclonedds_communicator.hpp

CycloneDDSCommunicator クラスの説明

CycloneDDSCommunicator クラスは CycloneDDS の API を直接呼び出した通信方式を実装したクラスです。このクラスのコンストラクタおよび publish/subscribe に関する処理を紹介します。
実は他のCommunicatorクラスも同じ様に構造をしている為、一つ見ておくと他のクラスも読みやすいと思います。

インスタンス変数一覧

CycloneDDSCommunicator クラスのインスタンス変数は以下の通りです。 m_hoge という変数名になっています。

dds_entity_t はCycloneDDSが提供する型です。実体は int で正の値の場合に意味を持ちます。後で見る様にCycloneDDSが提供する dds_hoge() という関数で初期化されます。

class CycloneDDSQOSAdapter {
private:
  dds_entity_t m_participant;

  static dds_entity_t m_topic;
  dds_entity_t m_datawriter;
  dds_entity_t m_datareader;

  dds_entity_t m_waitset;
} 

簡単に変数の意味を記載します。ここで「DDS オブジェクト」と記載していますが、 C++のオブジェクトではなく、「DDSの処理に用いる何か」位の意味で捉えて下さい。

変数名 説明
m_participant DDS 規格で定義されているparticipantです。ハンドルの様な物で、DDSオブジェクトを作る際に指定します。
m_topic トピックを表わすDDSオブジェクトです。
m_datawriter トピックへ書き込みを行なうDDSオブジェクトです。publish時に使われます。
m_datareader トピックからのデータ読み込みを行なうDDSオブジェクトです。subscribe時に使われます。
m_waitset 待ちを行なうためのDDSオブジェクトです。

CycloneDDSCommunicator クラスのコンストラクタ

explicit CycloneDDSCommunicator(SpinLock & lock)
: Communicator(lock),
  m_participant(ResourceManager::get().cyclonedds_participant()),
  m_datawriter(0),
  m_datareader(0)
{
  register_topic();
} 

インスタンス変数の m_participant を初期化し register_topic() でトピックの初期化を行ないます。 register_topic() はCycloneDDSCommunicator のプライベート関数です。処理を以下に抜粋します。

void CycloneDDSCommunicator::register_topic()
{
  if (m_topic == 0) {
    m_topic = dds_create_topic(
      m_participant,               # participant
      Topic::CycloneDDSDesc(),     # dds_topic_descriptor_t。 Topic はテンプレートで実体は topics.hpp に指定されている performance_test_msgs_msg_dds__Array32k__des 等
      Topic::topic_name().c_str(), # topic 名
      nullptr, 
      nullptr);
    if (m_topic < 0) {
      throw std::runtime_error("failed to create topic");
    }
  }
} 

CycloneDDSが提供する dds_create_topic でトピックを作成していることが分かります。以下にいくつか登場する dds_hoge という関数はいずれもCycloneDDSが提供する関数です。

CycloneDDSCommunicatorクラスのpublish処理

さて、 DDS では topic に対して DataWriter を使ってメッセージを送信します。
publish 関数にこれらの処理が定義されています。

void CycloneDDSCommunicator::publish(DataType & data, const std::chrono::nanoseconds time)
{
  if (m_datawriter == 0) {
    dds_qos_t * dw_qos = dds_create_qos();
    CycloneDDSQOSAdapter qos_adapter(m_ec.qos());
    qos_adapter.apply(dw_qos);
    m_datawriter = dds_create_writer(m_participant, m_topic, dw_qos, nullptr);
    dds_delete_qos(dw_qos);
    if (m_datawriter < 0) {
       throw std::runtime_error("failed to create datawriter");
    }
  }

  dds_write(m_datawriter, static_cast<void *>(&data);
} 

m_datawriter が未作成なら作成して送信処理を行ないます。CycloneDDS の関数 dds_create_writer() で m_datawriter を作成します。引数に participant, topic, QoS を渡しているのがわかります。その後、CycloneDDS の関数 dds_write でメッセージを送信しています。

CycloneDDSCommunicatorクラスのsubscribe処理

subscribeはDataReader経由で読み込みを行ないます。publish処理と同様に m_datareader が未作成なら作成して読み込み処理を行なっています。
performance_testパッケージでは定期的に下記の関数を呼びだしてトピックを読み込んでいます。

void CycloneDDSCommunicator::update_subscription()
{
  if (m_datareader == 0) {
    dds_qos_t * dw_qos = dds_create_qos();
    CycloneDDSQOSAdapter qos_adapter(m_ec.qos());
    qos_adapter.apply(dw_qos);
    m_datareader = dds_create_reader(m_participant, m_topic, dw_qos, nullptr);
    dds_delete_qos(dw_qos);
    if (m_datareader < 0) {
      throw std::runtime_error("failed to create datareader");
    }
    dds_set_status_mask(m_datareader, DDS_DATA_AVAILABLE_STATUS);
    m_waitset = dds_create_waitset(m_participant);
    if (dds_waitset_attach(m_waitset, m_datareader, 1) < 0) {
      throw std::runtime_error("failed to attach waitset");
    }
  }

  dds_waitset_wait(m_waitset, nullptr, 0, DDS_SECS(15));
  dds_sample_info_t si;
  dds_take(m_datareader, &untyped, &si, 1, 1);
  dds_return_loan(m_datareader, &untyped, n);
} 

CycloneDDSの関数 dds_create_reader でDataReaderを作成しています。また dds_create_waitset によりWaitSetを作成します。これはすぐ後の dds_waitset_wait で待ち処理に使われます。 dds_take でデータを受け取った後、 dds_return_loan でメモリを返却しています。

まとめ

performance_testパッケージを例にCycloneDDSのAPIがどの様に使われているか見てみました。
もちろん各種通信ミドルウェアのサンプル等を見てもよいのですが、このパッケージは様々なミドルウェアをサポートしているため各種通信ミドルウェアのAPIの使い方を比較しやすいかと思います。
他のCommunicatorクラスの実装も今回紹介したCycloneDDSCommunicatorと同じく以下の様な作りになっています。興味のある方は他のクラスも見てみると面白いかと思います。

  • コンストラクタでDDS participantとトピックの初期化
  • publish() 関数でDataWriterの作成と送信処理
  • update_subscription() 関数でDataReaderの作成・待ち・受信処理