Example 3: Services and Action Servers

Introduction

ROS (Robot Operating System) is a flexible framework for writing robot software. It is a collection of tools, libraries, and conventions that aim to simplify the task of creating complex and robust robot behavior across a wide variety of robotic platforms. Two important communication mechanisms in ROS are services and action servers/clients.

ROS Services

flowchart LR
    subgraph Services["ROS Node"]
        ServiceServer("ServiceServer")
        Client(["Client"])
    end
    subgraph o["Other Node"]
        Client2
    end
    Client == Request ==> ServiceServer
    Client2 == Request ==> ServiceServer
    ServiceServer -. Response ...-> Client & Client2
    style Services fill:#FFFFFF,stroke:#000000
    style o stroke:#000000

ROS services are synchronous communication mechanisms used for short-duration tasks that require a request and a response. A service is defined by a pair of messages: one for the request and one for the response.

  • Service Server: A node that provides a service. It advertises the service and waits for requests.
  • Service Client: A node that uses the service. It sends a request to the service server and waits for the response.

In the Services graph:

  • The Service Client sends a request to the Service Server.
  • The Service Server processes the request and sends back a response.

Example Use Case: Retrieving sensor data or setting parameters.
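The request/response pattern described above can be modelled in plain Python, without ROS. This is only a minimal sketch: the names SetGainRequest, SetGainResponse, ServiceServer and handle_set_gain are hypothetical and are not part of rospy. It shows that each call carries one request, blocks until the handler returns, and yields exactly one response.

```python
from dataclasses import dataclass


@dataclass
class SetGainRequest:
    """Hypothetical request message (not a real ROS type)."""
    gain: float


@dataclass
class SetGainResponse:
    """Hypothetical response message (not a real ROS type)."""
    success: bool
    message: str


class ServiceServer:
    """Holds a handler and answers each request with exactly one response."""

    def __init__(self, handler):
        self._handler = handler

    def call(self, request):
        # The caller blocks here until the handler returns (synchronous call).
        return self._handler(request)


def handle_set_gain(request):
    # Reject invalid requests instead of raising, as a service handler might.
    if request.gain < 0:
        return SetGainResponse(False, "gain must be non-negative")
    return SetGainResponse(True, f"gain set to {request.gain}")


server = ServiceServer(handle_set_gain)
ok = server.call(SetGainRequest(gain=2.5))
rejected = server.call(SetGainRequest(gain=-1.0))
print(ok)
print(rejected)
```

In real ROS code the call crosses process boundaries, but the contract is the same: the client cannot continue until the response arrives, which is why services suit short tasks.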

ROS1 Action Servers and Clients

ROS action servers and clients are used for asynchronous communication, suitable for long-duration tasks that require feedback and the ability to preempt (cancel) tasks. Actions are defined by three types of messages: goal, result, and feedback.

  • Action Server: A node that provides an action. It receives goals, processes them, and sends feedback and results.
  • Action Client: A node that sends goals to the action server and can receive feedback and results.

Example Use Case: Moving a robot arm to a specific position or navigating a robot to a goal location.

flowchart LR
    subgraph Services["ROS Node"]
        Server("Action Server")
        Client("Internal Client")
    end
    subgraph OtherNode["Other Node"]
        Client2("Client 2")
    end
    Client == Goal ==> Server
    Client2 == Goal ==> Server
    Server -. Feedback ..-> Client
    Server -. Feedback ...-> Client2
    Server -- Result --> Client & Client2
    style Services fill:#FFFFFF,stroke:#000000
    style OtherNode stroke:#000000

The diagram above illustrates the flow of goals, feedback, and results between the action clients and the action server.

In the ActionServers graph:

  • The Action Client sends a goal to the Action Server.
  • The Action Server processes the goal, provides periodic feedback to the Action Client, and finally sends the result when the goal is achieved or aborted.
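The goal, feedback, and result flow can be sketched in plain Python, without ROS. The function run_action and its parameters are hypothetical names chosen for illustration. The sketch assumes the long task is split into equal steps, with a progress fraction reported after each step and a cancel check before each one.

```python
def run_action(goal_steps, on_feedback, cancel_requested):
    """Run a long task step by step, reporting progress after each step."""
    for step in range(1, goal_steps + 1):
        # A cancel (preempt) request is checked before each unit of work.
        if cancel_requested():
            return "preempted"
        # Feedback: fraction of the goal completed so far.
        on_feedback(step / goal_steps)
    # Result: sent once, when the whole goal has been processed.
    return "succeeded"


# Case 1: the goal runs to completion.
progress = []
status = run_action(4, progress.append, lambda: False)
print(status, progress)

# Case 2: the client cancels after receiving two feedback messages.
partial = []
cancelled_status = run_action(4, partial.append, lambda: len(partial) >= 2)
print(cancelled_status, partial)
```

Unlike a service call, the client here receives information while the task is still running and can stop it early, which is the main reason to choose an action for long-duration work.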

Exercise 1

Step 1: Package Creation

First, define the ROS_WS variable and create the workspace:

export ROS_WS=<path_to_your_workspace>
mkdir -p $ROS_WS/src
cd $ROS_WS/src
catkin_create_pkg fib_pkg rospy std_msgs
cd $ROS_WS
catkin_make
source devel/setup.bash

Step 2: Define the Fibonacci Service

Create the service definition file Fibonacci.srv in the srv directory of your package.

mkdir -p $ROS_WS/src/fib_pkg/srv
echo "int64 n
---
int64 fibonacci" > $ROS_WS/src/fib_pkg/srv/Fibonacci.srv

Note: The service file name must start with a capital letter.

Note: ROS also has naming rules for the fields inside a service definition. Look them up and be sure to follow them.
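A quick way to catch naming mistakes before building is a small checker like the one below. It is a sketch under assumptions: the two regular expressions encode the commonly used conventions (CamelCase type names, lower_snake_case field names), not the authoritative ROS rules, and check_names is a hypothetical helper. Consult the ROS documentation for the exact requirements.

```python
import re

# Assumed conventions, not the authoritative ROS rules:
TYPE_NAME = re.compile(r"^[A-Z][A-Za-z0-9]*$")   # e.g. Fibonacci
FIELD_NAME = re.compile(r"^[a-z][a-z0-9_]*$")    # e.g. n, fibonacci


def check_names(type_name, field_names):
    """Return a list of human-readable problems; empty if all names pass."""
    problems = []
    if not TYPE_NAME.match(type_name):
        problems.append(f"type name '{type_name}' should be CamelCase")
    for name in field_names:
        if not FIELD_NAME.match(name):
            problems.append(f"field name '{name}' should be lower_snake_case")
    return problems


good = check_names("Fibonacci", ["n", "fibonacci"])
bad = check_names("fibonacci", ["N"])
print(good)
print(bad)
```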

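Update the CMakeLists.txt to build the service. Because the package was created without message_generation, add it to find_package, and declare message_generation (build dependency) and message_runtime (execution dependency) in package.xml as well.

# Add or uncomment the following lines in CMakeLists.txt

## message_generation is required by add_service_files and generate_messages
find_package(catkin REQUIRED COMPONENTS
  rospy
  std_msgs
  message_generation
)

## Generate services in the 'srv' folder
add_service_files(
  FILES
  Fibonacci.srv
)

## Generate added messages and services with any dependencies listed here
generate_messages(
  DEPENDENCIES
  std_msgs
)

catkin_package(
  CATKIN_DEPENDS message_runtime
)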

Step 3: Create the Service Server Node

Now, create the service server node in the scripts directory.

mkdir -p $ROS_WS/src/fib_pkg/scripts

Create a file named fibonacci_server.py.

#!/usr/bin/env python3

import rospy
from fib_pkg.srv import Fibonacci, FibonacciResponse

def fibonacci(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        a, b = 0, 1
        for _ in range(2, n + 1):
            a, b = b, a + b
        return b

def handle_fibonacci(req):
    result = fibonacci(req.n)
    rospy.loginfo(f"Returning Fibonacci number for n={req.n}: {result}")
    return FibonacciResponse(result)

def fibonacci_server():
    rospy.init_node('fibonacci_server')
    s = rospy.Service('fibonacci', Fibonacci, handle_fibonacci)
    rospy.loginfo("Ready to calculate Fibonacci number.")
    rospy.spin()

if __name__ == "__main__":
    fibonacci_server()
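The response field is declared as int64, so only results up to 2^63 - 1 can be returned, and the server code above does not reject negative n. The following standalone sketch (no ROS needed) shows one way to validate the input and to find the largest index whose Fibonacci number still fits. The names INT64_MAX and largest_int64_index are introduced here for illustration.

```python
INT64_MAX = 2**63 - 1  # largest value an int64 field can hold


def fibonacci(n):
    """Return the n-th Fibonacci number, with F(0) = 0 and F(1) = 1."""
    if n < 0:
        raise ValueError("n must be non-negative")
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def largest_int64_index():
    """Largest n for which fibonacci(n) still fits into an int64 field."""
    n = 0
    while fibonacci(n + 1) <= INT64_MAX:
        n += 1
    return n


print(fibonacci(10))
print(largest_int64_index())
```

A service handler could use such a bound to refuse requests that would overflow the response field rather than failing during serialization.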

Make the script executable.

chmod +x $ROS_WS/src/fib_pkg/scripts/fibonacci_server.py

Step 4: Call the Service from CLI

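Rebuild the workspace first so that the Fibonacci service classes are generated: run catkin_make in $ROS_WS and then source devel/setup.bash. Then start the ROS master.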

roscore

In a new terminal, start the service server node.

rosrun fib_pkg fibonacci_server.py

In another terminal, call the service from the CLI.

rosservice call /fibonacci 10

You should see the response fibonacci: 55, since the 10th Fibonacci number is 55.

Step 5: Call the Service from rqt

Open rqt and use the Service Caller plugin.

rqt
  1. Open the Plugins menu, navigate to Services, and select Service Caller.
  2. Select the /fibonacci service.
  3. Enter a value for n and call the service.
  4. You should see the response in the Response field.

Step 6: Create Another Node to Call the Service

Create a new file named fibonacci_client.py in the scripts directory.

#!/usr/bin/env python3

import sys

import rospy
from fib_pkg.srv import Fibonacci

def fibonacci_client(n):
    # Block until the server has advertised the service
    rospy.wait_for_service('fibonacci')
    try:
        fibonacci = rospy.ServiceProxy('fibonacci', Fibonacci)
        resp = fibonacci(n)
        return resp.fibonacci
    except rospy.ServiceException as e:
        rospy.logerr(f"Service call failed: {e}")
        return None

def usage():
    return "%s [n]" % sys.argv[0]

if __name__ == "__main__":
    rospy.init_node('fibonacci_client')
    if len(sys.argv) == 2:
        n = int(sys.argv[1])
    else:
        rospy.loginfo(usage())
        sys.exit(1)
    rospy.loginfo(f"Requesting Fibonacci number for n={n}")
    result = fibonacci_client(n)
    # Only report a value if the service call succeeded
    if result is not None:
        rospy.loginfo(f"Fibonacci number for n={n} is {result}")

Make the script executable.

chmod +x $ROS_WS/src/fib_pkg/scripts/fibonacci_client.py

Start the client node in a new terminal.

rosrun fib_pkg fibonacci_client.py 10

You should see the result of the service call in the terminal.

Recap

  • We defined a service Fibonacci to calculate the Nth Fibonacci number.
  • We created a service server node fibonacci_server.py.
  • We called the service from CLI and rqt.
  • We created another node fibonacci_client.py that calls the service.

Exercise 2

In this exercise, we implement stability analysis as an action server.

Mathematical Background for Stability Analysis

Transfer Function

A transfer function $ H(s) $ is a mathematical representation of the relationship between the input and output of a linear time-invariant (LTI) system in the Laplace domain. It is typically represented as:

\[H(s) = \frac{N(s)}{D(s)}\]

where:

  • $ N(s) $ is the numerator polynomial.
  • $ D(s) $ is the denominator polynomial.

Poles of the Transfer Function

The poles of the transfer function are the roots of the denominator polynomial $ D(s) $. These poles determine the stability of the system.

Stability Criteria

A continuous-time LTI system is stable if all the poles of its transfer function $ H(s) $ have negative real parts. In other words, the system is stable if all the poles lie in the left-half of the complex plane. Mathematically, if $ s_i $ are the poles of $ H(s) $:

\[\text{Re}(s_i) < 0 \quad \forall i\]

Steps to Determine Stability

  1. Find the Denominator Polynomial: Identify the denominator polynomial $ D(s) $ of the transfer function.
  2. Calculate the Poles: Solve for the roots of the polynomial $ D(s) $.
  3. Check Real Parts of the Poles: Examine the real parts of the roots:
    • If all roots have negative real parts, the system is stable.
    • If any root has a non-negative real part, the system is unstable.

Example

Consider a transfer function:

\[H(s) = \frac{s + 5}{s^2 - s - 6}\]

Here, the numerator polynomial is $ N(s) = s + 5 $ and the denominator polynomial is $ D(s) = s^2 - s - 6 $.

Step 1: Find the Denominator Polynomial
\[D(s) = s^2 - s - 6\]

Step 2: Calculate the Poles
Solving $D(s) = 0$ gives $s^2 - s - 6 = (s - 3)(s + 2) = 0$, so the poles are $s_1 = 3$ and $s_2 = -2$.

Step 3: Check Real Parts of the Poles
  • $s_1 = 3$ (positive real part)
  • $s_2 = -2$ (negative real part)

Since $s_1 = 3$ has a positive real part, the system is unstable.
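To see why a pole in the right half-plane makes the system unstable, the example can be expanded into partial fractions and transformed back to the time domain. This is a worked sketch; the residue symbols $A$ and $B$ and the impulse response $h(t)$ are introduced here for illustration.

\[H(s) = \frac{s + 5}{(s - 3)(s + 2)} = \frac{A}{s - 3} + \frac{B}{s + 2}\]

\[A = \left.\frac{s + 5}{s + 2}\right|_{s = 3} = \frac{8}{5}, \qquad B = \left.\frac{s + 5}{s - 3}\right|_{s = -2} = -\frac{3}{5}\]

\[h(t) = \frac{8}{5} e^{3t} - \frac{3}{5} e^{-2t}, \quad t \ge 0\]

The term $e^{-2t}$ decays, but $e^{3t}$ grows without bound, so the impulse response diverges. One pole with a positive real part is enough to make the whole system unstable.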

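In Python, we can use numpy.roots() to check for stability. The argument D is the list of denominator coefficients, ordered from the highest power of $s$ down to the constant term.

import numpy

def is_stable(D):
    # The poles are the roots of the denominator polynomial
    roots = numpy.roots(D)
    return numpy.all(numpy.real(roots) < 0)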

This function returns True if all the real parts of the roots are negative, indicating that the system is stable. Otherwise, it returns False, indicating that the system is unstable.
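As a usage sketch, the function can be applied to the worked example and to a second denominator. The second polynomial, $s^2 + 5s + 6$, is an assumption added here for contrast; it factors as $(s + 2)(s + 3)$, so both poles are negative.

```python
import numpy


def is_stable(D):
    # The poles are the roots of the denominator polynomial
    roots = numpy.roots(D)
    return numpy.all(numpy.real(roots) < 0)


# Coefficients are listed from the highest power of s to the constant term.
examples = {
    "s^2 - s - 6": [1, -1, -6],   # worked example: poles at 3 and -2
    "s^2 + 5s + 6": [1, 5, 6],    # illustrative: poles at -2 and -3
}

for label, coefficients in examples.items():
    poles = numpy.roots(coefficients)
    print(label, "poles:", poles, "stable:", bool(is_stable(coefficients)))
```

Because the roots are computed numerically, poles that lie exactly on the imaginary axis may come out with a tiny positive or negative real part; such borderline systems deserve a closer look than a strict comparison with zero.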

Now we start with the Exercise

We implement a ROS1 action server that determines if a given transfer function (described by arrays of numerator and denominator coefficients) is stable or unstable. If stable, it will respond “stable” and write an image of its step response; if unstable, it will respond “not stable.”

Step 1: Define the Environment Variable and Workspace

First, define the ROS_WS variable and create the workspace:

export ROS_WS=<path_of_your_workspace>
mkdir -p $ROS_WS/src
cd $ROS_WS/src
catkin_create_pkg stability_action_pkg rospy actionlib std_msgs
cd $ROS_WS
catkin_make
source devel/setup.bash

Step 2: Define the Action

Create the action definition file CheckStability.action in the action directory of your package.

mkdir -p $ROS_WS/src/stability_action_pkg/action
touch $ROS_WS/src/stability_action_pkg/action/CheckStability.action

Populate it with the following:

float64[] numerator
float64[] denominator
---
string result
---
float64[] feedback

Note: The action file name must start with a capital letter.

Note: ROS also has naming rules for the fields inside an action definition. Look them up and be sure to follow them.
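The two --- separators split the action file into its goal, result, and feedback sections, and the build generates one message type per section plus wrapper types. The sketch below, which runs without ROS, illustrates both points. The helpers split_action and generated_message_names are hypothetical names; the list of suffixes reflects the message types that the action generator normally produces.

```python
ACTION_TEXT = """\
float64[] numerator
float64[] denominator
---
string result
---
float64[] feedback
"""


def split_action(text):
    """Split an action definition into goal, result and feedback fields."""
    sections = [[]]
    for line in text.splitlines():
        stripped = line.strip()
        if stripped == "---":
            sections.append([])       # a separator starts the next section
        elif stripped:
            sections[-1].append(stripped)
    if len(sections) != 3:
        raise ValueError("an action needs exactly two '---' separators")
    return dict(zip(("goal", "result", "feedback"), sections))


def generated_message_names(action_name):
    """Message types expected to be generated for one action definition."""
    suffixes = ["Action", "ActionGoal", "ActionResult", "ActionFeedback",
                "Goal", "Result", "Feedback"]
    return [action_name + suffix for suffix in suffixes]


parts = split_action(ACTION_TEXT)
names = generated_message_names("CheckStability")
print(parts)
print(names)
```

The server and client in the following steps import CheckStabilityAction, CheckStabilityGoal, CheckStabilityResult and CheckStabilityFeedback from this generated set.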

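Update the CMakeLists.txt to build the action. Because the package was created without actionlib_msgs and message_generation, add both to find_package, and declare them in package.xml as well (message_generation as a build dependency, message_runtime as an execution dependency).

# Add or uncomment the following lines in CMakeLists.txt

## actionlib_msgs and message_generation are needed to generate the action messages
find_package(catkin REQUIRED COMPONENTS
  rospy
  actionlib
  actionlib_msgs
  std_msgs
  message_generation
)

## Generate actions in the 'action' folder
add_action_files(
  FILES
  CheckStability.action
)

## Generate added messages and services with any dependencies listed here
generate_messages(
  DEPENDENCIES
  std_msgs actionlib_msgs
)

catkin_package(
  CATKIN_DEPENDS message_runtime actionlib_msgs
)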

Step 3: Create the Action Server Node

Now, create the action server node in the scripts directory.

mkdir -p $ROS_WS/src/stability_action_pkg/scripts

Create a file named check_stability_server.py.

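#!/usr/bin/env python3

import rospy
import actionlib
import numpy as np
import matplotlib
matplotlib.use('Agg')  # non-interactive backend: the plot is only written to a file
import matplotlib.pyplot as plt
import scipy.signal as signal
from stability_action_pkg.msg import CheckStabilityAction, CheckStabilityFeedback, CheckStabilityResult

def is_stable(denominator):
    # Stable only if every pole (root of the denominator) has a negative real part
    roots = np.roots(denominator)
    return np.all(np.real(roots) < 0)

def plot_step_response(numerator, denominator):
    # Simulate the unit step response and save it as an image
    system = signal.TransferFunction(numerator, denominator)
    t, response = signal.step(system)
    plt.figure()
    plt.plot(t, response)
    plt.xlabel('Time [s]')
    plt.ylabel('Amplitude')
    plt.title('Step Response')
    plt.grid()
    plt.savefig('/tmp/step_response.png')
    plt.close()  # release the figure so repeated goals do not accumulate memory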

class CheckStabilityActionServer:
    _feedback = CheckStabilityFeedback()
    _result = CheckStabilityResult()

    def __init__(self):
        self._as = actionlib.SimpleActionServer("check_stability", CheckStabilityAction, self.execute_cb, False)
        self._as.start()

    def execute_cb(self, goal):
        rospy.loginfo(f"Received numerator: {goal.numerator}, denominator: {goal.denominator}")
        if is_stable(goal.denominator):
            self._result.result = "stable"
            plot_step_response(goal.numerator, goal.denominator)
        else:
            self._result.result = "not stable"
        self._as.set_succeeded(self._result)

if __name__ == "__main__":
    rospy.init_node('check_stability_server')
    server = CheckStabilityActionServer()
    rospy.spin()
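The server above never publishes feedback and never checks for a cancel request, although the action definition has a feedback section. The sketch below shows one possible callback body that does both. It is written against a stand-in object so that it runs without ROS: FakeServer and execute_with_feedback are hypothetical names, while is_preempt_requested, set_preempted, publish_feedback and set_succeeded mirror the SimpleActionServer methods of the same names. Using the feedback array for the real parts of the poles is an assumption made for this sketch.

```python
from types import SimpleNamespace

import numpy as np


def execute_with_feedback(server, goal, feedback, result):
    """Callback body that reports pole real parts as feedback."""
    # Honour a cancel request before doing any work.
    if server.is_preempt_requested():
        server.set_preempted()
        return

    # Assumption: the feedback array carries the real part of each pole.
    poles = np.roots(goal.denominator)
    feedback.feedback = [float(np.real(p)) for p in poles]
    server.publish_feedback(feedback)

    stable = all(r < 0 for r in feedback.feedback)
    result.result = "stable" if stable else "not stable"
    server.set_succeeded(result)


class FakeServer:
    """Stand-in that records calls so the sketch runs without ROS."""

    def __init__(self, preempt=False):
        self.preempt = preempt
        self.calls = []

    def is_preempt_requested(self):
        return self.preempt

    def set_preempted(self):
        self.calls.append("preempted")

    def publish_feedback(self, feedback):
        self.calls.append("feedback")

    def set_succeeded(self, result):
        self.calls.append("succeeded")


server = FakeServer()
goal = SimpleNamespace(numerator=[1.0, 5.0], denominator=[1.0, -1.0, -6.0])
feedback = SimpleNamespace(feedback=[])
result = SimpleNamespace(result="")
execute_with_feedback(server, goal, feedback, result)
print(server.calls, result.result)
```

In the real node, the same calls would go to self._as, with self._feedback and self._result as the message objects.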

Make the script executable.

chmod +x $ROS_WS/src/stability_action_pkg/scripts/check_stability_server.py

Step 4: Create the Action Client Node

Create a new file named check_stability_client.py in the scripts directory.

#!/usr/bin/env python3

import rospy
import actionlib
from stability_action_pkg.msg import CheckStabilityAction, CheckStabilityGoal

def check_stability_client(numerator, denominator):
    client = actionlib.SimpleActionClient('check_stability', CheckStabilityAction)
    client.wait_for_server()
    
    goal = CheckStabilityGoal()
    goal.numerator = numerator
    goal.denominator = denominator
    
    client.send_goal(goal)
    client.wait_for_result()
    
    return client.get_result()

if __name__ == "__main__":
    rospy.init_node('check_stability_client')
    numerator = [1, 5]  # N(s) = s + 5, as in the worked example
    denominator = [1, -1, -6]  # D(s) = s^2 - s - 6
    result = check_stability_client(numerator, denominator)
    rospy.loginfo(f"Result: {result.result}")

Make the script executable.

chmod +x $ROS_WS/src/stability_action_pkg/scripts/check_stability_client.py

Step 5: Build the Package

Build the package to ensure all the files and dependencies are correctly set up.

cd $ROS_WS
catkin_make
source devel/setup.bash

Step 6: Run the Action Server and Client

Start the ROS master and the action server.

roscore

In a new terminal, start the action server node.

rosrun stability_action_pkg check_stability_server.py

In another terminal, run the action client node.

rosrun stability_action_pkg check_stability_client.py

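You should see the result of the action in the terminal, indicating whether the system is stable or not. The example denominator [1, -1, -6] has a pole at $s = 3$, so the client reports "not stable" and no image is written. For a stable system, such as one with denominator [1, 5, 6] (poles at -2 and -3), the step response image is saved as /tmp/step_response.png.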

Recap

  • We defined an action CheckStability to determine system stability.
  • We created an action server node check_stability_server.py.
  • We created an action client node check_stability_client.py.
  • We verified the stability of a system and generated a step response plot if stable.