前回はROS2のパッケージの作成とビルドについて学びました。
今回はTurtlesimを使って、トピック/サービス/アクション/パラメータについて具体的にどのようにプログラムを組んだら良いか学んでゆきます。

トピックの送信プログラム
まずはTurtlesimを起動します。
ros2 run turtlesim turtlesim_nodeもう一つで以下のコマンドを実行するとどのようなトピックがやり取りされているか確認できます。
ros2 topic list以下のように表示されます。
/parameter_events
/rosout
/turtle1/cmd_vel
/turtle1/poseここにある /turtle1/cmd_vel が「動作指令を受け取るトピック」です。
メッセージ型 geometry_msgs/msg/Twist
Twist は移動速度と回転速度を表す構造体です。
Twist:
linear:
x, y, z # 直進方向の速度
angular:
x, y, z # 回転方向の速度Turtlesimでは主に
linear.x:前進速度angular.z:回転速度
を使います。
作業用ワークスペースを作ります。
cd ~/ros2_ws/srcros2 pkg create --build-type ament_python --license Apache-2.0 turtle_playground以下のプログラムを作成します。
ファイル位置:~/ros2_ws/src/turtle_playground/turtle_playground/turtle_vel_publisher.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
class TurtleVelPublisher(Node):
""" /turtle1/cmd_vel に Twist を 10Hz で送る Publisher """
def __init__(self):
super().__init__('turtle_vel_publisher')
self.pub = self.create_publisher(Twist, '/turtle1/cmd_vel', 10)
self.declare_parameter('linear_x', 2.0) # 直進速度
self.declare_parameter('angular_z', 0.0) # 角速度(+で左回り)
self.timer = self.create_timer(0.1, self.on_timer) # 10Hz
def on_timer(self):
lin = float(self.get_parameter('linear_x').value)
ang = float(self.get_parameter('angular_z').value)
msg = Twist()
msg.linear.x = lin
msg.angular.z = ang
self.pub.publish(msg)
self.get_logger().info(f'cmd_vel: lin={lin:.2f}, ang={ang:.2f}')
def main():
rclpy.init()
node = TurtleVelPublisher()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()ポイント:
- トピックは「データの通り道」。名前が一致しないと繋がりません(turtlesim は
/turtle1/cmd_vel)。
- Twist は「移動の速度指令」:
linear.x(直進)+angular.z(旋回)だけでOK。
- タイマーで定期送信=リアルタイム制御。パラメータで実行中に値を変えられます。
setup.pyを書き換えます。
ファイル位置:~/ros2_ws/src/turtle_playground/setup.py
from setuptools import find_packages, setup
package_name = 'turtle_playground'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='ubuntu',
maintainer_email='ubuntu@todo.todo',
description='TODO: Package description',
license='Apache-2.0',
extras_require={
'test': [
'pytest',
],
},
entry_points={
'console_scripts': [
'turtle_vel_publisher = turtle_playground.turtle_vel_publisher:main', # ADDED: Publisher を登録
],
},
)entry_points:ここに書かれた名前(左側)がros2 runコマンドで呼び出せる実行ファイルになります。
例)ros2 run py_pubsub turtle_vel_publisher
py_pubsub.turtle_vel_publisher:mainは
「py_pubsub/turtle_vel_publisher.py内のmain()関数を起動する」という意味です。
package.xmlを書き換えます。
ファイル位置:~/ros2_ws/src/turtle_playground/package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>turtle_playground</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="ubuntu@todo.todo">ubuntu</maintainer>
<license>Apache-2.0</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<exec_depend>geometry_msgs</exec_depend> <!-- ADDED: Twist を使うために必要 -->
<export>
<build_type>ament_python</build_type>
</export>
</package>ビルドを行います。
cd ~/ros2_wscolcon build --packages-select turtle_playground実行してみます。
1つ目のターミナルで以下を実行します。
source install/setup.bashros2 run turtlesim turtlesim_node2つ目のターミナルで以下を実行します。
source install/setup.bashros2 run turtle_playground turtle_vel_publisherTurtlesimの亀がまっすぐ動き出します。
ここで、プログラムにはあとから変更することができるパラメータを定義してあります。
以下のコマンドでパラメータを変更できます。
ros2 param set /turtle_vel_publisher angular_z 1.0カーブすると思います。
続いて、Turtlesimからトピックを受け取ってみます。
ファイル:~/ros2_ws/src/turtle_playground/turtle_playground/turtle_pose_listener.py
ros2_ws/py_pubsub/py_pubsub/turtle_pose_listener.py
import math
import rclpy
from rclpy.node import Node
from turtlesim.msg import Pose
class TurtlePoseListener(Node):
""" /turtle1/pose を購読して座標と向きを表示。差分から概算速度も出す """
def __init__(self):
super().__init__('turtle_pose_listener')
self.sub = self.create_subscription(Pose, '/turtle1/pose', self.on_pose, 10)
self.prev = None # (x, y, t)
def on_pose(self, msg: Pose):
now = self.get_clock().now().nanoseconds * 1e-9
self.get_logger().info(f'Pose: x={msg.x:.2f}, y={msg.y:.2f}, theta={msg.theta:.2f}')
if self.prev is not None:
px, py, pt = self.prev
dt = max(now - pt, 1e-6)
v = math.hypot(msg.x - px, msg.y - py) / dt
self.get_logger().info(f'Estimated speed ≈ {v:.2f}')
self.prev = (msg.x, msg.y, now)
def main():
rclpy.init()
node = TurtlePoseListener()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()/turtle1/poseは Turtlesim が発行する現在位置情報。- コールバック関数
listener_callback()が、メッセージを受け取るたびに呼ばれます。 - 位置の変化から、おおよその移動速度も計算しています。
setup.pyに追記します。
from setuptools import find_packages, setup
package_name = 'turtle_playground'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='ubuntu',
maintainer_email='ubuntu@todo.todo',
description='TODO: Package description',
license='Apache-2.0',
extras_require={
'test': [
'pytest',
],
},
entry_points={
'console_scripts': [
'turtle_vel_publisher = turtle_playground.turtle_vel_publisher:main', # ADDED: Publisher を登録
# ▼▼▼ 今回の追加 ▼▼▼
'turtle_pose_listener = turtle_playground.turtle_pose_listener:main', # ADDED: Subscriber を登録
],
},
)
package.xml にも追記します。
ファイル:~/ros2_ws/src/turtle_playground/package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>turtle_playground</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="ubuntu@todo.todo">ubuntu</maintainer>
<license>Apache-2.0</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<exec_depend>geometry_msgs</exec_depend> <!-- ADDED: Twist を使うために必要 -->
<!-- ▼▼▼ 今回の追加 ▼▼▼ -->
<exec_depend>turtlesim</exec_depend> <!-- ADDED: Pose 型を使うために必要 -->
<!-- ▲▲▲ ここまで ▲▲▲ -->
<export>
<build_type>ament_python</build_type>
</export>
</package>ビルドします。
cd ~/ros2_wscolcon build --packages-select turtle_playgroundsource install/setup.bash実行してみます。
一つ目のターミナルで以下を実行します。
source install/setup.bashros2 run turtlesim turtlesim_node2つ目のターミナルで以下を実行します。
source install/setup.bashros2 run turtle_playground turtle_vel_publisher3つ目のターミナルで以下を実行します。
source install/setup.bashros2 run turtle_playground turtle_pose_listenervelocity_publisher が動いている状態でこれを実行すると、
リアルタイムで Turtlesim の座標や推定速度が表示されます。
課題4
亀を「ほぼ円形」に動かすノードを作成せよ。
- トピック名:
/turtle1/cmd_vel - メッセージ型:
Twist linear.x(直進速度)とangular.z(回転速度)を一定値で送る- 10Hz で publish
- パラメータ
linear_xとangular_zを定義し、ros2 param setで変更可能にする
回答例はこちら
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
class CirclePublisher(Node):
"""
/turtle1/cmd_vel に一定の線速度と角速度を送り続けて、
ほぼ円(~楕円)を描かせるパブリッシャ。
可変パラメータ:
- linear_x : 直進速度(既定 2.0)
- angular_z : 角速度 (既定 1.0)
- rate_hz : 送信周波数(既定 10Hz)
例)
ros2 run py_topic_basics circle_publisher
ros2 param set /circle_publisher angular_z 0.5 # 曲率を弱める
ros2 param set /circle_publisher linear_x 3.0 # 半径を広げる
"""
def __init__(self):
super().__init__('circle_publisher')
# パラメータ宣言(後から ros2 param set で変更可能)
self.declare_parameter('linear_x', 2.0)
self.declare_parameter('angular_z', 1.0)
self.declare_parameter('rate_hz', 10.0)
# Publisher:/turtle1/cmd_vel に Twist を送る
self.pub = self.create_publisher(Twist, '/turtle1/cmd_vel', 10)
# タイマ周期をパラメータから計算
rate_hz = float(self.get_parameter('rate_hz').value)
period = 1.0 / max(rate_hz, 1e-3) # 0割防止
self.timer = self.create_timer(period, self.on_timer)
self.get_logger().info(
f'Start circle publisher: rate={rate_hz} Hz, '
f'lin={self.get_parameter("linear_x").value}, '
f'ang={self.get_parameter("angular_z").value}'
)
def on_timer(self):
# 現在値(パラメータは実行中に変更しても反映される)
lin = float(self.get_parameter('linear_x').value)
ang = float(self.get_parameter('angular_z').value)
msg = Twist()
msg.linear.x = lin
msg.angular.z = ang
self.pub.publish(msg)
self.get_logger().info(f'cmd_vel: lin_x={lin:.2f}, ang_z={ang:.2f}')
def main():
rclpy.init()
node = CirclePublisher()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
実行中にパラメータを変えてみる
# 曲率を緩く(=角速度を小さく)
ros2 param set /circle_publisher angular_z 0.5
# 半径を広く(=線速度を速く)
ros2 param set /circle_publisher linear_x 3.0
# 更新頻度を上げる(ログうるさくなるので注意)
ros2 param set /circle_publisher rate_hz 20.0サービスの送信プログラム
続いて、サービスを呼び出すプログラムを書いてみましょう。
カメが書き出すペンの動きを変えるプログラムです。
以下を作成します。
ファイル:~/ros2_ws/src/turtle_playground/turtle_playground/set_pen_client.py
import rclpy
from rclpy.node import Node
from turtlesim.srv import SetPen
class SetPenClient(Node):
"""
/turtle1/set_pen を呼び出してペン設定を変更。
r,g,b: 0-255, width: 線の太さ, off: 0=描く / 1=描かない
"""
def __init__(self):
super().__init__('set_pen_client')
self.cli = self.create_client(SetPen, '/turtle1/set_pen')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for /turtle1/set_pen ...')
self.req = SetPen.Request()
def call(self, r, g, b, width=5, off=0):
self.req.r = int(r)
self.req.g = int(g)
self.req.b = int(b)
self.req.width = int(width)
self.req.off = int(off)
return self.cli.call_async(self.req)
def main():
rclpy.init()
node = SetPenClient()
# 例:赤・太さ5・描画ON
fut = node.call(255, 0, 0, width=5, off=0)
rclpy.spin_until_future_complete(node, fut)
node.get_logger().info('SetPen done.')
node.destroy_node()
rclpy.shutdown()- サービス型は
turtlesim/srv/SetPen。r,g,b,width,offを渡します。
wait_for_service()で サーバ準備完了を待つのが鉄則。
call_async()は非同期。ここではspin_until_future_complete()で完了まで待機。
setupを書き加えます。
ファイル:~/ros2_ws/src/turtle_playground/setup.py
from setuptools import find_packages, setup
package_name = 'turtle_playground'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='ubuntu',
maintainer_email='ubuntu@todo.todo',
description='TODO: Package description',
license='Apache-2.0',
extras_require={
'test': [
'pytest',
],
},
entry_points={
'console_scripts': [
'turtle_vel_publisher = turtle_playground.turtle_vel_publisher:main', # ADDED: Publisher を登録
'turtle_pose_listener = turtle_playground.turtle_pose_listener:main', # ADDED: Subscriber を登録
# ▼▼▼ 今回の追加 ▼▼▼
'set_pen_client = turtle_playground.set_pen_client:main', # ADDED: /turtle1/set_pen クライアント
],
},
)package.xmlは特に追記しなくて大丈夫です。
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>turtle_playground</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="ubuntu@todo.todo">ubuntu</maintainer>
<license>Apache-2.0</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<exec_depend>geometry_msgs</exec_depend> <!-- ADDED: Twist を使うために必要 -->
<exec_depend>turtlesim</exec_depend> <!-- ADDED: Pose 型を使うために必要 -->
<export>
<build_type>ament_python</build_type>
</export>
</package>
動かしてみましょう。
ビルドします。
cd ~/ros2_wscolcon build --packages-select turtle_playground1つ目のターミナルで以下を実行します。(Turtlesim を起動)
source install/setup.bashros2 run turtlesim turtlesim_node2つ目のターミナルで課題4のプログラムを動かすと、白い線のまま回ります。
source install/setup.bashros2 run turtle_playground turtle_vel_publisher3つ目のターミナルで以下を実行します。
source install/setup.bashros2 run turtle_playground set_pen_client実行後から線が赤くなれば成功です。

