Repository Summary
Checkout URI | https://github.com/uleroboticsgroup/yasmin.git |
VCS Type | git |
VCS Version | main |
Last Updated | 2024-12-28 |
Dev Status | DEVELOPED |
CI status | No Continuous Integration |
Released | RELEASED |
Tags | No category tags. |
Contributing |
Help Wanted (0)
Good First Issues (0) Pull Requests to Review (0) |
Packages
Name | Version |
---|---|
yasmin | 3.0.3 |
yasmin_demos | 3.0.3 |
yasmin_msgs | 3.0.3 |
yasmin_ros | 3.0.3 |
yasmin_viewer | 3.0.3 |
README
YASMIN (Yet Another State MachINe)
YASMIN is a project focused on implementing robot behaviors using Finite State Machines (FSM). It is available for ROS 2, Python and C++.
Table of Contents
Key Features
- ROS 2 Integration: Integrates with ROS 2 for easy deployment and interaction.
- Support for Python and C++: Available for both Python and C++, making it flexible for a variety of use cases.
- Rapid Prototyping: Designed for fast prototyping, allowing quick iteration of state machine behaviors.
- Predefined States: Includes states for interacting with ROS 2 action clients, service clients, and topics.
- Data Sharing with Blackboards: Utilizes blackboards for data sharing between states and state machines.
- State Management: Supports cancellation and stopping of state machines, including halting the current executing state.
- Web Viewer: Features an integrated web viewer for real-time monitoring of state machine execution.
Installation
Debian Packages
To install YASMIN and its packages, use the following command:
sudo apt install ros-$ROS_DISTRO-yasmin ros-$ROS_DISTRO-yasmin-*
Building from Source
Follow these steps to build the source code from this repository:
cd ~/ros2_ws/src
git clone https://github.com/uleroboticsgroup/yasmin.git
cd ~/ros2_ws
rosdep install --from-paths src --ignore-src -r -y
cd ~/ros2_ws
colcon build
Note for Deprecated or Rolling ROS 2 Distros
If you are using a deprecated ROS 2 distribution (e.g., Foxy or Galactic) or the Rolling distribution, install the example interfaces:
sudo apt install -y ros-$ROS_DISTRO-example-interfaces
Docker
If your operating system doesn’t support ROS 2, docker is a great alternative. You can use an image from Dockerhub or create your own images. First of all, to build the image you have to use the following command:
## Assuming you are in the YASMIN project directory
docker build -t yasmin .
To use a shortcut the docker build, you may use the following command:
## Assuming you are in the YASMIN project directory
make docker_build
After the image is created, run a docker container with the following command:
docker run -it --net=host --ipc=host --privileged --env="DISPLAY" --env="QT_X11_NO_MITSHM=1" --volume="/tmp/.X11-unix:/tmp/.X11-unix:rw" --volume="${XAUTHORITY}:/root/.Xauthority" --entrypoint /bin/bash yasmin
To use a shortcut the docker run, you may use following command:
## Assuming you are in the YASMIN project directory
make docker_run
Demos
There are some examples, for both Python and C++, that can be found in yasmin_demos.
Python
Vanilla Demo (FSM)
ros2 run yasmin_demos yasmin_demo.py
Click to expand
```python import time import rclpy import yasmin from yasmin import State from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros.ros_logs import set_ros_loggers from yasmin_viewer import YasminViewerPub # Define the FooState class, inheriting from the State class class FooState(State): """ Represents the Foo state in the state machine. Attributes: counter (int): Counter to track the number of executions of this state. """ def __init__(self) -> None: """ Initializes the FooState instance, setting up the outcomes. Outcomes: outcome1: Indicates the state should continue to the Bar state. outcome2: Indicates the state should finish execution and return. """ super().__init__(["outcome1", "outcome2"]) self.counter = 0 def execute(self, blackboard: Blackboard) -> str: """ Executes the logic for the Foo state. Args: blackboard (Blackboard): The shared data structure for states. Returns: str: The outcome of the execution, which can be "outcome1" or "outcome2". Raises: Exception: May raise exceptions related to state execution. """ yasmin.YASMIN_LOG_INFO("Executing state FOO") time.sleep(3) # Simulate work by sleeping if self.counter < 3: self.counter += 1 blackboard["foo_str"] = f"Counter: {self.counter}" return "outcome1" else: return "outcome2" # Define the BarState class, inheriting from the State class class BarState(State): """ Represents the Bar state in the state machine. """ def __init__(self) -> None: """ Initializes the BarState instance, setting up the outcome. Outcomes: outcome3: Indicates the state should transition back to the Foo state. """ super().__init__(outcomes=["outcome3"]) def execute(self, blackboard: Blackboard) -> str: """ Executes the logic for the Bar state. Args: blackboard (Blackboard): The shared data structure for states. Returns: str: The outcome of the execution, which will always be "outcome3". Raises: Exception: May raise exceptions related to state execution. """ yasmin.YASMIN_LOG_INFO("Executing state BAR") time.sleep(3) # Simulate work by sleeping yasmin.YASMIN_LOG_INFO(blackboard["foo_str"]) return "outcome3" # Main function to initialize and run the state machine def main(): """ The main entry point of the application. Initializes the ROS 2 environment, sets up the state machine, and handles execution and termination. Raises: KeyboardInterrupt: If the execution is interrupted by the user. """ yasmin.YASMIN_LOG_INFO("yasmin_demo") # Initialize ROS 2 rclpy.init() # Set ROS 2 loggers set_ros_loggers() # Create a finite state machine (FSM) sm = StateMachine(outcomes=["outcome4"]) # Add states to the FSM sm.add_state( "FOO", FooState(), transitions={ "outcome1": "BAR", "outcome2": "outcome4", }, ) sm.add_state( "BAR", BarState(), transitions={ "outcome3": "FOO", }, ) # Publish FSM information for visualization YasminViewerPub("yasmin_demo", sm) # Execute the FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Shutdown ROS 2 if it's running if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Service Demo (FSM + ROS 2 Service Client)
ros2 run yasmin_demos add_two_ints_server
ros2 run yasmin_demos service_client_demo.py
Click to expand
```python import rclpy from example_interfaces.srv import AddTwoInts import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ServiceState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT from yasmin_viewer import YasminViewerPub class AddTwoIntsState(ServiceState): """ A state that calls the AddTwoInts service to add two integers. This class is a state in a finite state machine that sends a request to the AddTwoInts service, retrieves the response, and updates the blackboard with the result. Attributes: service_type (type): The service type being used (AddTwoInts). service_name (str): The name of the service. outcomes (list): The list of possible outcomes for this state. """ def __init__(self) -> None: """ Initializes the AddTwoIntsState. Calls the parent constructor with the specific service type, service name, request handler, outcomes, and response handler. """ super().__init__( AddTwoInts, # srv type "/add_two_ints", # service name self.create_request_handler, # cb to create the request ["outcome1"], # outcomes. Includes (SUCCEED, ABORT) self.response_handler, # cb to process the response ) def create_request_handler(self, blackboard: Blackboard) -> AddTwoInts.Request: """ Creates the service request from the blackboard data. Args: blackboard (Blackboard): The blackboard containing the input values. Returns: AddTwoInts.Request: The request object populated with values from the blackboard. """ req = AddTwoInts.Request() req.a = blackboard["a"] req.b = blackboard["b"] return req def response_handler( self, blackboard: Blackboard, response: AddTwoInts.Response ) -> str: """ Processes the response from the AddTwoInts service. Updates the blackboard with the sum result from the response. Args: blackboard (Blackboard): The blackboard to update with the sum. response (AddTwoInts.Response): The response from the service call. Returns: str: The outcome of the operation, which is "outcome1". """ blackboard["sum"] = response.sum return "outcome1" def set_ints(blackboard: Blackboard) -> str: """ Sets the integer values in the blackboard. This function initializes the blackboard with two integer values to be added. Args: blackboard (Blackboard): The blackboard to update with integer values. Returns: str: The outcome of the operation, which is SUCCEED. """ blackboard["a"] = 10 blackboard["b"] = 5 return SUCCEED def print_sum(blackboard: Blackboard) -> str: """ Logs the sum value from the blackboard. This function retrieves the sum from the blackboard and logs it. Args: blackboard (Blackboard): The blackboard from which to retrieve the sum. Returns: str: The outcome of the operation, which is SUCCEED. """ yasmin.YASMIN_LOG_INFO(f"Sum: {blackboard['sum']}") return SUCCEED def main(): """ The main function to execute the finite state machine (FSM). This function initializes the ROS 2 environment, sets up logging, creates the FSM with defined states, and executes the FSM. Raises: KeyboardInterrupt: If the user interrupts the program. """ yasmin.YASMIN_LOG_INFO("yasmin_service_client_demo") # Init ROS 2 rclpy.init() # Set ROS 2 logs set_ros_loggers() # Create a FSM sm = StateMachine(outcomes=["outcome4"]) # Add states sm.add_state( "SETTING_INTS", CbState([SUCCEED], set_ints), transitions={SUCCEED: "ADD_TWO_INTS"}, ) sm.add_state( "ADD_TWO_INTS", AddTwoIntsState(), transitions={ "outcome1": "PRINTING_SUM", SUCCEED: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_SUM", CbState([SUCCEED], print_sum), transitions={ SUCCEED: "outcome4", }, ) # Publish FSM info YasminViewerPub("YASMIN_SERVICE_CLIENT_DEMO", sm) # Execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Action Demo (FSM + ROS 2 Action)
ros2 run yasmin_demos fibonacci_action_server
ros2 run yasmin_demos action_client_demo.py
Click to expand
```python import rclpy from example_interfaces.action import Fibonacci import yasmin from yasmin import CbState, Blackboard, StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub class FibonacciState(ActionState): """ Class representing the state of the Fibonacci action. Inherits from ActionState and implements methods to handle the Fibonacci action in a finite state machine. Attributes: None """ def __init__(self) -> None: """ Initializes the FibonacciState. Sets up the action type and the action name for the Fibonacci action. Initializes goal, response handler, and feedback processing callbacks. Parameters: None Returns: None """ super().__init__( Fibonacci, # action type "/fibonacci", # action name self.create_goal_handler, # callback to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) self.response_handler, # callback to process the response self.print_feedback, # callback to process the feedback ) def create_goal_handler(self, blackboard: Blackboard) -> Fibonacci.Goal: """ Creates the goal for the Fibonacci action. This method retrieves the input value from the blackboard and populates the Fibonacci goal. Parameters: blackboard (Blackboard): The blackboard containing the state information. Returns: Fibonacci.Goal: The populated goal object for the Fibonacci action. Raises: KeyError: If the expected key is not present in the blackboard. """ goal = Fibonacci.Goal() goal.order = blackboard["n"] # Retrieve the input value 'n' from the blackboard return goal def response_handler(self, blackboard: Blackboard, response: Fibonacci.Result) -> str: """ Handles the response from the Fibonacci action. This method processes the result of the Fibonacci action and stores it in the blackboard. Parameters: blackboard (Blackboard): The blackboard to store the result. response (Fibonacci.Result): The result object from the Fibonacci action. Returns: str: Outcome of the operation, typically SUCCEED. Raises: None """ blackboard["fibo_res"] = ( response.sequence ) # Store the result sequence in the blackboard return SUCCEED def print_feedback( self, blackboard: Blackboard, feedback: Fibonacci.Feedback ) -> None: """ Prints feedback from the Fibonacci action. This method logs the partial sequence received during the action. Parameters: blackboard (Blackboard): The blackboard (not used in this method). feedback (Fibonacci.Feedback): The feedback object from the Fibonacci action. Returns: None Raises: None """ yasmin.YASMIN_LOG_INFO(f"Received feedback: {list(feedback.sequence)}") def print_result(blackboard: Blackboard) -> str: """ Prints the result of the Fibonacci action. This function logs the final result stored in the blackboard. Parameters: blackboard (Blackboard): The blackboard containing the result. Returns: str: Outcome of the operation, typically SUCCEED. Raises: None """ yasmin.YASMIN_LOG_INFO(f"Result: {blackboard['fibo_res']}") return SUCCEED def main(): """ Main function to execute the ROS 2 action client demo. This function initializes the ROS 2 client, sets up the finite state machine, adds the states, and starts the action processing. Parameters: None Returns: None Raises: KeyboardInterrupt: If the user interrupts the execution. """ yasmin.YASMIN_LOG_INFO("yasmin_action_client_demo") # Initialize ROS 2 rclpy.init() # Set up ROS 2 logs set_ros_loggers() # Create a finite state machine (FSM) sm = StateMachine(outcomes=["outcome4"]) # Add states to the FSM sm.add_state( "CALLING_FIBONACCI", FibonacciState(), transitions={ SUCCEED: "PRINTING_RESULT", CANCEL: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_RESULT", CbState([SUCCEED], print_result), transitions={ SUCCEED: "outcome4", }, ) # Publish FSM information YasminViewerPub("YASMIN_ACTION_CLIENT_DEMO", sm) # Create an initial blackboard with the input value blackboard = Blackboard() blackboard["n"] = 10 # Set the Fibonacci order to 10 # Execute the FSM try: outcome = sm(blackboard) yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Cancel the state if interrupted # Shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Monitor Demo (FSM + ROS 2 Subscriber)
ros2 run yasmin_demos monitor_demo.py
Click to expand
```python import rclpy from rclpy.qos import qos_profile_sensor_data from nav_msgs.msg import Odometry import yasmin from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import MonitorState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import TIMEOUT from yasmin_viewer import YasminViewerPub class PrintOdometryState(MonitorState): """ MonitorState subclass to handle Odometry messages. This state monitors Odometry messages from the specified ROS topic, logging them and transitioning based on the number of messages received. Attributes: times (int): The number of messages to monitor before transitioning to the next outcome. Parameters: times (int): The initial count of how many Odometry messages to process before changing state. Methods: monitor_handler(blackboard: Blackboard, msg: Odometry) -> str: Handles incoming Odometry messages, logging the message and returning the appropriate outcome based on the remaining count. """ def __init__(self, times: int) -> None: """ Initializes the PrintOdometryState. Parameters: times (int): The number of Odometry messages to monitor before transitioning to the next outcome. """ super().__init__( Odometry, # msg type "odom", # topic name ["outcome1", "outcome2"], # outcomes self.monitor_handler, # monitor handler callback qos=qos_profile_sensor_data, # qos for the topic subscription msg_queue=10, # queue for the monitor handler callback timeout=10, # timeout to wait for messages in seconds ) self.times = times def monitor_handler(self, blackboard: Blackboard, msg: Odometry) -> str: """ Handles incoming Odometry messages. This method is called whenever a new Odometry message is received. It logs the message, decrements the count of messages to process, and determines the next state outcome. Parameters: blackboard (Blackboard): The shared data storage for states. msg (Odometry): The incoming Odometry message. Returns: str: The next state outcome, either "outcome1" to continue monitoring or "outcome2" to transition to the next state. Exceptions: None """ yasmin.YASMIN_LOG_INFO(msg) self.times -= 1 if self.times <= 0: return "outcome2" return "outcome1" def main(): """ Main function to initialize and run the ROS 2 state machine. This function initializes ROS 2, sets up logging, creates a finite state machine (FSM), adds states to the FSM, and executes the FSM. It handles cleanup and shutdown of ROS 2 gracefully. Exceptions: KeyboardInterrupt: Caught to allow graceful cancellation of the state machine during execution. """ yasmin.YASMIN_LOG_INFO("yasmin_monitor_demo") # Initialize ROS 2 rclpy.init() # Set ROS 2 logs set_ros_loggers() # Create a finite state machine (FSM) sm = StateMachine(outcomes=["outcome4"]) # Add states to the FSM sm.add_state( "PRINTING_ODOM", PrintOdometryState(5), transitions={ "outcome1": "PRINTING_ODOM", "outcome2": "outcome4", TIMEOUT: "outcome4", }, ) # Publish FSM information YasminViewerPub("YASMIN_MONITOR_DEMO", sm) # Execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Nav2 Demo (Hierarchical FSM + ROS 2 Action)
ros2 run yasmin_demos nav_demo.py
Click to expand
```python import random import rclpy from geometry_msgs.msg import Pose from nav2_msgs.action import NavigateToPose import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub # Constants for state outcomes HAS_NEXT = "has_next" ##< Indicates there are more waypoints END = "end" ##< Indicates no more waypoints class Nav2State(ActionState): """ ActionState for navigating to a specified pose using ROS 2 Navigation. Attributes: None Methods: create_goal_handler(blackboard: Blackboard) -> NavigateToPose.Goal: Creates the navigation goal from the blackboard. """ def __init__(self) -> None: """ Initializes the Nav2State. Calls the parent constructor to set up the action with: - Action type: NavigateToPose - Action name: /navigate_to_pose - Callback for goal creation: create_goal_handler - Outcomes: None, since it will use default outcomes (SUCCEED, ABORT, CANCEL) """ super().__init__( NavigateToPose, # action type "/navigate_to_pose", # action name self.create_goal_handler, # callback to create the goal None, # outcomes None, # callback to process the response ) def create_goal_handler(self, blackboard: Blackboard) -> NavigateToPose.Goal: """ Creates a goal for navigation based on the current pose in the blackboard. Args: blackboard (Blackboard): The blackboard instance holding current state data. Returns: NavigateToPose.Goal: The constructed goal for the navigation action. """ goal = NavigateToPose.Goal() goal.pose.pose = blackboard["pose"] goal.pose.header.frame_id = "map" # Set the reference frame to 'map' return goal def create_waypoints(blackboard: Blackboard) -> str: """ Initializes waypoints in the blackboard for navigation. Args: blackboard (Blackboard): The blackboard instance to store waypoints. Returns: str: Outcome indicating success (SUCCEED). """ blackboard["waypoints"] = { "entrance": [1.25, 6.30, -0.78, 0.67], "bathroom": [4.89, 1.64, 0.0, 1.0], "livingroom": [1.55, 4.03, -0.69, 0.72], "kitchen": [3.79, 6.77, 0.99, 0.12], "bedroom": [7.50, 4.89, 0.76, 0.65], } return SUCCEED def take_random_waypoint(blackboard: Blackboard) -> str: """ Selects a random set of waypoints from the available waypoints. Args: blackboard (Blackboard): The blackboard instance to store random waypoints. Returns: str: Outcome indicating success (SUCCEED). """ blackboard["random_waypoints"] = random.sample( list(blackboard["waypoints"].keys()), blackboard["waypoints_num"] ) return SUCCEED def get_next_waypoint(blackboard: Blackboard) -> str: """ Retrieves the next waypoint from the list of random waypoints. Updates the blackboard with the pose of the next waypoint. Args: blackboard (Blackboard): The blackboard instance holding current state data. Returns: str: Outcome indicating whether there is a next waypoint (HAS_NEXT) or if navigation is complete (END). """ if not blackboard["random_waypoints"]: return END wp_name = blackboard["random_waypoints"].pop(0) # Get the next waypoint name wp = blackboard["waypoints"][wp_name] # Get the waypoint coordinates pose = Pose() pose.position.x = wp[0] pose.position.y = wp[1] pose.orientation.z = wp[2] pose.orientation.w = wp[3] blackboard["pose"] = pose # Update blackboard with new pose blackboard["text"] = f"I have reached waypoint {wp_name}" return HAS_NEXT # main function def main() -> None: """ Initializes the ROS 2 node, sets up state machines for navigation, and executes the FSM. Handles cleanup and shutdown of the ROS 2 node upon completion. """ yasmin.YASMIN_LOG_INFO("yasmin_nav2_demo") # Initialize ROS 2 rclpy.init() # Set ROS 2 loggers for debugging set_ros_loggers() # Create state machines sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) nav_sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) # Add states to the state machine sm.add_state( "CREATING_WAYPOINTS", CbState([SUCCEED], create_waypoints), transitions={ SUCCEED: "TAKING_RANDOM_WAYPOINTS", }, ) sm.add_state( "TAKING_RANDOM_WAYPOINTS", CbState([SUCCEED], take_random_waypoint), transitions={ SUCCEED: "NAVIGATING", }, ) nav_sm.add_state( "GETTING_NEXT_WAYPOINT", CbState([END, HAS_NEXT], get_next_waypoint), transitions={ END: SUCCEED, HAS_NEXT: "NAVIGATING", }, ) nav_sm.add_state( "NAVIGATING", Nav2State(), transitions={ SUCCEED: "GETTING_NEXT_WAYPOINT", CANCEL: CANCEL, ABORT: ABORT, }, ) sm.add_state( "NAVIGATING", nav_sm, transitions={ SUCCEED: SUCCEED, CANCEL: CANCEL, ABORT: ABORT, }, ) # Publish FSM information for visualization YasminViewerPub("YASMIN_NAV_DEMO", sm) # Execute the state machine blackboard = Blackboard() blackboard["waypoints_num"] = 2 # Set the number of waypoints to navigate try: outcome = sm(blackboard) # Run the state machine with the blackboard yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # Handle manual interruption # Shutdown ROS 2 if rclpy.ok(): if sm.is_running(): rclpy.shutdown() if __name__ == "__main__": main() ```Cpp
Vanilla Demo
ros2 run yasmin_demos yasmin_demo
Click to expand
```cpp #includeClick to expand
```cpp #includeClick to expand
```cpp #includeClick to expand
```cpp #includeCONTRIBUTING
Repository Summary
Checkout URI | https://github.com/uleroboticsgroup/yasmin.git |
VCS Type | git |
VCS Version | main |
Last Updated | 2024-12-28 |
Dev Status | DEVELOPED |
CI status | No Continuous Integration |
Released | RELEASED |
Tags | No category tags. |
Contributing |
Help Wanted (0)
Good First Issues (0) Pull Requests to Review (0) |
Packages
Name | Version |
---|---|
yasmin | 3.0.3 |
yasmin_demos | 3.0.3 |
yasmin_msgs | 3.0.3 |
yasmin_ros | 3.0.3 |
yasmin_viewer | 3.0.3 |
README
YASMIN (Yet Another State MachINe)
YASMIN is a project focused on implementing robot behaviors using Finite State Machines (FSM). It is available for ROS 2, Python and C++.
Table of Contents
Key Features
- ROS 2 Integration: Integrates with ROS 2 for easy deployment and interaction.
- Support for Python and C++: Available for both Python and C++, making it flexible for a variety of use cases.
- Rapid Prototyping: Designed for fast prototyping, allowing quick iteration of state machine behaviors.
- Predefined States: Includes states for interacting with ROS 2 action clients, service clients, and topics.
- Data Sharing with Blackboards: Utilizes blackboards for data sharing between states and state machines.
- State Management: Supports cancellation and stopping of state machines, including halting the current executing state.
- Web Viewer: Features an integrated web viewer for real-time monitoring of state machine execution.
Installation
Debian Packages
To install YASMIN and its packages, use the following command:
sudo apt install ros-$ROS_DISTRO-yasmin ros-$ROS_DISTRO-yasmin-*
Building from Source
Follow these steps to build the source code from this repository:
cd ~/ros2_ws/src
git clone https://github.com/uleroboticsgroup/yasmin.git
cd ~/ros2_ws
rosdep install --from-paths src --ignore-src -r -y
cd ~/ros2_ws
colcon build
Note for Deprecated or Rolling ROS 2 Distros
If you are using a deprecated ROS 2 distribution (e.g., Foxy or Galactic) or the Rolling distribution, install the example interfaces:
sudo apt install -y ros-$ROS_DISTRO-example-interfaces
Docker
If your operating system doesn’t support ROS 2, docker is a great alternative. You can use an image from Dockerhub or create your own images. First of all, to build the image you have to use the following command:
## Assuming you are in the YASMIN project directory
docker build -t yasmin .
To use a shortcut the docker build, you may use the following command:
## Assuming you are in the YASMIN project directory
make docker_build
After the image is created, run a docker container with the following command:
docker run -it --net=host --ipc=host --privileged --env="DISPLAY" --env="QT_X11_NO_MITSHM=1" --volume="/tmp/.X11-unix:/tmp/.X11-unix:rw" --volume="${XAUTHORITY}:/root/.Xauthority" --entrypoint /bin/bash yasmin
To use a shortcut the docker run, you may use following command:
## Assuming you are in the YASMIN project directory
make docker_run
Demos
There are some examples, for both Python and C++, that can be found in yasmin_demos.
Python
Vanilla Demo (FSM)
ros2 run yasmin_demos yasmin_demo.py
Click to expand
```python import time import rclpy import yasmin from yasmin import State from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros.ros_logs import set_ros_loggers from yasmin_viewer import YasminViewerPub # Define the FooState class, inheriting from the State class class FooState(State): """ Represents the Foo state in the state machine. Attributes: counter (int): Counter to track the number of executions of this state. """ def __init__(self) -> None: """ Initializes the FooState instance, setting up the outcomes. Outcomes: outcome1: Indicates the state should continue to the Bar state. outcome2: Indicates the state should finish execution and return. """ super().__init__(["outcome1", "outcome2"]) self.counter = 0 def execute(self, blackboard: Blackboard) -> str: """ Executes the logic for the Foo state. Args: blackboard (Blackboard): The shared data structure for states. Returns: str: The outcome of the execution, which can be "outcome1" or "outcome2". Raises: Exception: May raise exceptions related to state execution. """ yasmin.YASMIN_LOG_INFO("Executing state FOO") time.sleep(3) # Simulate work by sleeping if self.counter < 3: self.counter += 1 blackboard["foo_str"] = f"Counter: {self.counter}" return "outcome1" else: return "outcome2" # Define the BarState class, inheriting from the State class class BarState(State): """ Represents the Bar state in the state machine. """ def __init__(self) -> None: """ Initializes the BarState instance, setting up the outcome. Outcomes: outcome3: Indicates the state should transition back to the Foo state. """ super().__init__(outcomes=["outcome3"]) def execute(self, blackboard: Blackboard) -> str: """ Executes the logic for the Bar state. Args: blackboard (Blackboard): The shared data structure for states. Returns: str: The outcome of the execution, which will always be "outcome3". Raises: Exception: May raise exceptions related to state execution. """ yasmin.YASMIN_LOG_INFO("Executing state BAR") time.sleep(3) # Simulate work by sleeping yasmin.YASMIN_LOG_INFO(blackboard["foo_str"]) return "outcome3" # Main function to initialize and run the state machine def main(): """ The main entry point of the application. Initializes the ROS 2 environment, sets up the state machine, and handles execution and termination. Raises: KeyboardInterrupt: If the execution is interrupted by the user. """ yasmin.YASMIN_LOG_INFO("yasmin_demo") # Initialize ROS 2 rclpy.init() # Set ROS 2 loggers set_ros_loggers() # Create a finite state machine (FSM) sm = StateMachine(outcomes=["outcome4"]) # Add states to the FSM sm.add_state( "FOO", FooState(), transitions={ "outcome1": "BAR", "outcome2": "outcome4", }, ) sm.add_state( "BAR", BarState(), transitions={ "outcome3": "FOO", }, ) # Publish FSM information for visualization YasminViewerPub("yasmin_demo", sm) # Execute the FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Shutdown ROS 2 if it's running if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Service Demo (FSM + ROS 2 Service Client)
ros2 run yasmin_demos add_two_ints_server
ros2 run yasmin_demos service_client_demo.py
Click to expand
```python import rclpy from example_interfaces.srv import AddTwoInts import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ServiceState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT from yasmin_viewer import YasminViewerPub class AddTwoIntsState(ServiceState): """ A state that calls the AddTwoInts service to add two integers. This class is a state in a finite state machine that sends a request to the AddTwoInts service, retrieves the response, and updates the blackboard with the result. Attributes: service_type (type): The service type being used (AddTwoInts). service_name (str): The name of the service. outcomes (list): The list of possible outcomes for this state. """ def __init__(self) -> None: """ Initializes the AddTwoIntsState. Calls the parent constructor with the specific service type, service name, request handler, outcomes, and response handler. """ super().__init__( AddTwoInts, # srv type "/add_two_ints", # service name self.create_request_handler, # cb to create the request ["outcome1"], # outcomes. Includes (SUCCEED, ABORT) self.response_handler, # cb to process the response ) def create_request_handler(self, blackboard: Blackboard) -> AddTwoInts.Request: """ Creates the service request from the blackboard data. Args: blackboard (Blackboard): The blackboard containing the input values. Returns: AddTwoInts.Request: The request object populated with values from the blackboard. """ req = AddTwoInts.Request() req.a = blackboard["a"] req.b = blackboard["b"] return req def response_handler( self, blackboard: Blackboard, response: AddTwoInts.Response ) -> str: """ Processes the response from the AddTwoInts service. Updates the blackboard with the sum result from the response. Args: blackboard (Blackboard): The blackboard to update with the sum. response (AddTwoInts.Response): The response from the service call. Returns: str: The outcome of the operation, which is "outcome1". """ blackboard["sum"] = response.sum return "outcome1" def set_ints(blackboard: Blackboard) -> str: """ Sets the integer values in the blackboard. This function initializes the blackboard with two integer values to be added. Args: blackboard (Blackboard): The blackboard to update with integer values. Returns: str: The outcome of the operation, which is SUCCEED. """ blackboard["a"] = 10 blackboard["b"] = 5 return SUCCEED def print_sum(blackboard: Blackboard) -> str: """ Logs the sum value from the blackboard. This function retrieves the sum from the blackboard and logs it. Args: blackboard (Blackboard): The blackboard from which to retrieve the sum. Returns: str: The outcome of the operation, which is SUCCEED. """ yasmin.YASMIN_LOG_INFO(f"Sum: {blackboard['sum']}") return SUCCEED def main(): """ The main function to execute the finite state machine (FSM). This function initializes the ROS 2 environment, sets up logging, creates the FSM with defined states, and executes the FSM. Raises: KeyboardInterrupt: If the user interrupts the program. """ yasmin.YASMIN_LOG_INFO("yasmin_service_client_demo") # Init ROS 2 rclpy.init() # Set ROS 2 logs set_ros_loggers() # Create a FSM sm = StateMachine(outcomes=["outcome4"]) # Add states sm.add_state( "SETTING_INTS", CbState([SUCCEED], set_ints), transitions={SUCCEED: "ADD_TWO_INTS"}, ) sm.add_state( "ADD_TWO_INTS", AddTwoIntsState(), transitions={ "outcome1": "PRINTING_SUM", SUCCEED: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_SUM", CbState([SUCCEED], print_sum), transitions={ SUCCEED: "outcome4", }, ) # Publish FSM info YasminViewerPub("YASMIN_SERVICE_CLIENT_DEMO", sm) # Execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Action Demo (FSM + ROS 2 Action)
ros2 run yasmin_demos fibonacci_action_server
ros2 run yasmin_demos action_client_demo.py
Click to expand
```python import rclpy from example_interfaces.action import Fibonacci import yasmin from yasmin import CbState, Blackboard, StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub class FibonacciState(ActionState): """ Class representing the state of the Fibonacci action. Inherits from ActionState and implements methods to handle the Fibonacci action in a finite state machine. Attributes: None """ def __init__(self) -> None: """ Initializes the FibonacciState. Sets up the action type and the action name for the Fibonacci action. Initializes goal, response handler, and feedback processing callbacks. Parameters: None Returns: None """ super().__init__( Fibonacci, # action type "/fibonacci", # action name self.create_goal_handler, # callback to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) self.response_handler, # callback to process the response self.print_feedback, # callback to process the feedback ) def create_goal_handler(self, blackboard: Blackboard) -> Fibonacci.Goal: """ Creates the goal for the Fibonacci action. This method retrieves the input value from the blackboard and populates the Fibonacci goal. Parameters: blackboard (Blackboard): The blackboard containing the state information. Returns: Fibonacci.Goal: The populated goal object for the Fibonacci action. Raises: KeyError: If the expected key is not present in the blackboard. """ goal = Fibonacci.Goal() goal.order = blackboard["n"] # Retrieve the input value 'n' from the blackboard return goal def response_handler(self, blackboard: Blackboard, response: Fibonacci.Result) -> str: """ Handles the response from the Fibonacci action. This method processes the result of the Fibonacci action and stores it in the blackboard. Parameters: blackboard (Blackboard): The blackboard to store the result. response (Fibonacci.Result): The result object from the Fibonacci action. Returns: str: Outcome of the operation, typically SUCCEED. Raises: None """ blackboard["fibo_res"] = ( response.sequence ) # Store the result sequence in the blackboard return SUCCEED def print_feedback( self, blackboard: Blackboard, feedback: Fibonacci.Feedback ) -> None: """ Prints feedback from the Fibonacci action. This method logs the partial sequence received during the action. Parameters: blackboard (Blackboard): The blackboard (not used in this method). feedback (Fibonacci.Feedback): The feedback object from the Fibonacci action. Returns: None Raises: None """ yasmin.YASMIN_LOG_INFO(f"Received feedback: {list(feedback.sequence)}") def print_result(blackboard: Blackboard) -> str: """ Prints the result of the Fibonacci action. This function logs the final result stored in the blackboard. Parameters: blackboard (Blackboard): The blackboard containing the result. Returns: str: Outcome of the operation, typically SUCCEED. Raises: None """ yasmin.YASMIN_LOG_INFO(f"Result: {blackboard['fibo_res']}") return SUCCEED def main(): """ Main function to execute the ROS 2 action client demo. This function initializes the ROS 2 client, sets up the finite state machine, adds the states, and starts the action processing. Parameters: None Returns: None Raises: KeyboardInterrupt: If the user interrupts the execution. """ yasmin.YASMIN_LOG_INFO("yasmin_action_client_demo") # Initialize ROS 2 rclpy.init() # Set up ROS 2 logs set_ros_loggers() # Create a finite state machine (FSM) sm = StateMachine(outcomes=["outcome4"]) # Add states to the FSM sm.add_state( "CALLING_FIBONACCI", FibonacciState(), transitions={ SUCCEED: "PRINTING_RESULT", CANCEL: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_RESULT", CbState([SUCCEED], print_result), transitions={ SUCCEED: "outcome4", }, ) # Publish FSM information YasminViewerPub("YASMIN_ACTION_CLIENT_DEMO", sm) # Create an initial blackboard with the input value blackboard = Blackboard() blackboard["n"] = 10 # Set the Fibonacci order to 10 # Execute the FSM try: outcome = sm(blackboard) yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Cancel the state if interrupted # Shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Monitor Demo (FSM + ROS 2 Subscriber)
ros2 run yasmin_demos monitor_demo.py
Click to expand
```python import rclpy from rclpy.qos import qos_profile_sensor_data from nav_msgs.msg import Odometry import yasmin from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import MonitorState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import TIMEOUT from yasmin_viewer import YasminViewerPub class PrintOdometryState(MonitorState): """ MonitorState subclass to handle Odometry messages. This state monitors Odometry messages from the specified ROS topic, logging them and transitioning based on the number of messages received. Attributes: times (int): The number of messages to monitor before transitioning to the next outcome. Parameters: times (int): The initial count of how many Odometry messages to process before changing state. Methods: monitor_handler(blackboard: Blackboard, msg: Odometry) -> str: Handles incoming Odometry messages, logging the message and returning the appropriate outcome based on the remaining count. """ def __init__(self, times: int) -> None: """ Initializes the PrintOdometryState. Parameters: times (int): The number of Odometry messages to monitor before transitioning to the next outcome. """ super().__init__( Odometry, # msg type "odom", # topic name ["outcome1", "outcome2"], # outcomes self.monitor_handler, # monitor handler callback qos=qos_profile_sensor_data, # qos for the topic subscription msg_queue=10, # queue for the monitor handler callback timeout=10, # timeout to wait for messages in seconds ) self.times = times def monitor_handler(self, blackboard: Blackboard, msg: Odometry) -> str: """ Handles incoming Odometry messages. This method is called whenever a new Odometry message is received. It logs the message, decrements the count of messages to process, and determines the next state outcome. Parameters: blackboard (Blackboard): The shared data storage for states. msg (Odometry): The incoming Odometry message. Returns: str: The next state outcome, either "outcome1" to continue monitoring or "outcome2" to transition to the next state. Exceptions: None """ yasmin.YASMIN_LOG_INFO(msg) self.times -= 1 if self.times <= 0: return "outcome2" return "outcome1" def main(): """ Main function to initialize and run the ROS 2 state machine. This function initializes ROS 2, sets up logging, creates a finite state machine (FSM), adds states to the FSM, and executes the FSM. It handles cleanup and shutdown of ROS 2 gracefully. Exceptions: KeyboardInterrupt: Caught to allow graceful cancellation of the state machine during execution. """ yasmin.YASMIN_LOG_INFO("yasmin_monitor_demo") # Initialize ROS 2 rclpy.init() # Set ROS 2 logs set_ros_loggers() # Create a finite state machine (FSM) sm = StateMachine(outcomes=["outcome4"]) # Add states to the FSM sm.add_state( "PRINTING_ODOM", PrintOdometryState(5), transitions={ "outcome1": "PRINTING_ODOM", "outcome2": "outcome4", TIMEOUT: "outcome4", }, ) # Publish FSM information YasminViewerPub("YASMIN_MONITOR_DEMO", sm) # Execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Nav2 Demo (Hierarchical FSM + ROS 2 Action)
ros2 run yasmin_demos nav_demo.py
Click to expand
```python import random import rclpy from geometry_msgs.msg import Pose from nav2_msgs.action import NavigateToPose import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub # Constants for state outcomes HAS_NEXT = "has_next" ##< Indicates there are more waypoints END = "end" ##< Indicates no more waypoints class Nav2State(ActionState): """ ActionState for navigating to a specified pose using ROS 2 Navigation. Attributes: None Methods: create_goal_handler(blackboard: Blackboard) -> NavigateToPose.Goal: Creates the navigation goal from the blackboard. """ def __init__(self) -> None: """ Initializes the Nav2State. Calls the parent constructor to set up the action with: - Action type: NavigateToPose - Action name: /navigate_to_pose - Callback for goal creation: create_goal_handler - Outcomes: None, since it will use default outcomes (SUCCEED, ABORT, CANCEL) """ super().__init__( NavigateToPose, # action type "/navigate_to_pose", # action name self.create_goal_handler, # callback to create the goal None, # outcomes None, # callback to process the response ) def create_goal_handler(self, blackboard: Blackboard) -> NavigateToPose.Goal: """ Creates a goal for navigation based on the current pose in the blackboard. Args: blackboard (Blackboard): The blackboard instance holding current state data. Returns: NavigateToPose.Goal: The constructed goal for the navigation action. """ goal = NavigateToPose.Goal() goal.pose.pose = blackboard["pose"] goal.pose.header.frame_id = "map" # Set the reference frame to 'map' return goal def create_waypoints(blackboard: Blackboard) -> str: """ Initializes waypoints in the blackboard for navigation. Args: blackboard (Blackboard): The blackboard instance to store waypoints. Returns: str: Outcome indicating success (SUCCEED). """ blackboard["waypoints"] = { "entrance": [1.25, 6.30, -0.78, 0.67], "bathroom": [4.89, 1.64, 0.0, 1.0], "livingroom": [1.55, 4.03, -0.69, 0.72], "kitchen": [3.79, 6.77, 0.99, 0.12], "bedroom": [7.50, 4.89, 0.76, 0.65], } return SUCCEED def take_random_waypoint(blackboard: Blackboard) -> str: """ Selects a random set of waypoints from the available waypoints. Args: blackboard (Blackboard): The blackboard instance to store random waypoints. Returns: str: Outcome indicating success (SUCCEED). """ blackboard["random_waypoints"] = random.sample( list(blackboard["waypoints"].keys()), blackboard["waypoints_num"] ) return SUCCEED def get_next_waypoint(blackboard: Blackboard) -> str: """ Retrieves the next waypoint from the list of random waypoints. Updates the blackboard with the pose of the next waypoint. Args: blackboard (Blackboard): The blackboard instance holding current state data. Returns: str: Outcome indicating whether there is a next waypoint (HAS_NEXT) or if navigation is complete (END). """ if not blackboard["random_waypoints"]: return END wp_name = blackboard["random_waypoints"].pop(0) # Get the next waypoint name wp = blackboard["waypoints"][wp_name] # Get the waypoint coordinates pose = Pose() pose.position.x = wp[0] pose.position.y = wp[1] pose.orientation.z = wp[2] pose.orientation.w = wp[3] blackboard["pose"] = pose # Update blackboard with new pose blackboard["text"] = f"I have reached waypoint {wp_name}" return HAS_NEXT # main function def main() -> None: """ Initializes the ROS 2 node, sets up state machines for navigation, and executes the FSM. Handles cleanup and shutdown of the ROS 2 node upon completion. """ yasmin.YASMIN_LOG_INFO("yasmin_nav2_demo") # Initialize ROS 2 rclpy.init() # Set ROS 2 loggers for debugging set_ros_loggers() # Create state machines sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) nav_sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) # Add states to the state machine sm.add_state( "CREATING_WAYPOINTS", CbState([SUCCEED], create_waypoints), transitions={ SUCCEED: "TAKING_RANDOM_WAYPOINTS", }, ) sm.add_state( "TAKING_RANDOM_WAYPOINTS", CbState([SUCCEED], take_random_waypoint), transitions={ SUCCEED: "NAVIGATING", }, ) nav_sm.add_state( "GETTING_NEXT_WAYPOINT", CbState([END, HAS_NEXT], get_next_waypoint), transitions={ END: SUCCEED, HAS_NEXT: "NAVIGATING", }, ) nav_sm.add_state( "NAVIGATING", Nav2State(), transitions={ SUCCEED: "GETTING_NEXT_WAYPOINT", CANCEL: CANCEL, ABORT: ABORT, }, ) sm.add_state( "NAVIGATING", nav_sm, transitions={ SUCCEED: SUCCEED, CANCEL: CANCEL, ABORT: ABORT, }, ) # Publish FSM information for visualization YasminViewerPub("YASMIN_NAV_DEMO", sm) # Execute the state machine blackboard = Blackboard() blackboard["waypoints_num"] = 2 # Set the number of waypoints to navigate try: outcome = sm(blackboard) # Run the state machine with the blackboard yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # Handle manual interruption # Shutdown ROS 2 if rclpy.ok(): if sm.is_running(): rclpy.shutdown() if __name__ == "__main__": main() ```Cpp
Vanilla Demo
ros2 run yasmin_demos yasmin_demo
Click to expand
```cpp #includeClick to expand
```cpp #includeClick to expand
```cpp #includeClick to expand
```cpp #includeCONTRIBUTING
Repository Summary
Checkout URI | https://github.com/uleroboticsgroup/yasmin.git |
VCS Type | git |
VCS Version | main |
Last Updated | 2024-12-28 |
Dev Status | DEVELOPED |
CI status | No Continuous Integration |
Released | RELEASED |
Tags | No category tags. |
Contributing |
Help Wanted (0)
Good First Issues (0) Pull Requests to Review (0) |
Packages
Name | Version |
---|---|
yasmin | 3.0.3 |
yasmin_demos | 3.0.3 |
yasmin_msgs | 3.0.3 |
yasmin_ros | 3.0.3 |
yasmin_viewer | 3.0.3 |
README
YASMIN (Yet Another State MachINe)
YASMIN is a project focused on implementing robot behaviors using Finite State Machines (FSM). It is available for ROS 2, Python and C++.
Table of Contents
Key Features
- ROS 2 Integration: Integrates with ROS 2 for easy deployment and interaction.
- Support for Python and C++: Available for both Python and C++, making it flexible for a variety of use cases.
- Rapid Prototyping: Designed for fast prototyping, allowing quick iteration of state machine behaviors.
- Predefined States: Includes states for interacting with ROS 2 action clients, service clients, and topics.
- Data Sharing with Blackboards: Utilizes blackboards for data sharing between states and state machines.
- State Management: Supports cancellation and stopping of state machines, including halting the current executing state.
- Web Viewer: Features an integrated web viewer for real-time monitoring of state machine execution.
Installation
Debian Packages
To install YASMIN and its packages, use the following command:
sudo apt install ros-$ROS_DISTRO-yasmin ros-$ROS_DISTRO-yasmin-*
Building from Source
Follow these steps to build the source code from this repository:
cd ~/ros2_ws/src
git clone https://github.com/uleroboticsgroup/yasmin.git
cd ~/ros2_ws
rosdep install --from-paths src --ignore-src -r -y
cd ~/ros2_ws
colcon build
Note for Deprecated or Rolling ROS 2 Distros
If you are using a deprecated ROS 2 distribution (e.g., Foxy or Galactic) or the Rolling distribution, install the example interfaces:
sudo apt install -y ros-$ROS_DISTRO-example-interfaces
Docker
If your operating system doesn’t support ROS 2, docker is a great alternative. You can use an image from Dockerhub or create your own images. First of all, to build the image you have to use the following command:
## Assuming you are in the YASMIN project directory
docker build -t yasmin .
To use a shortcut the docker build, you may use the following command:
## Assuming you are in the YASMIN project directory
make docker_build
After the image is created, run a docker container with the following command:
docker run -it --net=host --ipc=host --privileged --env="DISPLAY" --env="QT_X11_NO_MITSHM=1" --volume="/tmp/.X11-unix:/tmp/.X11-unix:rw" --volume="${XAUTHORITY}:/root/.Xauthority" --entrypoint /bin/bash yasmin
To use a shortcut the docker run, you may use following command:
## Assuming you are in the YASMIN project directory
make docker_run
Demos
There are some examples, for both Python and C++, that can be found in yasmin_demos.
Python
Vanilla Demo (FSM)
ros2 run yasmin_demos yasmin_demo.py
Click to expand
```python import time import rclpy import yasmin from yasmin import State from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros.ros_logs import set_ros_loggers from yasmin_viewer import YasminViewerPub # Define the FooState class, inheriting from the State class class FooState(State): """ Represents the Foo state in the state machine. Attributes: counter (int): Counter to track the number of executions of this state. """ def __init__(self) -> None: """ Initializes the FooState instance, setting up the outcomes. Outcomes: outcome1: Indicates the state should continue to the Bar state. outcome2: Indicates the state should finish execution and return. """ super().__init__(["outcome1", "outcome2"]) self.counter = 0 def execute(self, blackboard: Blackboard) -> str: """ Executes the logic for the Foo state. Args: blackboard (Blackboard): The shared data structure for states. Returns: str: The outcome of the execution, which can be "outcome1" or "outcome2". Raises: Exception: May raise exceptions related to state execution. """ yasmin.YASMIN_LOG_INFO("Executing state FOO") time.sleep(3) # Simulate work by sleeping if self.counter < 3: self.counter += 1 blackboard["foo_str"] = f"Counter: {self.counter}" return "outcome1" else: return "outcome2" # Define the BarState class, inheriting from the State class class BarState(State): """ Represents the Bar state in the state machine. """ def __init__(self) -> None: """ Initializes the BarState instance, setting up the outcome. Outcomes: outcome3: Indicates the state should transition back to the Foo state. """ super().__init__(outcomes=["outcome3"]) def execute(self, blackboard: Blackboard) -> str: """ Executes the logic for the Bar state. Args: blackboard (Blackboard): The shared data structure for states. Returns: str: The outcome of the execution, which will always be "outcome3". Raises: Exception: May raise exceptions related to state execution. """ yasmin.YASMIN_LOG_INFO("Executing state BAR") time.sleep(3) # Simulate work by sleeping yasmin.YASMIN_LOG_INFO(blackboard["foo_str"]) return "outcome3" # Main function to initialize and run the state machine def main(): """ The main entry point of the application. Initializes the ROS 2 environment, sets up the state machine, and handles execution and termination. Raises: KeyboardInterrupt: If the execution is interrupted by the user. """ yasmin.YASMIN_LOG_INFO("yasmin_demo") # Initialize ROS 2 rclpy.init() # Set ROS 2 loggers set_ros_loggers() # Create a finite state machine (FSM) sm = StateMachine(outcomes=["outcome4"]) # Add states to the FSM sm.add_state( "FOO", FooState(), transitions={ "outcome1": "BAR", "outcome2": "outcome4", }, ) sm.add_state( "BAR", BarState(), transitions={ "outcome3": "FOO", }, ) # Publish FSM information for visualization YasminViewerPub("yasmin_demo", sm) # Execute the FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Shutdown ROS 2 if it's running if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Service Demo (FSM + ROS 2 Service Client)
ros2 run yasmin_demos add_two_ints_server
ros2 run yasmin_demos service_client_demo.py
Click to expand
```python import rclpy from example_interfaces.srv import AddTwoInts import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ServiceState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT from yasmin_viewer import YasminViewerPub class AddTwoIntsState(ServiceState): """ A state that calls the AddTwoInts service to add two integers. This class is a state in a finite state machine that sends a request to the AddTwoInts service, retrieves the response, and updates the blackboard with the result. Attributes: service_type (type): The service type being used (AddTwoInts). service_name (str): The name of the service. outcomes (list): The list of possible outcomes for this state. """ def __init__(self) -> None: """ Initializes the AddTwoIntsState. Calls the parent constructor with the specific service type, service name, request handler, outcomes, and response handler. """ super().__init__( AddTwoInts, # srv type "/add_two_ints", # service name self.create_request_handler, # cb to create the request ["outcome1"], # outcomes. Includes (SUCCEED, ABORT) self.response_handler, # cb to process the response ) def create_request_handler(self, blackboard: Blackboard) -> AddTwoInts.Request: """ Creates the service request from the blackboard data. Args: blackboard (Blackboard): The blackboard containing the input values. Returns: AddTwoInts.Request: The request object populated with values from the blackboard. """ req = AddTwoInts.Request() req.a = blackboard["a"] req.b = blackboard["b"] return req def response_handler( self, blackboard: Blackboard, response: AddTwoInts.Response ) -> str: """ Processes the response from the AddTwoInts service. Updates the blackboard with the sum result from the response. Args: blackboard (Blackboard): The blackboard to update with the sum. response (AddTwoInts.Response): The response from the service call. Returns: str: The outcome of the operation, which is "outcome1". """ blackboard["sum"] = response.sum return "outcome1" def set_ints(blackboard: Blackboard) -> str: """ Sets the integer values in the blackboard. This function initializes the blackboard with two integer values to be added. Args: blackboard (Blackboard): The blackboard to update with integer values. Returns: str: The outcome of the operation, which is SUCCEED. """ blackboard["a"] = 10 blackboard["b"] = 5 return SUCCEED def print_sum(blackboard: Blackboard) -> str: """ Logs the sum value from the blackboard. This function retrieves the sum from the blackboard and logs it. Args: blackboard (Blackboard): The blackboard from which to retrieve the sum. Returns: str: The outcome of the operation, which is SUCCEED. """ yasmin.YASMIN_LOG_INFO(f"Sum: {blackboard['sum']}") return SUCCEED def main(): """ The main function to execute the finite state machine (FSM). This function initializes the ROS 2 environment, sets up logging, creates the FSM with defined states, and executes the FSM. Raises: KeyboardInterrupt: If the user interrupts the program. """ yasmin.YASMIN_LOG_INFO("yasmin_service_client_demo") # Init ROS 2 rclpy.init() # Set ROS 2 logs set_ros_loggers() # Create a FSM sm = StateMachine(outcomes=["outcome4"]) # Add states sm.add_state( "SETTING_INTS", CbState([SUCCEED], set_ints), transitions={SUCCEED: "ADD_TWO_INTS"}, ) sm.add_state( "ADD_TWO_INTS", AddTwoIntsState(), transitions={ "outcome1": "PRINTING_SUM", SUCCEED: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_SUM", CbState([SUCCEED], print_sum), transitions={ SUCCEED: "outcome4", }, ) # Publish FSM info YasminViewerPub("YASMIN_SERVICE_CLIENT_DEMO", sm) # Execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Action Demo (FSM + ROS 2 Action)
ros2 run yasmin_demos fibonacci_action_server
ros2 run yasmin_demos action_client_demo.py
Click to expand
```python import rclpy from example_interfaces.action import Fibonacci import yasmin from yasmin import CbState, Blackboard, StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub class FibonacciState(ActionState): """ Class representing the state of the Fibonacci action. Inherits from ActionState and implements methods to handle the Fibonacci action in a finite state machine. Attributes: None """ def __init__(self) -> None: """ Initializes the FibonacciState. Sets up the action type and the action name for the Fibonacci action. Initializes goal, response handler, and feedback processing callbacks. Parameters: None Returns: None """ super().__init__( Fibonacci, # action type "/fibonacci", # action name self.create_goal_handler, # callback to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) self.response_handler, # callback to process the response self.print_feedback, # callback to process the feedback ) def create_goal_handler(self, blackboard: Blackboard) -> Fibonacci.Goal: """ Creates the goal for the Fibonacci action. This method retrieves the input value from the blackboard and populates the Fibonacci goal. Parameters: blackboard (Blackboard): The blackboard containing the state information. Returns: Fibonacci.Goal: The populated goal object for the Fibonacci action. Raises: KeyError: If the expected key is not present in the blackboard. """ goal = Fibonacci.Goal() goal.order = blackboard["n"] # Retrieve the input value 'n' from the blackboard return goal def response_handler(self, blackboard: Blackboard, response: Fibonacci.Result) -> str: """ Handles the response from the Fibonacci action. This method processes the result of the Fibonacci action and stores it in the blackboard. Parameters: blackboard (Blackboard): The blackboard to store the result. response (Fibonacci.Result): The result object from the Fibonacci action. Returns: str: Outcome of the operation, typically SUCCEED. Raises: None """ blackboard["fibo_res"] = ( response.sequence ) # Store the result sequence in the blackboard return SUCCEED def print_feedback( self, blackboard: Blackboard, feedback: Fibonacci.Feedback ) -> None: """ Prints feedback from the Fibonacci action. This method logs the partial sequence received during the action. Parameters: blackboard (Blackboard): The blackboard (not used in this method). feedback (Fibonacci.Feedback): The feedback object from the Fibonacci action. Returns: None Raises: None """ yasmin.YASMIN_LOG_INFO(f"Received feedback: {list(feedback.sequence)}") def print_result(blackboard: Blackboard) -> str: """ Prints the result of the Fibonacci action. This function logs the final result stored in the blackboard. Parameters: blackboard (Blackboard): The blackboard containing the result. Returns: str: Outcome of the operation, typically SUCCEED. Raises: None """ yasmin.YASMIN_LOG_INFO(f"Result: {blackboard['fibo_res']}") return SUCCEED def main(): """ Main function to execute the ROS 2 action client demo. This function initializes the ROS 2 client, sets up the finite state machine, adds the states, and starts the action processing. Parameters: None Returns: None Raises: KeyboardInterrupt: If the user interrupts the execution. """ yasmin.YASMIN_LOG_INFO("yasmin_action_client_demo") # Initialize ROS 2 rclpy.init() # Set up ROS 2 logs set_ros_loggers() # Create a finite state machine (FSM) sm = StateMachine(outcomes=["outcome4"]) # Add states to the FSM sm.add_state( "CALLING_FIBONACCI", FibonacciState(), transitions={ SUCCEED: "PRINTING_RESULT", CANCEL: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_RESULT", CbState([SUCCEED], print_result), transitions={ SUCCEED: "outcome4", }, ) # Publish FSM information YasminViewerPub("YASMIN_ACTION_CLIENT_DEMO", sm) # Create an initial blackboard with the input value blackboard = Blackboard() blackboard["n"] = 10 # Set the Fibonacci order to 10 # Execute the FSM try: outcome = sm(blackboard) yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Cancel the state if interrupted # Shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Monitor Demo (FSM + ROS 2 Subscriber)
ros2 run yasmin_demos monitor_demo.py
Click to expand
```python import rclpy from rclpy.qos import qos_profile_sensor_data from nav_msgs.msg import Odometry import yasmin from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import MonitorState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import TIMEOUT from yasmin_viewer import YasminViewerPub class PrintOdometryState(MonitorState): """ MonitorState subclass to handle Odometry messages. This state monitors Odometry messages from the specified ROS topic, logging them and transitioning based on the number of messages received. Attributes: times (int): The number of messages to monitor before transitioning to the next outcome. Parameters: times (int): The initial count of how many Odometry messages to process before changing state. Methods: monitor_handler(blackboard: Blackboard, msg: Odometry) -> str: Handles incoming Odometry messages, logging the message and returning the appropriate outcome based on the remaining count. """ def __init__(self, times: int) -> None: """ Initializes the PrintOdometryState. Parameters: times (int): The number of Odometry messages to monitor before transitioning to the next outcome. """ super().__init__( Odometry, # msg type "odom", # topic name ["outcome1", "outcome2"], # outcomes self.monitor_handler, # monitor handler callback qos=qos_profile_sensor_data, # qos for the topic subscription msg_queue=10, # queue for the monitor handler callback timeout=10, # timeout to wait for messages in seconds ) self.times = times def monitor_handler(self, blackboard: Blackboard, msg: Odometry) -> str: """ Handles incoming Odometry messages. This method is called whenever a new Odometry message is received. It logs the message, decrements the count of messages to process, and determines the next state outcome. Parameters: blackboard (Blackboard): The shared data storage for states. msg (Odometry): The incoming Odometry message. Returns: str: The next state outcome, either "outcome1" to continue monitoring or "outcome2" to transition to the next state. Exceptions: None """ yasmin.YASMIN_LOG_INFO(msg) self.times -= 1 if self.times <= 0: return "outcome2" return "outcome1" def main(): """ Main function to initialize and run the ROS 2 state machine. This function initializes ROS 2, sets up logging, creates a finite state machine (FSM), adds states to the FSM, and executes the FSM. It handles cleanup and shutdown of ROS 2 gracefully. Exceptions: KeyboardInterrupt: Caught to allow graceful cancellation of the state machine during execution. """ yasmin.YASMIN_LOG_INFO("yasmin_monitor_demo") # Initialize ROS 2 rclpy.init() # Set ROS 2 logs set_ros_loggers() # Create a finite state machine (FSM) sm = StateMachine(outcomes=["outcome4"]) # Add states to the FSM sm.add_state( "PRINTING_ODOM", PrintOdometryState(5), transitions={ "outcome1": "PRINTING_ODOM", "outcome2": "outcome4", TIMEOUT: "outcome4", }, ) # Publish FSM information YasminViewerPub("YASMIN_MONITOR_DEMO", sm) # Execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: if sm.is_running(): sm.cancel_state() # Shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Nav2 Demo (Hierarchical FSM + ROS 2 Action)
ros2 run yasmin_demos nav_demo.py
Click to expand
```python import random import rclpy from geometry_msgs.msg import Pose from nav2_msgs.action import NavigateToPose import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub # Constants for state outcomes HAS_NEXT = "has_next" ##< Indicates there are more waypoints END = "end" ##< Indicates no more waypoints class Nav2State(ActionState): """ ActionState for navigating to a specified pose using ROS 2 Navigation. Attributes: None Methods: create_goal_handler(blackboard: Blackboard) -> NavigateToPose.Goal: Creates the navigation goal from the blackboard. """ def __init__(self) -> None: """ Initializes the Nav2State. Calls the parent constructor to set up the action with: - Action type: NavigateToPose - Action name: /navigate_to_pose - Callback for goal creation: create_goal_handler - Outcomes: None, since it will use default outcomes (SUCCEED, ABORT, CANCEL) """ super().__init__( NavigateToPose, # action type "/navigate_to_pose", # action name self.create_goal_handler, # callback to create the goal None, # outcomes None, # callback to process the response ) def create_goal_handler(self, blackboard: Blackboard) -> NavigateToPose.Goal: """ Creates a goal for navigation based on the current pose in the blackboard. Args: blackboard (Blackboard): The blackboard instance holding current state data. Returns: NavigateToPose.Goal: The constructed goal for the navigation action. """ goal = NavigateToPose.Goal() goal.pose.pose = blackboard["pose"] goal.pose.header.frame_id = "map" # Set the reference frame to 'map' return goal def create_waypoints(blackboard: Blackboard) -> str: """ Initializes waypoints in the blackboard for navigation. Args: blackboard (Blackboard): The blackboard instance to store waypoints. Returns: str: Outcome indicating success (SUCCEED). """ blackboard["waypoints"] = { "entrance": [1.25, 6.30, -0.78, 0.67], "bathroom": [4.89, 1.64, 0.0, 1.0], "livingroom": [1.55, 4.03, -0.69, 0.72], "kitchen": [3.79, 6.77, 0.99, 0.12], "bedroom": [7.50, 4.89, 0.76, 0.65], } return SUCCEED def take_random_waypoint(blackboard: Blackboard) -> str: """ Selects a random set of waypoints from the available waypoints. Args: blackboard (Blackboard): The blackboard instance to store random waypoints. Returns: str: Outcome indicating success (SUCCEED). """ blackboard["random_waypoints"] = random.sample( list(blackboard["waypoints"].keys()), blackboard["waypoints_num"] ) return SUCCEED def get_next_waypoint(blackboard: Blackboard) -> str: """ Retrieves the next waypoint from the list of random waypoints. Updates the blackboard with the pose of the next waypoint. Args: blackboard (Blackboard): The blackboard instance holding current state data. Returns: str: Outcome indicating whether there is a next waypoint (HAS_NEXT) or if navigation is complete (END). """ if not blackboard["random_waypoints"]: return END wp_name = blackboard["random_waypoints"].pop(0) # Get the next waypoint name wp = blackboard["waypoints"][wp_name] # Get the waypoint coordinates pose = Pose() pose.position.x = wp[0] pose.position.y = wp[1] pose.orientation.z = wp[2] pose.orientation.w = wp[3] blackboard["pose"] = pose # Update blackboard with new pose blackboard["text"] = f"I have reached waypoint {wp_name}" return HAS_NEXT # main function def main() -> None: """ Initializes the ROS 2 node, sets up state machines for navigation, and executes the FSM. Handles cleanup and shutdown of the ROS 2 node upon completion. """ yasmin.YASMIN_LOG_INFO("yasmin_nav2_demo") # Initialize ROS 2 rclpy.init() # Set ROS 2 loggers for debugging set_ros_loggers() # Create state machines sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) nav_sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) # Add states to the state machine sm.add_state( "CREATING_WAYPOINTS", CbState([SUCCEED], create_waypoints), transitions={ SUCCEED: "TAKING_RANDOM_WAYPOINTS", }, ) sm.add_state( "TAKING_RANDOM_WAYPOINTS", CbState([SUCCEED], take_random_waypoint), transitions={ SUCCEED: "NAVIGATING", }, ) nav_sm.add_state( "GETTING_NEXT_WAYPOINT", CbState([END, HAS_NEXT], get_next_waypoint), transitions={ END: SUCCEED, HAS_NEXT: "NAVIGATING", }, ) nav_sm.add_state( "NAVIGATING", Nav2State(), transitions={ SUCCEED: "GETTING_NEXT_WAYPOINT", CANCEL: CANCEL, ABORT: ABORT, }, ) sm.add_state( "NAVIGATING", nav_sm, transitions={ SUCCEED: SUCCEED, CANCEL: CANCEL, ABORT: ABORT, }, ) # Publish FSM information for visualization YasminViewerPub("YASMIN_NAV_DEMO", sm) # Execute the state machine blackboard = Blackboard() blackboard["waypoints_num"] = 2 # Set the number of waypoints to navigate try: outcome = sm(blackboard) # Run the state machine with the blackboard yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # Handle manual interruption # Shutdown ROS 2 if rclpy.ok(): if sm.is_running(): rclpy.shutdown() if __name__ == "__main__": main() ```Cpp
Vanilla Demo
ros2 run yasmin_demos yasmin_demo