前回はROS2のサービス、パラメータ、アクションについて学びました。

今回は実際にパッケージを作り、プログラム開発を進めてゆきます。
colcon(コルコン) を使って ROS 2 のプログラムをまとめて「ビルド(build)」し、実行できる状態にする流れを学び、「ビルド」「インストール」「ワークスペース」などの専門用語の意味を1つずつ理解してゆきます。
ビルドとは
プログラムのソースコード(.cpp や .py ファイル)を、実行できる形式(バイナリやスクリプト)にまとめて変換する作業のことです。
たとえば…
- C++のソースファイル → コンパイルして
.so
や 実行ファイルに変換 - Pythonのコード → パッケージにまとめて、パス設定などで呼び出せるようにする
colconとは
colcon(COLlective CONstruction)
ROS 2 で使われる パッケージ管理&ビルドツール。次のようなことができます:
機能 | 説明 |
---|---|
複数パッケージのビルド | src 内に複数パッケージを置くと一括ビルドできる |
実行ファイルやライブラリのインストール | 実行に必要なファイルを install/ に配置 |
ログや設定の管理 | エラーやログは log/ に保存される |
ワークスペースの作成
以下のコマンドを実行します。
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
専門用語:ワークスペース 自分が作った ROS 2 のコードやパッケージをまとめて置いておく場所。
上記はLinuxのコマンドで、フォルダの作成と作成したフォルダに移動しています。
src
フォルダの中にソースコードを置いて、colcon
がそこを見てビルドします。
サンプルパッケージの追加
下記のコマンドでgithubに公開されているサンプルコードをros2_wsの中にコピーします。
git clone https://github.com/ros2/examples src/examples -b humble
ビルド(build)
試しにビルドしてみましょう。
source /opt/ros/humble/setup.bash
colcon build --symlink-install
もし途中でフリーズしてしまう場合はCPUやメモリの性能が足りていない可能性があります。
以下の--executor sequential オプションを使うと安定してビルドできます。
colcon build --executor sequential
1つずつ順番にパッケージをビルドします(直列処理)
専門用語:--symlink-install Pythonスクリプトの変更をすぐに反映させるために、実ファイルではなく「ショートカット(シンボリックリンク)」を貼る設定です。
ビルドが終わると下記のようなフォルダ構成になっています。
ros2_ws/
├── src/ ← ソースコード
├── build/ ← 中間ファイル(コンパイル結果)
├── install/ ← 実行に必要なファイル(実行バイナリなど)
├── log/ ← ビルド時のログ情報

環境の読み込み
これを実行しないと ros2 run
が新しいノードを見つけてくれません。
source install/setup.bash
専門用語:source とは?
実行環境に「このワークスペースに新しくできたプログラムがありますよ」と教えてあげるコマンド。
ノードの実行
それではビルドしたプログラムがきちんと動作するか確かめてみます。
1つ目のノードを実行
ros2 run examples_rclcpp_minimal_subscriber subscriber_member_function
2つ目のノードを実行
ros2 run examples_rclcpp_minimal_publisher publisher_member_function
ビルドコマンドまとめ
まとめると以下のようになります。
コマンド | 意味 |
---|---|
colcon build | ビルド(=プログラムの組み立て) |
colcon test | 自動テストを実行 |
source install/setup.bash | 環境設定を読み込む(実行可能にする) |
colcon build --symlink-install | 開発中のPythonファイルを即時反映 |
colcon build --executor sequential | ビルドを1行ずつ実行 |
プログラムの自作
ここからは自分でプログラムを作りながら学習を進めてゆきます。
プログラムの開発はIDE(Integrated Development Environmentの略で、日本語では統合開発環境)を使用すると楽です。
今回のdocker環境には最初からVSCodium(VSCodeの互換ソフトウェア)が入っています。

WSL 2.6.1 以降の仕様変更で、GUI アプリ(Electron系)を直接 WSL 内で動かすことを推奨しなくなったため、ダブルクリックするだけでは起動しない場合があります。
ターミナルから以下のコマンドを実行します。
codium --no-sandbox --disable-gpu