課題5
課題4では円を描くパブリッシャーを作りました。
この円の軌跡を“破線”にするサービスを作ってください。
- 既存の 円運動 Publisher はそのまま使用(速度で動かす)。
- 新しく作るノードは
/turtle1/set_penサービスを交互に呼び出すだけです。 - 線を「描く(off=0)」→「描かない(off=1)」を繰り返すことで、点線に見えるようにします。
- トピック(動作)とサービス(状態切替)の役割分担を体感してみましょう。

作成要件:
ノード名:dash_pen_min
サービス:turtlesim/srv/SetPen
色:白(r=255, g=255, b=255)、太さ:5
ON時間:0.6秒、OFF時間:0.4秒
ずっと繰り返す
コード内で複雑な仕組み(タイマーやスレッド)は使わない
→ whileループと time.sleep() だけでOK!
解答例はこちら
import time
import rclpy
from rclpy.node import Node
from turtlesim.srv import SetPen
def main():
rclpy.init()
node = Node('dash_pen_min')
cli = node.create_client(SetPen, '/turtle1/set_pen')
while not cli.wait_for_service(1.0):
node.get_logger().info('Waiting for /turtle1/set_pen ...')
# 固定: 白(255,255,255), 幅5 / 0.6秒ON, 0.4秒OFF
while rclpy.ok():
req_on = SetPen.Request(); req_on.r, req_on.g, req_on.b, req_on.width, req_on.off = 255,255,255,5,0
cli.call_async(req_on) # 応答は待たない
time.sleep(0.6)
req_off = SetPen.Request(); req_off.r, req_off.g, req_off.b, req_off.width, req_off.off = 255,255,255,5,1
cli.call_async(req_off)
time.sleep(0.4)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()アクション送信のプログラム
アクションはトピックとサービスのいいとこどりをした通信方法でした。
| 通信種別 | 主な特徴 |
|---|---|
| トピック | 送るだけ(One-way)・連続的なデータに向く(例:センサ値) |
| サービス | 要求と応答(Request / Response)1回限り |
| アクション | 長時間かかる処理を途中経過を含めてやり取り(Goal / Feedback / Result) |
ここでは turtlesim のアクション /turtle1/rotate_absolute を使う アクションクライアントを、既存パッケージ turtle_playground に1ファイル追加して動かしてみましょう。
まずは90度左回転させるアクションプログラムです。
ファイル:~/ros2_ws/src/turtle_playground/turtle_playground/rotate_abs_client.py
import math
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from turtlesim.action import RotateAbsolute
class RotateAbsoluteClient(Node):
"""
/turtle1/rotate_absolute へゴール(目標角度theta)を送るアクションクライアント。
- 途中経過(Feedback)として current_angle が届く。
- 既定値: theta = 90度(=1.5708rad)。--ros-args -p theta:=... で変更可。
"""
def __init__(self):
super().__init__('rotate_abs_client')
self._client = ActionClient(self, RotateAbsolute, '/turtle1/rotate_absolute')
# 角度[rad]のパラメータ(既定=90度)
self.declare_parameter('theta', 1.57079632679)
def send_goal(self):
# サーバ出現待ち
if not self._client.wait_for_server(timeout_sec=5.0):
self.get_logger().error('Action server /turtle1/rotate_absolute not available.')
return False
theta = float(self.get_parameter('theta').value)
goal = RotateAbsolute.Goal()
goal.theta = theta
self.get_logger().info(f'Sending goal: theta={theta:.3f} rad ({theta*180/math.pi:.1f} deg)')
# フィードバックコールバックを登録して非同期送信
send_future = self._client.send_goal_async(goal, feedback_callback=self._on_feedback)
send_future.add_done_callback(self._on_goal_response)
return True
def _on_feedback(self, feedback_msg):
current = feedback_msg.feedback.current_angle
self.get_logger().info(f'Feedback: current_angle={current:.3f}')
def _on_goal_response(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().warn('Goal rejected.')
rclpy.shutdown()
return
self.get_logger().info('Goal accepted. Waiting for result...')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self._on_result)
def _on_result(self, future):
result = future.result().result
self.get_logger().info(f'Result: final_angle={result.delta:.3f} (delta)')
# 終了
rclpy.shutdown()
def main():
rclpy.init()
node = RotateAbsoluteClient()
if node.send_goal():
rclpy.spin(node) # フィードバック/結果を待つ
node.destroy_node()
if __name__ == '__main__':
main()ポイント
- Goal: 目標角度
thetaを送信 - Feedback: 現在角度
current_angleを随時受信 - Result: 完了時の
delta(回転量)が返る - 角度は ラジアン。90°=1.5708, 180°=3.1416
setup.pyに下記の通り追記します。
ファイル:~/ros2_ws/src/turtle_playground/setup.py
from setuptools import find_packages, setup
package_name = 'turtle_playground'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='ubuntu',
maintainer_email='ubuntu@todo.todo',
description='TODO: Package description',
license='Apache-2.0',
extras_require={
'test': [
'pytest',
],
},
entry_points={
'console_scripts': [
'turtle_vel_publisher = turtle_playground.turtle_vel_publisher:main', # ADDED: Publisher を登録
'turtle_pose_listener = turtle_playground.turtle_pose_listener:main', # ADDED: Subscriber を登録
'set_pen_client = turtle_playground.set_pen_client:main', # ADDED: /turtle1/set_pen クライアント
# ▼▼▼ 今回の追加 ▼▼▼
'rotate_abs_client = turtle_playground.rotate_abs_client:main', # ADDED: アクションクライアント
],
},
)package.xmlは今回はそのままです。
ビルドと実行をしてみます。
cd ~/ros2_wscolcon build --packages-select turtle_playgroundTurtlesimを起動します。
source install/setup.bashros2 run turtlesim turtlesim_node別ターミナルで作成したアクションプログラムを起動してみます。
ros2 run turtle_playground rotate_abs_client90度回転すれば成功です。
以下のようにパラメータを指定することもできます。
# 180度回転(パラメータ指定)
ros2 run turtle_playground rotate_abs_client --ros-args -p theta:=3.1416実行中に端末に Feedback(current_angle)→ Result が表示されます。
次にキー操作でゴールを送信、キャンセルを行うプログラムを作成してみます。
ターミナルにフォーカスした状態でキー入力を拾い、ゴールを送信します。
- x … 180°(π rad)へ回転
- v … 0°へ回転
- c … 進行中のゴールをキャンセル
- q … 終了
を実現します。
ファイル:~/ros2_ws/src/turtle_playground/turtle_playground/rotate_abs_keycontrol_raw.py
import math
import os
import select
import threading
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from turtlesim.action import RotateAbsolute
import termios
import tty
class RotateAbsoluteKeyControlRaw(Node):
"""
X: 180°, V: 0°, C: Cancel, Q: Quit
Enter不要。端末フォーカスをこのウィンドウに置いた状態でキーを押す。
"""
def __init__(self):
super().__init__('rotate_abs_keycontrol_raw')
self.client = ActionClient(self, RotateAbsolute, '/turtle1/rotate_absolute')
self.goal_handle = None
self.lock = threading.Lock()
self._stop = threading.Event()
self.key_thread = None
# /dev/tty を raw に(1文字即時取得)
self.fd = os.open('/dev/tty', os.O_RDONLY | os.O_NONBLOCK)
self.old_attrs = termios.tcgetattr(self.fd)
tty.setcbreak(self.fd)
def start(self) -> bool:
if not self.client.wait_for_server(5.0):
self.get_logger().error('Action server not available.')
return False
self.get_logger().info('Ready: X=180°, V=0°, C=Cancel, Q=Quit (no Enter)')
self.key_thread = threading.Thread(target=self._keyloop, daemon=True)
self.key_thread.start()
return True
def destroy_node(self):
# スレッドを先に止めてから端末を復元・クローズ
try:
self._stop.set()
if self.key_thread:
self.key_thread.join(timeout=0.2)
import termios
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_attrs)
os.close(self.fd)
except Exception:
pass
super().destroy_node()
def _keyloop(self):
# 20msポーリングで1文字ずつ読む
while rclpy.ok() and not self._stop.is_set():
r, _, _ = select.select([self.fd], [], [], 0.02)
if not r:
continue
try:
ch = os.read(self.fd, 1)
except (BlockingIOError, OSError):
break
if not ch:
continue
key = ch.decode(errors='ignore').lower()
with self.lock:
if key == 'x':
self._send_goal(math.pi) # 180°
elif key == 'v':
self._send_goal(0.0) # 0°
elif key == 'c':
self._cancel_goal()
elif key == 'q':
self.get_logger().info('Quit.')
self._stop.set()
rclpy.shutdown()
break
# それ以外のキーは無視
def _send_goal(self, theta: float):
goal = RotateAbsolute.Goal()
goal.theta = float(theta)
self.get_logger().info(f'Sending goal: {theta:.3f} rad ({theta*180/math.pi:.1f}°)')
self.client.send_goal_async(goal, feedback_callback=self._on_feedback) \
.add_done_callback(self._on_goal_response)
def _cancel_goal(self):
if self.goal_handle is not None:
self.get_logger().info('Sending cancel request...')
self.goal_handle.cancel_goal_async().add_done_callback(lambda _: None)
else:
self.get_logger().warn('No active goal to cancel.')
# Feedback は remaining(残り角度)
def _on_feedback(self, fb_msg):
rem = fb_msg.feedback.remaining
self.get_logger().info(f'Feedback remaining={rem:.3f}')
def _on_goal_response(self, fut):
gh = fut.result()
if not gh.accepted:
self.get_logger().warn('Goal rejected.')
return
self.goal_handle = gh
gh.get_result_async().add_done_callback(self._on_result)
def _on_result(self, fut):
res = fut.result()
self.get_logger().info(f'Result: delta={res.result.delta:.3f}, status={res.status}')
self.goal_handle = None # 結果が返っても終了しない
def main():
rclpy.init()
node = RotateAbsoluteKeyControlRaw()
try:
if node.start():
rclpy.spin(node)
finally:
node.destroy_node()
try:
rclpy.shutdown()
except Exception:
pass
if __name__ == '__main__':
main()
setup.pyに追記します。
ファイル:~/ros2_ws/src/turtle_playground/setup.py
from setuptools import find_packages, setup
package_name = 'turtle_playground'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='ubuntu',
maintainer_email='ubuntu@todo.todo',
description='TODO: Package description',
license='Apache-2.0',
extras_require={
'test': [
'pytest',
],
},
entry_points={
'console_scripts': [
'turtle_vel_publisher = turtle_playground.turtle_vel_publisher:main', # ADDED: Publisher を登録
'turtle_pose_listener = turtle_playground.turtle_pose_listener:main', # ADDED: Subscriber を登録
'set_pen_client = turtle_playground.set_pen_client:main', # ADDED: /turtle1/set_pen クライアント
'rotate_abs_client = turtle_playground.rotate_abs_client:main', # ADDED: アクションクライアント
# ▼▼▼ 今回の追加 ▼▼▼
'rotate_abs_keycontrol_raw = turtle_playground.rotate_abs_keycontrol_raw:main', # Enter不要のキー操作
],
},
)ビルドして実行します。
cd ~/ros2_wscolcon build --packages-select turtle_playgroundturtlesim を起動します。
source install/setup.bashros2 run turtlesim turtlesim_node今回作成したプログラムを起動します。
source install/setup.bashros2 run turtle_playground rotate_abs_keycontrol_rawVとXキーで目標角度を送信、途中でCが押された場合停止すれば成功です。
課題6
作成したrotate_abs_keycontrol_raw.py のプログラムに機能を1つ追加してみてください。
次回はこちら

