Python ROS2 プログラム 高専4年生

【ROS2講座③】パッケージの作成とビルド【Python】

前回はROS2のサービス、パラメータ、アクションについて学びました。

今回は実際にパッケージを作り、プログラム開発を進めてゆきます。

colcon(コルコン) を使って ROS 2 のプログラムをまとめて「ビルド(build)」し、実行できる状態にする流れを学び、「ビルド」「インストール」「ワークスペース」などの専門用語の意味を1つずつ理解してゆきます。

ビルドとは

プログラムのソースコード(.cpp や .py ファイル)を、実行できる形式(バイナリやスクリプト)にまとめて変換する作業のことです。

たとえば…

  • C++のソースファイル → コンパイルして .so や 実行ファイルに変換
  • Pythonのコード → パッケージにまとめて、パス設定などで呼び出せるようにする

colconとは

colcon(COLlective CONstruction)

ROS 2 で使われる パッケージ管理&ビルドツール。次のようなことができます:

機能説明
複数パッケージのビルドsrc 内に複数パッケージを置くと一括ビルドできる
実行ファイルやライブラリのインストール実行に必要なファイルを install/ に配置
ログや設定の管理エラーやログは log/ に保存される

ワークスペースの作成

以下のコマンドを実行します。

mkdir -p ~/ros2_ws/src
cd ~/ros2_ws

専門用語:ワークスペース 自分が作った ROS 2 のコードやパッケージをまとめて置いておく場所。

上記はLinuxのコマンドで、フォルダの作成と作成したフォルダに移動しています。

src フォルダの中にソースコードを置いて、colcon がそこを見てビルドします。

サンプルパッケージの追加

下記のコマンドでgithubに公開されているサンプルコードをros2_wsの中にコピーします。

git clone https://github.com/ros2/examples src/examples -b humble

ビルド(build)

試しにビルドしてみましょう。

source /opt/ros/humble/setup.bash
colcon build --symlink-install

もし途中でフリーズしてしまう場合はCPUやメモリの性能が足りていない可能性があります。

以下の--executor sequential オプションを使うと安定してビルドできます。

colcon build --executor sequential

1つずつ順番にパッケージをビルドします(直列処理)

専門用語:--symlink-install Pythonスクリプトの変更をすぐに反映させるために、実ファイルではなく「ショートカット(シンボリックリンク)」を貼る設定です。

ビルドが終わると下記のようなフォルダ構成になっています。

ros2_ws/
├── src/          ← ソースコード
├── build/        ← 中間ファイル(コンパイル結果)
├── install/      ← 実行に必要なファイル(実行バイナリなど)
├── log/          ← ビルド時のログ情報

環境の読み込み

これを実行しないと ros2 run が新しいノードを見つけてくれません。

source install/setup.bash

専門用語:source とは?

実行環境に「このワークスペースに新しくできたプログラムがありますよ」と教えてあげるコマンド。

ノードの実行

それではビルドしたプログラムがきちんと動作するか確かめてみます。

1つ目のノードを実行

ros2 run examples_rclcpp_minimal_subscriber subscriber_member_function

2つ目のノードを実行

ros2 run examples_rclcpp_minimal_publisher publisher_member_function

ビルドコマンドまとめ

まとめると以下のようになります。

コマンド意味
colcon buildビルド(=プログラムの組み立て)
colcon test自動テストを実行
source install/setup.bash環境設定を読み込む(実行可能にする)
colcon build --symlink-install開発中のPythonファイルを即時反映
colcon build --executor sequentialビルドを1行ずつ実行

プログラムの自作

ここからは自分でプログラムを作りながら学習を進めてゆきます。

プログラムの開発はIDE(Integrated Development Environmentの略で、日本語では統合開発環境)を使用すると楽です。

今回のdocker環境には最初からVSCodium(VSCodeの互換ソフトウェア)が入っています。

WSL 2.6.1 以降の仕様変更で、GUI アプリ(Electron系)を直接 WSL 内で動かすことを推奨しなくなったため、ダブルクリックするだけでは起動しない場合があります。

ターミナルから以下のコマンドを実行します。

