ex_assignment1 Package

Action Server

ServoMarker Action Server Node.

This module implements the ROS 2 Action Server responsible for executing the ServoMarker action.

Design philosophy

To keep the action server readable, maintainable, and testable, the implementation is intentionally split across multiple modules:

  • action_server.py

    Defines the ROS 2 node, action server, publishers, subscriptions, and concurrency model.

  • servoing.py

    Contains the core action execution logic (state machine), implemented as the execute_callback function.

  • callbacks.py

    Contains sensor callbacks (laser and marker detection) and reusable helper methods shared across the action lifecycle.

This separation allows the action server node to focus on ROS wiring while delegating behavior and logic to specialized modules.

class ex_assignment1.action_server.ServoMarkerServer(*args: Any, **kwargs: Any)[source]

Bases: Node

ROS 2 Action Server for the ServoMarker action.

This node executes a servoing behavior that moves the robot toward a specified ArUco marker and then safely retreats.

The behavior follows a deterministic state machine:

SEARCHING → CENTERING → FORWARD → PAUSE → BACKWARD → DONE

Responsibilities

  • Expose the servo_marker action interface

  • Subscribe to perception and safety topics

  • Publish velocity commands

  • Coordinate with the DrawBox action

  • Delegate execution logic to modular helper files

Subscriptions

/aruco_detectionsaruco_opencv_msgs.msg.ArucoDetection

ArUco marker detections used for visual servoing.

/scansensor_msgs.msg.LaserScan

Laser scan data used for obstacle avoidance.

Publications

/cmd_velgeometry_msgs.msg.Twist

Velocity commands for robot motion.

Actions

Server:

servo_marker (ex_assignment1_interfaces/action/ServoMarker)

Client:

draw_box (ex_assignment1_interfaces/action/DrawBox)

Servo Logic (used by Action Server)

Servoing Logic for the ServoMarker Action.

This module contains the core execution logic of the ServoMarker action, implemented as a standalone execute_callback function.

Design Rationale

The servoing behavior is separated from the ROS 2 node definition (action_server.py) to:

  • Reduce the size and complexity of the action server file

  • Clearly isolate the state machine logic

  • Improve readability and maintainability

  • Allow independent testing and reasoning about behavior

This module does not define a ROS node. Instead, it operates on the action server instance via self, which is dynamically bound in action_server.py.

ex_assignment1.servoing.execute_callback(self, goal_handle)[source]

Execute the ServoMarker action goal.

This function implements a deterministic state machine that drives the robot toward a specific ArUco marker, pauses for visualization, and then retreats safely.

State Machine Overview

The action progresses through the following states:

  1. SEARCHING - Rotate in place until the target marker becomes visible.

  2. CENTERING - Rotate the robot to align the marker with the camera center.

  3. FORWARD - Move forward toward the marker while monitoring laser safety.

  4. PAUSE - Stop for a fixed duration and trigger the DrawBox action.

  5. BACKWARD - Retreat a fixed distance away from the marker.

The action terminates successfully once the BACKWARD state completes.

type self:

ServoMarkerServer

param self:

Instance of the ServoMarker action server node. This object provides publishers, sensor state, and helper methods imported from callbacks.py.

type self:

ServoMarkerServer

type goal_handle:

ServoMarker_Goal

param goal_handle:

Handle associated with the incoming ServoMarker action goal.

type goal_handle:

rclpy.action.server.ServerGoalHandle

returns:

Action result indicating success or failure.

rtype:

ServoMarker.Result

Callbacks (used by Action Server)

Shared Callbacks and Helper Functions for ServoMarker Action.

This module groups together sensor callbacks and utility helpers used by the ServoMarker action server.

Design Rationale

The functions in this file are intentionally separated from action_server.py to:

  • Reduce the size and complexity of the action server file

  • Avoid duplicating logic inside the servoing state machine

  • Improve clarity by isolating perception, safety, and helper behaviors

  • Allow reuse across different execution contexts if needed

All functions operate on the action server instance via self and are dynamically bound in action_server.py.

ex_assignment1.callbacks.laser_callback(self, msg)[source]

Process laser scan data to compute the minimum frontal obstacle distance.

This callback extracts a small angular window centered in front of the robot and computes the minimum valid distance. The result is stored in self.laser_front and continuously updated.

This value is used by the servoing state machine to: - Prevent forward motion when an obstacle is too close - Trigger an early transition from FORWARD to PAUSE if needed

Parameters:

msg (sensor_msgs.msg.LaserScan) – Incoming laser scan message.

ex_assignment1.callbacks.marker_callback(self, msg)[source]

Update the pose of the currently requested ArUco marker.

This callback scans all detected markers and updates the internal marker state only if the detected marker matches the target_marker_id.

