Python ROS2 プログラム 高専4年生

【ROS2講座④】自作したプログラムのTurtlesim制御【Python】

前回はROS2のパッケージの作成とビルドについて学びました。

今回はTurtlesimを使って、トピック/サービス/アクション/パラメータについて具体的にどのようにプログラムを組んだら良いか学んでゆきます。

トピックの送信プログラム

まずはTurtlesimを起動します。

ros2 run turtlesim turtlesim_node

もう一つで以下のコマンドを実行するとどのようなトピックがやり取りされているか確認できます。

ros2 topic list

以下のように表示されます。

/parameter_events
/rosout
/turtle1/cmd_vel
/turtle1/pose

ここにある /turtle1/cmd_vel が「動作指令を受け取るトピック」です。

メッセージ型 geometry_msgs/msg/Twist

Twist は移動速度と回転速度を表す構造体です。

Twist:
  linear:
    x, y, z    # 直進方向の速度
  angular:
    x, y, z    # 回転方向の速度

Turtlesimでは主に

  • linear.x:前進速度
  • angular.z:回転速度
    を使います。

作業用ワークスペースを作ります。

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python --license Apache-2.0 turtle_playground

以下のプログラムを作成します。

ファイル位置~/ros2_ws/src/turtle_playground/turtle_playground/turtle_vel_publisher.py

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist

class TurtleVelPublisher(Node):
    """ /turtle1/cmd_vel に Twist を 10Hz で送る Publisher """
    def __init__(self):
        super().__init__('turtle_vel_publisher')
        self.pub = self.create_publisher(Twist, '/turtle1/cmd_vel', 10)
        self.declare_parameter('linear_x', 2.0)   # 直進速度
        self.declare_parameter('angular_z', 0.0)  # 角速度(+で左回り)
        self.timer = self.create_timer(0.1, self.on_timer)  # 10Hz

    def on_timer(self):
        lin = float(self.get_parameter('linear_x').value)
        ang = float(self.get_parameter('angular_z').value)
        msg = Twist()
        msg.linear.x = lin
        msg.angular.z = ang
        self.pub.publish(msg)
        self.get_logger().info(f'cmd_vel: lin={lin:.2f}, ang={ang:.2f}')

def main():
    rclpy.init()
    node = TurtleVelPublisher()
    try:
            rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

ポイント:

  • トピックは「データの通り道」。名前が一致しないと繋がりません(turtlesim は /turtle1/cmd_vel)。
  • Twist は「移動の速度指令」:linear.x(直進)+angular.z(旋回)だけでOK。
  • タイマーで定期送信=リアルタイム制御。パラメータで実行中に値を変えられます。

setup.pyを書き換えます。

ファイル位置~/ros2_ws/src/turtle_playground/setup.py

from setuptools import find_packages, setup

package_name = 'turtle_playground'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='ubuntu',
    maintainer_email='ubuntu@todo.todo',
    description='TODO: Package description',
    license='Apache-2.0',
    extras_require={
        'test': [
            'pytest',
        ],
    },
    entry_points={
        'console_scripts': [
            'turtle_vel_publisher = turtle_playground.turtle_vel_publisher:main',  # ADDED: Publisher を登録
        ],
    },
)
  • entry_points:ここに書かれた名前(左側)が ros2 run コマンドで呼び出せる実行ファイルになります。
    例)ros2 run py_pubsub turtle_vel_publisher
  • py_pubsub.turtle_vel_publisher:main
    py_pubsub/turtle_vel_publisher.py 内の main() 関数を起動する」という意味です。

package.xmlを書き換えます。

ファイル位置~/ros2_ws/src/turtle_playground/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>turtle_playground</name>
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="ubuntu@todo.todo">ubuntu</maintainer>
  <license>Apache-2.0</license>

  <test_depend>ament_copyright</test_depend>
  <test_depend>ament_flake8</test_depend>
  <test_depend>ament_pep257</test_depend>
  <test_depend>python3-pytest</test_depend>

  <exec_depend>geometry_msgs</exec_depend>  <!-- ADDED: Twist を使うために必要 -->

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>