codium --no-sandbox --disable-gpu

その後、本当に起動するか[y/n]と聞かれますのでyを入力し、エンターを押します。

起動したら日本語化します。

Extensionsをクリックし、検索窓にJapaneseと入力し、Installをクリックします。

Trust Publisher &Installをクリックします。

右下のRestartをクリックすると再起動し、日本語化されています。

再起動したらフォルダを開くを選択します。

ros2_wsを選択しOPENをクリックします。

VSCodium上でターミナルを開きます。

ターミナル上で以下のコマンドを実行します。

srcフォルダに移動

cd src

パッケージの作成

ros2 pkg create --build-type ament_python --license Apache-2.0 py_pubsub

py_pubsub というパッケージが自動生成されます。

py_pubsub/py_pubsunフォルダの中にpublisher_member_function.pyというファイルを作成し、以下の内容を書き込んでください。

# rclpy は ROS 2 の Python クライアントライブラリ
import rclpy
# Node クラスは ROS 2 ノードを作るための基本クラス
from rclpy.node import Node

# std_msgs の String 型メッセージを使う
from std_msgs.msg import String

# MinimalPublisher クラス:ROSノードの本体となるクラス
class MinimalPublisher(Node):

    def __init__(self):
        # 親クラス(Node)を初期化。ノード名は "minimal_publisher"
        super().__init__('minimal_publisher')

        # パブリッシャーを作成。型は String、トピック名は 'topic'、キューサイズは10
        self.publisher_ = self.create_publisher(String, 'topic', 10)

        # 0.5秒ごとに timer_callback() を呼び出すタイマーを設定
        timer_period = 0.5  # 秒単位
        self.timer = self.create_timer(timer_period, self.timer_callback)

        # カウント用の変数 i(メッセージに含める番号)
        self.i = 0

    # タイマーで定期的に呼び出される関数
    def timer_callback(self):
        # メッセージオブジェクトを作成(型はString)
        msg = String()
        # メッセージの中身を "Hello World: i" という文字列に設定
        msg.data = 'Hello World: %d' % self.i

        # メッセージをパブリッシュ(送信)
        self.publisher_.publish(msg)

        # ログとして送信したメッセージ内容を表示(画面に出力)
        self.get_logger().info('Publishing: "%s"' % msg.data)

        # カウントアップ(次回メッセージ用)
        self.i += 1

# ノードを起動する main 関数
def main(args=None):
    # ROS 2 の Pythonライブラリを初期化(ノードなどを使う準備)
    rclpy.init(args=args)

    # MinimalPublisher ノードのインスタンスを作成
    minimal_publisher = MinimalPublisher()

    # ノードを実行状態にする(Ctrl+C で止めるまでループ)
    rclpy.spin(minimal_publisher)

    # ノードを明示的に破棄(メモリ解放・終了処理)
    minimal_publisher.destroy_node()
    # ROS 2 をシャットダウン(終了処理)
    rclpy.shutdown()

# スクリプトが直接実行されたとき、main() を呼び出す
if __name__ == '__main__':
    main()

こちらは

項目説明
create_publisher()指定した型・トピック名でメッセージを送信するためのパブリッシャーを作る
create_timer()一定時間ごとに指定した関数を実行(非同期的に)
String()ROS 2 の文字列メッセージオブジェクト
publish()メッセージを送信(トピックに流す)
get_logger().info()ターミナル上にログ出力する(デバッグに便利)
rclpy.spin()ノードを生かして、コールバックが呼ばれるのを待つループ
rclpy.shutdown()ROS 2 を安全に終了する

同様にしてsubscriber_member_function.pyファイルも作ります。

# rclpy は ROS 2 の Python クライアントライブラリ
import rclpy
# Node クラスは ROS 2 ノードを定義するための基本クラス
from rclpy.node import Node

# std_msgs パッケージから、文字列メッセージ型(String)をインポート
from std_msgs.msg import String