その後、本当に起動するか[y/n]と聞かれますのでyを入力し、エンターを押します。
起動したら日本語化します。
Extensionsをクリックし、検索窓にJapaneseと入力し、Installをクリックします。

Trust Publisher &Installをクリックします。

右下のRestartをクリックすると再起動し、日本語化されています。

再起動したらフォルダを開くを選択します。

ros2_wsを選択しOPENをクリックします。

VSCodium上でターミナルを開きます。

ターミナル上で以下のコマンドを実行します。
srcフォルダに移動
cd src
パッケージの作成
ros2 pkg create --build-type ament_python --license Apache-2.0 py_pubsub
py_pubsub
というパッケージが自動生成されます。

py_pubsub/py_pubsunフォルダの中にpublisher_member_function.pyというファイルを作成し、以下の内容を書き込んでください。

# rclpy は ROS 2 の Python クライアントライブラリ
import rclpy
# Node クラスは ROS 2 ノードを作るための基本クラス
from rclpy.node import Node
# std_msgs の String 型メッセージを使う
from std_msgs.msg import String
# MinimalPublisher クラス:ROSノードの本体となるクラス
class MinimalPublisher(Node):
def __init__(self):
# 親クラス(Node)を初期化。ノード名は "minimal_publisher"
super().__init__('minimal_publisher')
# パブリッシャーを作成。型は String、トピック名は 'topic'、キューサイズは10
self.publisher_ = self.create_publisher(String, 'topic', 10)
# 0.5秒ごとに timer_callback() を呼び出すタイマーを設定
timer_period = 0.5 # 秒単位
self.timer = self.create_timer(timer_period, self.timer_callback)
# カウント用の変数 i(メッセージに含める番号)
self.i = 0
# タイマーで定期的に呼び出される関数
def timer_callback(self):
# メッセージオブジェクトを作成(型はString)
msg = String()
# メッセージの中身を "Hello World: i" という文字列に設定
msg.data = 'Hello World: %d' % self.i
# メッセージをパブリッシュ(送信)
self.publisher_.publish(msg)
# ログとして送信したメッセージ内容を表示(画面に出力)
self.get_logger().info('Publishing: "%s"' % msg.data)
# カウントアップ(次回メッセージ用)
self.i += 1
# ノードを起動する main 関数
def main(args=None):
# ROS 2 の Pythonライブラリを初期化(ノードなどを使う準備)
rclpy.init(args=args)
# MinimalPublisher ノードのインスタンスを作成
minimal_publisher = MinimalPublisher()
# ノードを実行状態にする(Ctrl+C で止めるまでループ)
rclpy.spin(minimal_publisher)
# ノードを明示的に破棄(メモリ解放・終了処理)
minimal_publisher.destroy_node()
# ROS 2 をシャットダウン(終了処理)
rclpy.shutdown()
# スクリプトが直接実行されたとき、main() を呼び出す
if __name__ == '__main__':
main()
こちらは
項目 | 説明 |
---|---|
create_publisher() | 指定した型・トピック名でメッセージを送信するためのパブリッシャーを作る |
create_timer() | 一定時間ごとに指定した関数を実行(非同期的に) |
String() | ROS 2 の文字列メッセージオブジェクト |
publish() | メッセージを送信(トピックに流す) |
get_logger().info() | ターミナル上にログ出力する(デバッグに便利) |
rclpy.spin() | ノードを生かして、コールバックが呼ばれるのを待つループ |
rclpy.shutdown() | ROS 2 を安全に終了する |
同様にしてsubscriber_member_function.pyファイルも作ります。
# rclpy は ROS 2 の Python クライアントライブラリ
import rclpy
# Node クラスは ROS 2 ノードを定義するための基本クラス
from rclpy.node import Node
# std_msgs パッケージから、文字列メッセージ型(String)をインポート
from std_msgs.msg import String
# MinimalSubscriber クラス:ROS ノードの受信側ノード(サブスクライバー)を定義
class MinimalSubscriber(Node):
def __init__(self):
# 親クラス Node を初期化。ノード名は 'minimal_subscriber'
super().__init__('minimal_subscriber')
# サブスクリプション(購読)を作成
# 'topic' という名前のトピックから String 型メッセージを購読
# メッセージを受信すると listener_callback 関数が呼ばれる
self.subscription = self.create_subscription(
String, # 受け取るメッセージ型(std_msgs.msg.String)
'topic', # 購読するトピック名
self.listener_callback, # コールバック関数(受信時に実行)
10) # キューサイズ(メッセージがたまる最大数)
# 変数を使わないと警告が出るので、それを回避するためのダミー参照
self.subscription
# メッセージを受け取ったときに呼ばれる関数(コールバック)
def listener_callback(self, msg):
# msg.data に入っている文字列をログ出力(ターミナルに表示)
self.get_logger().info('I heard: "%s"' % msg.data)
# main 関数:このノードを実行するときのエントリーポイント
def main(args=None):
# rclpy の初期化(ROS 2 通信を開始できるようにする)
rclpy.init(args=args)
# MinimalSubscriber ノードのインスタンスを作成
minimal_subscriber = MinimalSubscriber()
# ノードを実行状態にして、メッセージを受信し続ける(Ctrl+Cで停止)
rclpy.spin(minimal_subscriber)
# ノードを明示的に破棄(省略可能だが、きれいに終了させたい場合は書く)
minimal_subscriber.destroy_node()
# ROS 2 の通信をシャットダウン
rclpy.shutdown()
# このスクリプトが直接実行された場合、main() を呼び出す
if __name__ == '__main__':
main()
要素 | 解説 |
---|---|
create_subscription() | 指定したトピック名・メッセージ型で受信の準備をする |
listener_callback() | メッセージが届いたときに自動で呼び出される関数 |
msg.data | 実際に送られてきた文字列の中身 |
get_logger().info() | メッセージを画面に出力する関数(ログとして残る) |
rclpy.spin() | ノードを起動し、コールバック関数を呼び出す仕組み |
依存関係とエントリポイントを設定
package.xml
に依存パッケージを追加
package.xml
は、ROS 2 パッケージの「名刺」のようなファイルです。
この中で、他のパッケージ(ライブラリ)への依存を宣言します。
ROS 2 のビルドツールや実行環境が、
「このパッケージには何が必要なのか?」を判断するために使います。
以下を追加します。
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
タグ | 説明 |
---|---|
rclpy | ROS 2 の Python API ライブラリ |
std_msgs | String 型など、標準メッセージ型を使うために必要 |