ビルドを行います。

cd ~/ros2_ws
colcon build --packages-select turtle_playground

実行してみます。

1つ目のターミナルで以下を実行します。

source install/setup.bash
ros2 run turtlesim turtlesim_node

2つ目のターミナルで以下を実行します。

source install/setup.bash
ros2 run turtle_playground turtle_vel_publisher

Turtlesimの亀がまっすぐ動き出します。

ここで、プログラムにはあとから変更することができるパラメータを定義してあります。

以下のコマンドでパラメータを変更できます。

ros2 param set /turtle_vel_publisher angular_z 1.0

カーブすると思います。

続いて、Turtlesimからトピックを受け取ってみます。

ファイル~/ros2_ws/src/turtle_playground/turtle_playground/turtle_pose_listener.py

ros2_ws/py_pubsub/py_pubsub/turtle_pose_listener.py

import math
import rclpy
from rclpy.node import Node
from turtlesim.msg import Pose

class TurtlePoseListener(Node):
    """ /turtle1/pose を購読して座標と向きを表示。差分から概算速度も出す """
    def __init__(self):
        super().__init__('turtle_pose_listener')
        self.sub = self.create_subscription(Pose, '/turtle1/pose', self.on_pose, 10)
        self.prev = None  # (x, y, t)

    def on_pose(self, msg: Pose):
        now = self.get_clock().now().nanoseconds * 1e-9
        self.get_logger().info(f'Pose: x={msg.x:.2f}, y={msg.y:.2f}, theta={msg.theta:.2f}')
        if self.prev is not None:
            px, py, pt = self.prev
            dt = max(now - pt, 1e-6)
            v = math.hypot(msg.x - px, msg.y - py) / dt
            self.get_logger().info(f'Estimated speed ≈ {v:.2f}')
        self.prev = (msg.x, msg.y, now)

def main():
    rclpy.init()
    node = TurtlePoseListener()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()
  • /turtle1/pose は Turtlesim が発行する現在位置情報。
  • コールバック関数 listener_callback() が、メッセージを受け取るたびに呼ばれます。
  • 位置の変化から、おおよその移動速度も計算しています。

setup.pyに追記します。

from setuptools import find_packages, setup

package_name = 'turtle_playground'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='ubuntu',
    maintainer_email='ubuntu@todo.todo',
    description='TODO: Package description',
    license='Apache-2.0',
    extras_require={
        'test': [
            'pytest',
        ],
    },
    entry_points={
        'console_scripts': [
            'turtle_vel_publisher = turtle_playground.turtle_vel_publisher:main',  # ADDED: Publisher を登録
            # ▼▼▼ 今回の追加 ▼▼▼
            'turtle_pose_listener = turtle_playground.turtle_pose_listener:main',   # ADDED: Subscriber を登録
        ],
    },
)

package.xml にも追記します。

ファイル~/ros2_ws/src/turtle_playground/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>turtle_playground</name>
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="ubuntu@todo.todo">ubuntu</maintainer>
  <license>Apache-2.0</license>

  <test_depend>ament_copyright</test_depend>
  <test_depend>ament_flake8</test_depend>
  <test_depend>ament_pep257</test_depend>
  <test_depend>python3-pytest</test_depend>

  <exec_depend>geometry_msgs</exec_depend>  <!-- ADDED: Twist を使うために必要 -->
  <!-- ▼▼▼ 今回の追加 ▼▼▼ -->
  <exec_depend>turtlesim</exec_depend>      <!-- ADDED: Pose 型を使うために必要 -->
  <!-- ▲▲▲ ここまで ▲▲▲ -->

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>

ビルドします。

cd ~/ros2_ws
colcon build --packages-select turtle_playground
source install/setup.bash