# MinimalSubscriber クラス:ROS ノードの受信側ノード(サブスクライバー)を定義
class MinimalSubscriber(Node):

    def __init__(self):
        # 親クラス Node を初期化。ノード名は 'minimal_subscriber'
        super().__init__('minimal_subscriber')

        # サブスクリプション(購読)を作成
        # 'topic' という名前のトピックから String 型メッセージを購読
        # メッセージを受信すると listener_callback 関数が呼ばれる
        self.subscription = self.create_subscription(
            String,              # 受け取るメッセージ型(std_msgs.msg.String)
            'topic',             # 購読するトピック名
            self.listener_callback,  # コールバック関数(受信時に実行)
            10)                  # キューサイズ(メッセージがたまる最大数)

        # 変数を使わないと警告が出るので、それを回避するためのダミー参照
        self.subscription

    # メッセージを受け取ったときに呼ばれる関数(コールバック)
    def listener_callback(self, msg):
        # msg.data に入っている文字列をログ出力(ターミナルに表示)
        self.get_logger().info('I heard: "%s"' % msg.data)


# main 関数:このノードを実行するときのエントリーポイント
def main(args=None):
    # rclpy の初期化(ROS 2 通信を開始できるようにする)
    rclpy.init(args=args)

    # MinimalSubscriber ノードのインスタンスを作成
    minimal_subscriber = MinimalSubscriber()

    # ノードを実行状態にして、メッセージを受信し続ける(Ctrl+Cで停止)
    rclpy.spin(minimal_subscriber)

    # ノードを明示的に破棄(省略可能だが、きれいに終了させたい場合は書く)
    minimal_subscriber.destroy_node()
    # ROS 2 の通信をシャットダウン
    rclpy.shutdown()


# このスクリプトが直接実行された場合、main() を呼び出す
if __name__ == '__main__':
    main()
要素解説
create_subscription()指定したトピック名・メッセージ型で受信の準備をする
listener_callback()メッセージが届いたときに自動で呼び出される関数
msg.data実際に送られてきた文字列の中身
get_logger().info()メッセージを画面に出力する関数(ログとして残る)
rclpy.spin()ノードを起動し、コールバック関数を呼び出す仕組み

依存関係とエントリポイントを設定

package.xml に依存パッケージを追加

package.xml は、ROS 2 パッケージの「名刺」のようなファイルです。
この中で、他のパッケージ(ライブラリ)への依存を宣言します。

ROS 2 のビルドツールや実行環境が、
「このパッケージには何が必要なのか?」を判断するために使います。

以下を追加します。

<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
タグ説明
rclpyROS 2 の Python API ライブラリ
std_msgsString 型など、標準メッセージ型を使うために必要

編集が終わったらCtrl+Sで保存します。

setup.py にエントリポイントを追加

setup.py は、Python製パッケージをインストールするための設定ファイルです。
特に重要なのが entry_points という項目です。

ros2 run パッケージ名 ノード名 のように、ターミナルからノードを実行できるようにするためです。

entry_points={
    'console_scripts': [
        'publisher = py_pubsub.publisher_member_function:main',
        'subscriber = py_pubsub.subscriber_member_function:main',
    ],
},
部分意味
'publisher = ...'ros2 run py_pubsub publisher と入力できるようにする
py_pubsub.publisher_member_function:mainpy_pubsub/ ディレクトリの publisher_member_function.py にある main() を実行

編集が終わったらCtrl+Sで保存します。

それではビルドを行います。

cd ~/ros2_ws
colcon build --packages-select py_pubsub
source install/setup.bash

プログラムの実行

送信側のプログラムを実行します。

ros2 run py_pubsub publisher

新しいターミナルを開き、以下のコマンドを打って、受信側のプログラムを実行します。

開いた際にはsourceの指定が必要です。

source install/setup.bash
ros2 run py_pubsub subscriber

上記のコマンドを別のターミナルで実行します。

上記のように送信側と受信側で通信が行えていれば成功です。

課題1 オリジナルトピックで文字列を送信するノードを作ろう

📋 指示:

  • トピック名を greeting に変更して、
  • "Hello from Publisher!" という固定メッセージを1秒おきに送信するノード(greeting_publisher)を作成してください。

