Source code for ex_assignment1.action_client

#!/usr/bin/env python3
"""
ServoMarker Action Client Node.

This module implements a ROS 2 action client that sequentially sends
ServoMarker goals to the corresponding action server.

Workflow:
    1. Load detected ArUco marker IDs from a YAML file.
    2. Sort the marker IDs.
    3. Send ServoMarker goals one-by-one.
    4. Wait for each goal to complete before sending the next.
    5. Stop automatically when all markers are processed.

This client is designed to work together with:
    - ArucoObserver (marker detection)
    - ServoMarker action server (robot servoing behavior)

The node follows a **sequential action execution pattern**, ensuring
safe and deterministic robot motion.
"""

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient

import yaml
from ex_assignment1_interfaces.action import ServoMarker


[docs] class ServoMarkerClient(Node): """ ROS 2 Action Client for the ServoMarker action. This node: - Loads a list of detected ArUco marker IDs from a YAML file - Sends ServoMarker goals sequentially - Waits for each goal result before proceeding to the next Attributes ---------- client : ActionClient Action client connected to the `servo_marker` action server. yaml_path : str Absolute path to the YAML file containing detected marker IDs. marker_ids : list[int] Sorted list of marker IDs to be processed. current_index : int Index of the currently active marker in `marker_ids`. goal_in_progress : bool Flag indicating whether a goal is currently active. """ def __init__(self): """ Initialize the ServoMarker action client node. - Creates the action client. - Loads marker IDs from YAML. - Waits for the action server. - Starts sending goals sequentially. """ super().__init__('servo_marker_client') self.client = ActionClient(self, ServoMarker, 'servo_marker') self.yaml_path = "/home/shady/ros2_ws/src/detections.yaml" # Storage for sequential sending self.marker_ids = [] self.current_index = 0 self.goal_in_progress = False self.get_logger().info("Action Client READY") # Load IDs and start workflow self.load_and_start()
[docs] def load_yaml(self): """ Load detected marker IDs from a YAML file. Returns ------- list[int] List of detected ArUco marker IDs. """ with open(self.yaml_path, "r") as f: return yaml.safe_load(f)
[docs] def load_and_start(self): """ Load marker IDs and start the action execution workflow. - Reads marker IDs from YAML. - Sorts the IDs. - Waits for the ServoMarker action server. - Sends the first goal. """ self.marker_ids = self.load_yaml() self.marker_ids.sort() self.get_logger().info(f"Loaded {len(self.marker_ids)} marker IDs") # Wait for server then start sending sequentially self.client.wait_for_server() self.send_next_goal()
# ------------------------------------- # Send next marker goal (sequential) # -------------------------------------
[docs] def send_next_goal(self): """ Send the next ServoMarker goal in sequence. This method: - Checks if a goal is already active. - Stops if all markers have been processed. - Sends the next marker ID as a goal. """ if self.goal_in_progress: return if self.current_index >= len(self.marker_ids): self.get_logger().info("🎉 All markers processed.") return marker_id = self.marker_ids[self.current_index] goal = ServoMarker.Goal() goal.marker_id = marker_id self.get_logger().info(f"➡️ Sending marker {marker_id} to server...") self.goal_in_progress = True future = self.client.send_goal_async(goal) future.add_done_callback(self.goal_response_callback)
# ------------------------------------- # Goal accepted / rejected # -------------------------------------
[docs] def goal_response_callback(self, future): """ Callback executed when the action server responds to a goal request. Parameters ---------- future : rclpy.task.Future Future containing the goal handle returned by the action server. """ goal_handle = future.result() if not goal_handle.accepted: self.get_logger().error(f"❌ Marker {self.marker_ids[self.current_index]} rejected.") self.goal_in_progress = False self.current_index += 1 self.send_next_goal() return self.get_logger().info("✅ Goal accepted") result_future = goal_handle.get_result_async() result_future.add_done_callback(self.result_callback)
# ------------------------------------- # Goal completed — send next one # -------------------------------------
[docs] def result_callback(self, future): """ Callback executed when the action server finishes executing a goal. Parameters ---------- future : rclpy.task.Future Future containing the action result. """ result = future.result().result marker_id = self.marker_ids[self.current_index] self.get_logger().info(f"📨 Server Response for marker {marker_id}: {result.message}") # Mark goal finished self.goal_in_progress = False # Move to next ID self.current_index += 1 # Send next goal self.send_next_goal()
def main(args=None): rclpy.init(args=args) node = ServoMarkerClient() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()