実行してみます。

一つ目のターミナルで以下を実行します。

source install/setup.bash
ros2 run turtlesim turtlesim_node

2つ目のターミナルで以下を実行します。

source install/setup.bash
ros2 run turtle_playground turtle_vel_publisher

3つ目のターミナルで以下を実行します。

source install/setup.bash
ros2 run turtle_playground turtle_pose_listener

velocity_publisher が動いている状態でこれを実行すると、
リアルタイムで Turtlesim の座標や推定速度が表示されます。

課題4

亀を「ほぼ円形」に動かすノードを作成せよ。

  • トピック名:/turtle1/cmd_vel
  • メッセージ型:Twist
  • linear.x(直進速度)と angular.z(回転速度)を一定値で送る
  • 10Hz で publish
  • パラメータ linear_xangular_z を定義し、ros2 param set で変更可能にする
回答例はこちら
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist


class CirclePublisher(Node):
    """
    /turtle1/cmd_vel に一定の線速度と角速度を送り続けて、
    ほぼ円(~楕円)を描かせるパブリッシャ。

    可変パラメータ:
      - linear_x  : 直進速度(既定 2.0)
      - angular_z : 角速度   (既定 1.0)
      - rate_hz   : 送信周波数(既定 10Hz)

    例)
      ros2 run py_topic_basics circle_publisher
      ros2 param set /circle_publisher angular_z 0.5   # 曲率を弱める
      ros2 param set /circle_publisher linear_x  3.0   # 半径を広げる
    """

    def __init__(self):
        super().__init__('circle_publisher')

        # パラメータ宣言(後から ros2 param set で変更可能)
        self.declare_parameter('linear_x', 2.0)
        self.declare_parameter('angular_z', 1.0)
        self.declare_parameter('rate_hz', 10.0)

        # Publisher:/turtle1/cmd_vel に Twist を送る
        self.pub = self.create_publisher(Twist, '/turtle1/cmd_vel', 10)

        # タイマ周期をパラメータから計算
        rate_hz = float(self.get_parameter('rate_hz').value)
        period = 1.0 / max(rate_hz, 1e-3)  # 0割防止
        self.timer = self.create_timer(period, self.on_timer)

        self.get_logger().info(
            f'Start circle publisher: rate={rate_hz} Hz, '
            f'lin={self.get_parameter("linear_x").value}, '
            f'ang={self.get_parameter("angular_z").value}'
        )

    def on_timer(self):
        # 現在値(パラメータは実行中に変更しても反映される)
        lin = float(self.get_parameter('linear_x').value)
        ang = float(self.get_parameter('angular_z').value)

        msg = Twist()
        msg.linear.x = lin
        msg.angular.z = ang

        self.pub.publish(msg)
        self.get_logger().info(f'cmd_vel: lin_x={lin:.2f}, ang_z={ang:.2f}')

def main():
    rclpy.init()
    node = CirclePublisher()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

実行中にパラメータを変えてみる

# 曲率を緩く(=角速度を小さく)
ros2 param set /circle_publisher angular_z 0.5

# 半径を広く(=線速度を速く)
ros2 param set /circle_publisher linear_x 3.0

# 更新頻度を上げる(ログうるさくなるので注意)
ros2 param set /circle_publisher rate_hz 20.0

サービスの送信プログラム

続いて、サービスを呼び出すプログラムを書いてみましょう。

カメが書き出すペンの動きを変えるプログラムです。

以下を作成します。

ファイル~/ros2_ws/src/turtle_playground/turtle_playground/set_pen_client.py

import rclpy
from rclpy.node import Node
from turtlesim.srv import SetPen