✍️ チェック項目:

  • パブリッシャーの作成に create_publisher() を使っているか
  • トピック名が 'greeting' になっているか
  • メッセージ型が std_msgs.msg.String であるか
  • タイマーで定期送信されるようになっているか

解答例はこちら
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class GreetingPublisher(Node):
    def __init__(self):
        super().__init__('greeting_publisher')
        self.publisher_ = self.create_publisher(String, 'greeting', 10)
        self.timer = self.create_timer(1.0, self.timer_callback)

    def timer_callback(self):
        msg = String()
        msg.data = "Hello from Publisher!"
        self.publisher_.publish(msg)
        self.get_logger().info(f'Published: "{msg.data}"')

def main(args=None):
    rclpy.init(args=args)
    node = GreetingPublisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

課題2 受け取ったメッセージの長さを出力するサブスクライバを作ろう

📋 指示:

  • greeting トピックを購読する greeting_subscriber を作成してください。
  • 受け取ったメッセージの「文字数」を表示してください。

✍️ チェック項目:

  • msg.datalen() を使って長さを計算しているか
  • get_logger().info()"Length: X" のように出力しているか
解答例はこちら
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class GreetingSubscriber(Node):
    def __init__(self):
        super().__init__('greeting_subscriber')
        self.subscription = self.create_subscription(
            String,
            'greeting',
            self.listener_callback,
            10)

    def listener_callback(self, msg):
        length = len(msg.data)
        self.get_logger().info(f'Received message of length: {length}')

def main(args=None):
    rclpy.init(args=args)
    node = GreetingSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

課題3 新しいメッセージ型を使ってみよう(応用)

📋 指示:

  • std_msgs.msg.Int32 型を使って、0から順番に整数を送信する number_publisher を作成してください。
  • トピック名は count とします。
  • 受信ノード number_subscriber は、受け取った数値が偶数か奇数かを判断して出力してください。

✍️ チェック項目:

  • msg.data % 2 を使って偶奇を判断しているか
  • ロガー出力に "Even" または "Odd" を出力しているか

Int32 型とは?

Int32 は、ROS 2 の標準メッセージ型 std_msgs に含まれている 32ビット符号付き整数(int) を送受信するためのメッセージ型です。

  • 値の範囲:-2,147,483,648+2,147,483,647
  • Pythonでいう int 型に対応しています
  • 単一のフィールド(data)だけを持つシンプルな構造

インポート方法:

from std_msgs.msg import Int32

Int32 メッセージの作成と代入

msg = Int32()
msg.data = 42  # 任意の整数値を代入

パブリッシャーで使う場合

self.publisher_ = self.create_publisher(Int32, 'count', 10)

サブスクライバーで使う場合

self.subscription = self.create_subscription(
    Int32,
    'count',
    self.listener_callback,
    10)

解答例はこちら

0から順に数値を送信するノード

import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32

class NumberPublisher(Node):
    def __init__(self):
        super().__init__('number_publisher')
        self.publisher_ = self.create_publisher(Int32, 'count', 10)
        self.i = 0
        self.timer = self.create_timer(1.0, self.timer_callback)

    def timer_callback(self):
        msg = Int32()
        msg.data = self.i
        self.publisher_.publish(msg)
        self.get_logger().info(f'Published: {msg.data}')
        self.i += 1

def main(args=None):
    rclpy.init(args=args)
    node = NumberPublisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

受け取った整数が偶数か奇数かを判断するノード

import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32

class NumberSubscriber(Node):
    def __init__(self):
        super().__init__('number_subscriber')
        self.subscription = self.create_subscription(
            Int32,
            'count',
            self.listener_callback,
            10)

    def listener_callback(self, msg):
        if msg.data % 2 == 0:
            self.get_logger().info(f'{msg.data} is Even')
        else:
            self.get_logger().info(f'{msg.data} is Odd')

def main(args=None):
    rclpy.init(args=args)
    node = NumberSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

今回はパッケージの作成とビルドを学びました。

次回はこちら

Comming soon

Follow me!

-Python, ROS2, プログラム, 高専4年生
-, , , ,

PAGE TOP