Source code for ex_assignment1.observer

"""
Aruco Observer Node.

This module implements a ROS 2 node responsible for **environment scanning**
and **ArUco marker discovery**.

The node rotates the robot in place while listening to ArUco detections.
Once a predefined number of unique markers is detected, it:

1. Stops the robot.
2. Stores the detected marker IDs in a YAML file.
3. Launches the ServoMarker action client to process each marker.

This node acts as the **high-level trigger** of the assignment pipeline and
does not perform any servoing or navigation by itself.
"""

import rclpy
from rclpy.node import Node

import yaml
import subprocess
import time

from geometry_msgs.msg import Twist
from aruco_opencv_msgs.msg import ArucoDetection


[docs] class ArucoObserver(Node): """ ROS 2 node for scanning and collecting ArUco marker IDs. The observer: - Rotates the robot in place to scan the environment. - Subscribes to `/aruco_detections`. - Collects unique marker IDs. - Stops scanning after reaching a target count. - Saves results to a YAML file. - Launches the ServoMarker action client. Publications ------------ /cmd_vel : geometry_msgs.msg.Twist Velocity commands used to rotate or stop the robot. Subscriptions ------------- /aruco_detections : aruco_opencv_msgs.msg.ArucoDetection Incoming ArUco marker detections. """ def __init__(self): """ Initialize the ArucoObserver node. Sets up publishers, subscriptions, internal state variables, and immediately starts the scanning behavior. """ super().__init__('aruco_observer') # ----------------------------------------- # Publishers & Subscribers # ----------------------------------------- self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10) self.subscription = self.create_subscription( ArucoDetection, '/aruco_detections', self.detection_callback, 10 ) # ----------------------------------------- # State # ----------------------------------------- self.detected_ids = set() self.target_count = 5 self.yaml_path = "/home/shady/ros2_ws/src/detections.yaml" self.scanning = True self.get_logger().info( "🔄 Aruco Observer started — rotating to scan markers" ) self.spin_robot() # ------------------------------------------------ # Robot motion helpers # ------------------------------------------------
[docs] def spin_robot(self): """ Publish angular velocity to rotate the robot in place. This method is used while scanning for ArUco markers. """ twist = Twist() twist.angular.z = 0.3 twist.linear.x = 0.0 self.cmd_pub.publish(twist)
[docs] def stop_robot(self): """ Stop all robot motion by publishing zero velocities. """ self.cmd_pub.publish(Twist()) self.get_logger().info("🛑 Robot stopped")
# ------------------------------------------------ # Detection callback # ------------------------------------------------
[docs] def detection_callback(self, msg: ArucoDetection): """ Process incoming ArUco detections. This callback: - Extracts marker IDs from incoming messages. - Tracks unique IDs. - Continues rotating while scanning. - Stops scanning once the target number of markers is reached. Parameters ---------- msg : aruco_opencv_msgs.msg.ArucoDetection Message containing detected ArUco markers. """ for m in msg.markers: self.detected_ids.add(int(m.marker_id)) self.get_logger().info( f"Detected IDs: {sorted(self.detected_ids)}" ) if self.scanning: self.spin_robot() if len(self.detected_ids) >= self.target_count and self.scanning: self.scanning = False self.get_logger().info( f"✅ Reached {self.target_count} markers → stopping scan" ) self.stop_robot() self.save_yaml() self.launch_actions() self.get_logger().info( "🔚 Observer finished — shutting down" ) rclpy.shutdown()
# ------------------------------------------------ # Save detected IDs # ------------------------------------------------
[docs] def save_yaml(self): """ Save detected marker IDs to a YAML file. The marker IDs are sorted before saving to ensure deterministic execution order in the action client. """ sorted_ids = sorted(self.detected_ids) with open(self.yaml_path, "w") as f: yaml.dump(sorted_ids, f) self.get_logger().info( f"📁 Saved marker IDs to {self.yaml_path}" )
# ------------------------------------------------ # Launch action client # ------------------------------------------------
[docs] def launch_actions(self): """ Launch the ServoMarker action client as a separate process. A short delay is introduced to ensure the system is stable before starting action execution. """ self.get_logger().info( "🚀 Launching Action Client in 10s..." ) time.sleep(10) subprocess.Popen( ["ros2", "run", "ex_assignment1", "action_client"] )
def main(args=None): """ ROS 2 entry point for the ArucoObserver node. Initializes ROS 2, starts the observer node, and spins until shutdown. """ rclpy.init(args=args) node = ArucoObserver() try: rclpy.spin(node) except SystemExit: pass node.destroy_node() if __name__ == "__main__": main()