class SetPenClient(Node):
    """
    /turtle1/set_pen を呼び出してペン設定を変更。
    r,g,b: 0-255, width: 線の太さ, off: 0=描く / 1=描かない
    """
    def __init__(self):
        super().__init__('set_pen_client')
        self.cli = self.create_client(SetPen, '/turtle1/set_pen')
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Waiting for /turtle1/set_pen ...')
        self.req = SetPen.Request()

    def call(self, r, g, b, width=5, off=0):
        self.req.r = int(r)
        self.req.g = int(g)
        self.req.b = int(b)
        self.req.width = int(width)
        self.req.off = int(off)
        return self.cli.call_async(self.req)

def main():
    rclpy.init()
    node = SetPenClient()
    # 例:赤・太さ5・描画ON
    fut = node.call(255, 0, 0, width=5, off=0)
    rclpy.spin_until_future_complete(node, fut)
    node.get_logger().info('SetPen done.')
    node.destroy_node()
    rclpy.shutdown()
  • サービス型turtlesim/srv/SetPenr,g,b,width,off を渡します。
  • wait_for_service()サーバ準備完了を待つのが鉄則。
  • call_async() は非同期。ここでは spin_until_future_complete()完了まで待機

setupを書き加えます。

ファイル~/ros2_ws/src/turtle_playground/setup.py

from setuptools import find_packages, setup

package_name = 'turtle_playground'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='ubuntu',
    maintainer_email='ubuntu@todo.todo',
    description='TODO: Package description',
    license='Apache-2.0',
    extras_require={
        'test': [
            'pytest',
        ],
    },
    entry_points={
        'console_scripts': [
            'turtle_vel_publisher = turtle_playground.turtle_vel_publisher:main',  # ADDED: Publisher を登録
            'turtle_pose_listener = turtle_playground.turtle_pose_listener:main',   # ADDED: Subscriber を登録
            # ▼▼▼ 今回の追加 ▼▼▼
            'set_pen_client      = turtle_playground.set_pen_client:main',          # ADDED: /turtle1/set_pen クライアント
        ],
    },
)

package.xmlは特に追記しなくて大丈夫です。

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>turtle_playground</name>
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="ubuntu@todo.todo">ubuntu</maintainer>
  <license>Apache-2.0</license>

  <test_depend>ament_copyright</test_depend>
  <test_depend>ament_flake8</test_depend>
  <test_depend>ament_pep257</test_depend>
  <test_depend>python3-pytest</test_depend>

  <exec_depend>geometry_msgs</exec_depend>  <!-- ADDED: Twist を使うために必要 -->
  <exec_depend>turtlesim</exec_depend>      <!-- ADDED: Pose 型を使うために必要 -->

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>

動かしてみましょう。

ビルドします。

cd ~/ros2_ws
colcon build --packages-select turtle_playground

1つ目のターミナルで以下を実行します。(Turtlesim を起動)

source install/setup.bash
ros2 run turtlesim turtlesim_node

2つ目のターミナルで課題4のプログラムを動かすと、白い線のまま回ります。

source install/setup.bash
ros2 run turtle_playground turtle_vel_publisher

3つ目のターミナルで以下を実行します。

source install/setup.bash
ros2 run turtle_playground set_pen_client

実行後から線が赤くなれば成功です。

課題5

課題4では円を描くパブリッシャーを作りました。

この円の軌跡を“破線”にするサービスを作ってください。

  • 既存の 円運動 Publisher はそのまま使用(速度で動かす)。
  • 新しく作るノードは /turtle1/set_pen サービスを交互に呼び出すだけです。
  • 線を「描く(off=0)」→「描かない(off=1)」を繰り返すことで、点線に見えるようにします。
  • トピック(動作)とサービス(状態切替)の役割分担を体感してみましょう。

作成要件:

ノード名:dash_pen_min
サービス:turtlesim/srv/SetPen
色:白(r=255, g=255, b=255)、太さ:5
ON時間:0.6秒、OFF時間:0.4秒
ずっと繰り返す
コード内で複雑な仕組み(タイマーやスレッド)は使わない
 → whileループと time.sleep() だけでOK!

解答例はこちら
import time
import rclpy
from rclpy.node import Node
from turtlesim.srv import SetPen