The following fields are updated: - self.marker_x - self.marker_z - self.marker_seen

Parameters:

msg (aruco_opencv_msgs.msg.ArucoDetection) – Message containing detected ArUco markers.

Action Client

ServoMarker Action Client Node.

This module implements a ROS 2 action client that sequentially sends ServoMarker goals to the corresponding action server.

Workflow:
  1. Load detected ArUco marker IDs from a YAML file.

  2. Sort the marker IDs.

  3. Send ServoMarker goals one-by-one.

  4. Wait for each goal to complete before sending the next.

  5. Stop automatically when all markers are processed.

This client is designed to work together with:
  • ArucoObserver (marker detection)

  • ServoMarker action server (robot servoing behavior)

The node follows a sequential action execution pattern, ensuring safe and deterministic robot motion.

class ex_assignment1.action_client.ServoMarkerClient[source]

ROS 2 Action Client for the ServoMarker action.

This node: - Loads a list of detected ArUco marker IDs from a YAML file - Sends ServoMarker goals sequentially - Waits for each goal result before proceeding to the next

client

Action client connected to the servo_marker action server.

Type:

ActionClient

yaml_path

Absolute path to the YAML file containing detected marker IDs.

Type:

str

marker_ids

Sorted list of marker IDs to be processed.

Type:

list[int]

current_index

Index of the currently active marker in marker_ids.

Type:

int

goal_in_progress

Flag indicating whether a goal is currently active.

Type:

bool

goal_response_callback(future)[source]

Callback executed when the action server responds to a goal request.

Parameters:

future (rclpy.task.Future) – Future containing the goal handle returned by the action server.

load_and_start()[source]

Load marker IDs and start the action execution workflow.

  • Reads marker IDs from YAML.

  • Sorts the IDs.

  • Waits for the ServoMarker action server.

  • Sends the first goal.

load_yaml()[source]

Load detected marker IDs from a YAML file.

Returns:

List of detected ArUco marker IDs.

Return type:

list[int]

result_callback(future)[source]

Callback executed when the action server finishes executing a goal.

Parameters:

future (rclpy.task.Future) – Future containing the action result.

send_next_goal()[source]

Send the next ServoMarker goal in sequence.

This method: - Checks if a goal is already active. - Stops if all markers have been processed. - Sends the next marker ID as a goal.

Observer Node

Aruco Observer Node.

This module implements a ROS 2 node responsible for environment scanning and ArUco marker discovery.

The node rotates the robot in place while listening to ArUco detections. Once a predefined number of unique markers is detected, it:

  1. Stops the robot.

  2. Stores the detected marker IDs in a YAML file.

  3. Launches the ServoMarker action client to process each marker.

This node acts as the high-level trigger of the assignment pipeline and does not perform any servoing or navigation by itself.

class ex_assignment1.observer.ArucoObserver[source]

ROS 2 node for scanning and collecting ArUco marker IDs.

The observer: - Rotates the robot in place to scan the environment. - Subscribes to /aruco_detections. - Collects unique marker IDs. - Stops scanning after reaching a target count. - Saves results to a YAML file. - Launches the ServoMarker action client.

Publications

/cmd_velgeometry_msgs.msg.Twist

Velocity commands used to rotate or stop the robot.

Subscriptions

/aruco_detectionsaruco_opencv_msgs.msg.ArucoDetection

Incoming ArUco marker detections.

detection_callback(msg)[source]

Process incoming ArUco detections.

This callback: - Extracts marker IDs from incoming messages. - Tracks unique IDs. - Continues rotating while scanning. - Stops scanning once the target number of markers is reached.

Parameters:

msg (aruco_opencv_msgs.msg.ArucoDetection) – Message containing detected ArUco markers.

launch_actions()[source]

Launch the ServoMarker action client as a separate process.

A short delay is introduced to ensure the system is stable before starting action execution.

save_yaml()[source]

Save detected marker IDs to a YAML file.

The marker IDs are sorted before saving to ensure deterministic execution order in the action client.

spin_robot()[source]

Publish angular velocity to rotate the robot in place.

This method is used while scanning for ArUco markers.

stop_robot()[source]

Stop all robot motion by publishing zero velocities.

Image Modifier

class ex_assignment1.image_modifier.ImageModifier[source]
draw_aruco_boundaries(img, target_marker_id=None)[source]

Detect ArUco markers and draw their real boundaries. If target_marker_id is given, only draw that marker.

async execute_callback(goal_handle)[source]

Action callback: activate drawing for 5 seconds.

image_callback(msg)[source]

Store the latest image.

publish_raw(cv_image)[source]

Publish raw image without compression.

timer_publish()[source]

Continuously republish image, adding box while active.