編集が終わったらCtrl+Sで保存します。
setup.py
にエントリポイントを追加
setup.py
は、Python製パッケージをインストールするための設定ファイルです。
特に重要なのが entry_points
という項目です。
ros2 run パッケージ名 ノード名
のように、ターミナルからノードを実行できるようにするためです。
entry_points={
'console_scripts': [
'publisher = py_pubsub.publisher_member_function:main',
'subscriber = py_pubsub.subscriber_member_function:main',
],
},
部分 | 意味 |
---|---|
'publisher = ...' | ros2 run py_pubsub publisher と入力できるようにする |
py_pubsub.publisher_member_function:main | py_pubsub/ ディレクトリの publisher_member_function.py にある main() を実行 |

編集が終わったらCtrl+Sで保存します。
それではビルドを行います。
cd ~/ros2_ws
colcon build --packages-select py_pubsub
source install/setup.bash
プログラムの実行
送信側のプログラムを実行します。
ros2 run py_pubsub publisher
新しいターミナルを開き、以下のコマンドを打って、受信側のプログラムを実行します。
開いた際にはsourceの指定が必要です。
source install/setup.bash
ros2 run py_pubsub subscriber
上記のコマンドを別のターミナルで実行します。

上記のように送信側と受信側で通信が行えていれば成功です。
課題1 オリジナルトピックで文字列を送信するノードを作ろう
📋 指示:
- トピック名を
greeting
に変更して、 "Hello from Publisher!"
という固定メッセージを1秒おきに送信するノード(greeting_publisher
)を作成してください。
✍️ チェック項目:
- パブリッシャーの作成に
create_publisher()
を使っているか - トピック名が
'greeting'
になっているか - メッセージ型が
std_msgs.msg.String
であるか - タイマーで定期送信されるようになっているか
解答例はこちら
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class GreetingPublisher(Node):
def __init__(self):
super().__init__('greeting_publisher')
self.publisher_ = self.create_publisher(String, 'greeting', 10)
self.timer = self.create_timer(1.0, self.timer_callback)
def timer_callback(self):
msg = String()
msg.data = "Hello from Publisher!"
self.publisher_.publish(msg)
self.get_logger().info(f'Published: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
node = GreetingPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
課題2 受け取ったメッセージの長さを出力するサブスクライバを作ろう
📋 指示:
greeting
トピックを購読するgreeting_subscriber
を作成してください。- 受け取ったメッセージの「文字数」を表示してください。
✍️ チェック項目:
msg.data
のlen()
を使って長さを計算しているかget_logger().info()
で"Length: X"
のように出力しているか
解答例はこちら
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class GreetingSubscriber(Node):
def __init__(self):
super().__init__('greeting_subscriber')
self.subscription = self.create_subscription(
String,
'greeting',
self.listener_callback,
10)
def listener_callback(self, msg):
length = len(msg.data)
self.get_logger().info(f'Received message of length: {length}')
def main(args=None):
rclpy.init(args=args)
node = GreetingSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
課題3 新しいメッセージ型を使ってみよう(応用)
📋 指示:
std_msgs.msg.Int32
型を使って、0から順番に整数を送信するnumber_publisher
を作成してください。- トピック名は
count
とします。 - 受信ノード
number_subscriber
は、受け取った数値が偶数か奇数かを判断して出力してください。
✍️ チェック項目:
msg.data % 2
を使って偶奇を判断しているか- ロガー出力に
"Even"
または"Odd"
を出力しているか
Int32
型とは?
Int32
は、ROS 2 の標準メッセージ型 std_msgs
に含まれている 32ビット符号付き整数(int) を送受信するためのメッセージ型です。
- 値の範囲:
-2,147,483,648
〜+2,147,483,647
- Pythonでいう
int
型に対応しています - 単一のフィールド(
data
)だけを持つシンプルな構造
インポート方法:
from std_msgs.msg import Int32
Int32 メッセージの作成と代入
msg = Int32()
msg.data = 42 # 任意の整数値を代入
パブリッシャーで使う場合
self.publisher_ = self.create_publisher(Int32, 'count', 10)
サブスクライバーで使う場合
self.subscription = self.create_subscription(
Int32,
'count',
self.listener_callback,
10)
解答例はこちら
0から順に数値を送信するノード
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
class NumberPublisher(Node):
def __init__(self):
super().__init__('number_publisher')
self.publisher_ = self.create_publisher(Int32, 'count', 10)
self.i = 0
self.timer = self.create_timer(1.0, self.timer_callback)
def timer_callback(self):
msg = Int32()
msg.data = self.i
self.publisher_.publish(msg)
self.get_logger().info(f'Published: {msg.data}')
self.i += 1
def main(args=None):
rclpy.init(args=args)
node = NumberPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
受け取った整数が偶数か奇数かを判断するノード
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
class NumberSubscriber(Node):
def __init__(self):
super().__init__('number_subscriber')
self.subscription = self.create_subscription(
Int32,
'count',
self.listener_callback,
10)
def listener_callback(self, msg):
if msg.data % 2 == 0:
self.get_logger().info(f'{msg.data} is Even')
else:
self.get_logger().info(f'{msg.data} is Odd')
def main(args=None):
rclpy.init(args=args)
node = NumberSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
今回はパッケージの作成とビルドを学びました。
次回はこちら
Comming soon