def main():
    rclpy.init()
    node = Node('dash_pen_min')
    cli = node.create_client(SetPen, '/turtle1/set_pen')
    while not cli.wait_for_service(1.0):
        node.get_logger().info('Waiting for /turtle1/set_pen ...')

    # 固定: 白(255,255,255), 幅5 / 0.6秒ON, 0.4秒OFF
    while rclpy.ok():
        req_on = SetPen.Request();  req_on.r, req_on.g, req_on.b, req_on.width, req_on.off = 255,255,255,5,0
        cli.call_async(req_on)  # 応答は待たない
        time.sleep(0.6)

        req_off = SetPen.Request(); req_off.r, req_off.g, req_off.b, req_off.width, req_off.off = 255,255,255,5,1
        cli.call_async(req_off)
        time.sleep(0.4)

    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

アクション送信のプログラム

アクションはトピックとサービスのいいとこどりをした通信方法でした。

通信種別主な特徴
トピック送るだけ(One-way)・連続的なデータに向く(例:センサ値)
サービス要求と応答(Request / Response)1回限り
アクション長時間かかる処理を途中経過を含めてやり取り(Goal / Feedback / Result)

ここでは turtlesim のアクション /turtle1/rotate_absolute を使う アクションクライアントを、既存パッケージ turtle_playground に1ファイル追加して動かしてみましょう。

まずは90度左回転させるアクションプログラムです。

ファイル~/ros2_ws/src/turtle_playground/turtle_playground/rotate_abs_client.py

import math
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from turtlesim.action import RotateAbsolute

class RotateAbsoluteClient(Node):
    """
    /turtle1/rotate_absolute へゴール(目標角度theta)を送るアクションクライアント。
    - 途中経過(Feedback)として current_angle が届く。
    - 既定値: theta = 90度(=1.5708rad)。--ros-args -p theta:=... で変更可。
    """
    def __init__(self):
        super().__init__('rotate_abs_client')
        self._client = ActionClient(self, RotateAbsolute, '/turtle1/rotate_absolute')
        # 角度[rad]のパラメータ(既定=90度)
        self.declare_parameter('theta', 1.57079632679)

    def send_goal(self):
        # サーバ出現待ち
        if not self._client.wait_for_server(timeout_sec=5.0):
            self.get_logger().error('Action server /turtle1/rotate_absolute not available.')
            return False

        theta = float(self.get_parameter('theta').value)

        goal = RotateAbsolute.Goal()
        goal.theta = theta

        self.get_logger().info(f'Sending goal: theta={theta:.3f} rad ({theta*180/math.pi:.1f} deg)')

        # フィードバックコールバックを登録して非同期送信
        send_future = self._client.send_goal_async(goal, feedback_callback=self._on_feedback)
        send_future.add_done_callback(self._on_goal_response)
        return True

    def _on_feedback(self, feedback_msg):
        current = feedback_msg.feedback.current_angle
        self.get_logger().info(f'Feedback: current_angle={current:.3f}')

    def _on_goal_response(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().warn('Goal rejected.')
            rclpy.shutdown()
            return
        self.get_logger().info('Goal accepted. Waiting for result...')
        result_future = goal_handle.get_result_async()
        result_future.add_done_callback(self._on_result)

    def _on_result(self, future):
        result = future.result().result
        self.get_logger().info(f'Result: final_angle={result.delta:.3f} (delta)')
        # 終了
        rclpy.shutdown()

def main():
    rclpy.init()
    node = RotateAbsoluteClient()
    if node.send_goal():
        rclpy.spin(node)   # フィードバック/結果を待つ
    node.destroy_node()

if __name__ == '__main__':
    main()

ポイント

  • Goal: 目標角度 theta を送信
  • Feedback: 現在角度 current_angle を随時受信
  • Result: 完了時の delta(回転量)が返る
  • 角度は ラジアン。90°=1.5708, 180°=3.1416

setup.pyに下記の通り追記します。

ファイル~/ros2_ws/src/turtle_playground/setup.py

from setuptools import find_packages, setup

package_name = 'turtle_playground'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='ubuntu',
    maintainer_email='ubuntu@todo.todo',
    description='TODO: Package description',
    license='Apache-2.0',
    extras_require={
        'test': [
            'pytest',
        ],
    },
    entry_points={
        'console_scripts': [
            'turtle_vel_publisher = turtle_playground.turtle_vel_publisher:main',  # ADDED: Publisher を登録
            'turtle_pose_listener = turtle_playground.turtle_pose_listener:main',   # ADDED: Subscriber を登録
            'set_pen_client      = turtle_playground.set_pen_client:main',          # ADDED: /turtle1/set_pen クライアント
            # ▼▼▼ 今回の追加 ▼▼▼
            'rotate_abs_client   = turtle_playground.rotate_abs_client:main',       # ADDED: アクションクライアント
            
        ],
    },
)

package.xmlは今回はそのままです。

ビルドと実行をしてみます。

cd ~/ros2_ws
colcon build --packages-select turtle_playground

Turtlesimを起動します。

source install/setup.bash
ros2 run turtlesim turtlesim_node

別ターミナルで作成したアクションプログラムを起動してみます。

ros2 run turtle_playground rotate_abs_client

90度回転すれば成功です。

以下のようにパラメータを指定することもできます。

# 180度回転(パラメータ指定)
ros2 run turtle_playground rotate_abs_client --ros-args -p theta:=3.1416

実行中に端末に Feedback(current_angle)→ Result が表示されます。

次にキー操作でゴールを送信、キャンセルを行うプログラムを作成してみます。

ターミナルにフォーカスした状態でキー入力を拾い、ゴールを送信します。

  • x … 180°(π rad)へ回転
  • v … 0°へ回転
  • c … 進行中のゴールをキャンセル
  • q … 終了
    を実現します。

ファイル~/ros2_ws/src/turtle_playground/turtle_playground/rotate_abs_keycontrol_raw.py

import math
import os
import select
import threading
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from turtlesim.action import RotateAbsolute

import termios
import tty

class RotateAbsoluteKeyControlRaw(Node):
    """
    X: 180°, V: 0°, C: Cancel, Q: Quit
    Enter不要。端末フォーカスをこのウィンドウに置いた状態でキーを押す。
    """
    def __init__(self):
        super().__init__('rotate_abs_keycontrol_raw')
        self.client = ActionClient(self, RotateAbsolute, '/turtle1/rotate_absolute')
        self.goal_handle = None
        self.lock = threading.Lock()
        self._stop = threading.Event()
        self.key_thread = None

        # /dev/tty を raw に(1文字即時取得)
        self.fd = os.open('/dev/tty', os.O_RDONLY | os.O_NONBLOCK)
        self.old_attrs = termios.tcgetattr(self.fd)
        tty.setcbreak(self.fd)

    def start(self) -> bool:
        if not self.client.wait_for_server(5.0):
            self.get_logger().error('Action server not available.')
            return False
        self.get_logger().info('Ready: X=180°, V=0°, C=Cancel, Q=Quit (no Enter)')
        self.key_thread = threading.Thread(target=self._keyloop, daemon=True)
        self.key_thread.start()
        return True

    def destroy_node(self):
        # スレッドを先に止めてから端末を復元・クローズ
        try:
            self._stop.set()
            if self.key_thread:
                self.key_thread.join(timeout=0.2)
            import termios
            termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_attrs)
            os.close(self.fd)
        except Exception:
            pass
        super().destroy_node()

    def _keyloop(self):
        # 20msポーリングで1文字ずつ読む
        while rclpy.ok() and not self._stop.is_set():
            r, _, _ = select.select([self.fd], [], [], 0.02)
            if not r:
                continue
            try:
                ch = os.read(self.fd, 1)
            except (BlockingIOError, OSError):
                break
            if not ch:
                continue
            key = ch.decode(errors='ignore').lower()

            with self.lock:
                if key == 'x':
                    self._send_goal(math.pi)   # 180°
                elif key == 'v':
                    self._send_goal(0.0)      # 0°
                elif key == 'c':
                    self._cancel_goal()
                elif key == 'q':
                    self.get_logger().info('Quit.')
                    self._stop.set()
                    rclpy.shutdown()
                    break
                # それ以外のキーは無視

    def _send_goal(self, theta: float):
        goal = RotateAbsolute.Goal()
        goal.theta = float(theta)
        self.get_logger().info(f'Sending goal: {theta:.3f} rad ({theta*180/math.pi:.1f}°)')
        self.client.send_goal_async(goal, feedback_callback=self._on_feedback) \
                   .add_done_callback(self._on_goal_response)

    def _cancel_goal(self):
        if self.goal_handle is not None:
            self.get_logger().info('Sending cancel request...')
            self.goal_handle.cancel_goal_async().add_done_callback(lambda _: None)
        else:
            self.get_logger().warn('No active goal to cancel.')

    # Feedback は remaining(残り角度)
    def _on_feedback(self, fb_msg):
        rem = fb_msg.feedback.remaining
        self.get_logger().info(f'Feedback remaining={rem:.3f}')

    def _on_goal_response(self, fut):
        gh = fut.result()
        if not gh.accepted:
            self.get_logger().warn('Goal rejected.')
            return
        self.goal_handle = gh
        gh.get_result_async().add_done_callback(self._on_result)

    def _on_result(self, fut):
        res = fut.result()
        self.get_logger().info(f'Result: delta={res.result.delta:.3f}, status={res.status}')
        self.goal_handle = None  # 結果が返っても終了しない

def main():
    rclpy.init()
    node = RotateAbsoluteKeyControlRaw()
    try:
        if node.start():
            rclpy.spin(node)
    finally:
        node.destroy_node()
        try:
            rclpy.shutdown()
        except Exception:
            pass

if __name__ == '__main__':
    main()

setup.pyに追記します。

ファイル~/ros2_ws/src/turtle_playground/setup.py

from setuptools import find_packages, setup

package_name = 'turtle_playground'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='ubuntu',
    maintainer_email='ubuntu@todo.todo',
    description='TODO: Package description',
    license='Apache-2.0',
    extras_require={
        'test': [
            'pytest',
        ],
    },
    entry_points={
        'console_scripts': [
            'turtle_vel_publisher = turtle_playground.turtle_vel_publisher:main',  # ADDED: Publisher を登録
            'turtle_pose_listener = turtle_playground.turtle_pose_listener:main',   # ADDED: Subscriber を登録
            'set_pen_client      = turtle_playground.set_pen_client:main',          # ADDED: /turtle1/set_pen クライアント
            'rotate_abs_client   = turtle_playground.rotate_abs_client:main',       # ADDED: アクションクライアント
            # ▼▼▼ 今回の追加 ▼▼▼
            'rotate_abs_keycontrol_raw = turtle_playground.rotate_abs_keycontrol_raw:main',  # Enter不要のキー操作
            
        ],
    },
)

ビルドして実行します。

cd ~/ros2_ws
colcon build --packages-select turtle_playground

turtlesim を起動します。

source install/setup.bash
ros2 run turtlesim turtlesim_node

今回作成したプログラムを起動します。

source install/setup.bash
ros2 run turtle_playground rotate_abs_keycontrol_raw

VとXキーで目標角度を送信、途中でCが押された場合停止すれば成功です。

課題6

作成したrotate_abs_keycontrol_raw.py のプログラムに機能を1つ追加してみてください。

次回はこちら

Follow me!

-Python, ROS2, プログラム, 高専4年生
-, , , ,

PAGE TOP