from __future__ import print_function
import logging
import threading
from . import Message
from . import Param
from . import Service
from . import ServiceRequest
from .comm import RosBridgeClientFactory
__all__ = ['Ros']
LOGGER = logging.getLogger('roslibpy')
CONNECTION_TIMEOUT = 10
ROSAPI_TIMEOUT = 3
class Ros(object):
"""Connection manager to ROS server.
Args:
host (:obj:`str`): Name or IP address of the ROS bridge host, e.g. ``127.0.0.1``.
port (:obj:`int`): ROS bridge port, e.g. ``9090``.
is_secure (:obj:`bool`): ``True`` to use a secure web sockets connection, otherwise ``False``.
"""
def __init__(self, host, port=None, is_secure=False):
self._id_counter = 0
url = RosBridgeClientFactory.create_url(host, port, is_secure)
self.factory = RosBridgeClientFactory(url)
self.is_connecting = False
self.connect()
@property
def id_counter(self):
"""Generate an auto-incremental ID starting from 1.
Returns:
int: An auto-incremented ID.
"""
self._id_counter += 1
return self._id_counter
@property
def is_connected(self):
"""Indicate if the ROS connection is open or not.
Returns:
bool: True if connected to ROS, False otherwise.
"""
return self.factory.is_connected
def connect(self):
"""Connect to ROS master."""
# Don't try to reconnect if already connected.
if self.is_connected or self.is_connecting:
return
self.is_connecting = True
def _unset_connecting_flag(*args):
self.is_connecting = False
self.factory.on_ready(_unset_connecting_flag)
self.factory.connect()
def close(self):
"""Disconnect from ROS master."""
if self.is_connected:
def _wrapper_callback(proto):
self.emit('closing')
proto.send_close()
return proto
self.factory.on_ready(_wrapper_callback)
def run(self, timeout=CONNECTION_TIMEOUT):
"""Kick-starts a non-blocking event loop.
Args:
timeout: Timeout to wait until connection is ready.
"""
self.factory.manager.run()
wait_connect = threading.Event()
self.factory.on_ready(lambda _: wait_connect.set())
if not wait_connect.wait(timeout):
raise Exception('Failed to connect to ROS')
def run_forever(self):
"""Kick-starts a blocking loop to wait for events.
Depending on the implementations, and the client applications,
running this might be required or not.
"""
self.factory.manager.run_forever()
def run_event_loop(self):
LOGGER.warn(
'Deprecation warning: use run_forever instead of run_event_loop ')
self.run_forever()
def call_in_thread(self, callback):
"""Call the given function in a thread.
The threading implementation is deferred to the factory.
Args:
callback (:obj:`callable`): Callable function to be invoked.
"""
self.factory.manager.call_in_thread(callback)
def call_later(self, delay, callback):
"""Call the given function after a certain period of time has passed.
Args:
delay (:obj:`int`): Number of seconds to wait before invoking the callback.
callback (:obj:`callable`): Callable function to be invoked when ROS connection is ready.
"""
self.factory.manager.call_later(delay, callback)
def terminate(self):
"""Signals the termination of the main event loop."""
if self.is_connected:
self.close()
self.factory.manager.terminate()
def on(self, event_name, callback):
"""Add a callback to an arbitrary named event.
Args:
event_name (:obj:`str`): Name of the event to which to subscribe.
callback: Callable function to be executed when the event is triggered.
"""
self.factory.on(event_name, callback)
def off(self, event_name, callback=None):
"""Remove a callback from an arbitrary named event.
Args:
event_name (:obj:`str`): Name of the event from which to unsubscribe.
callback: Callable function. If ``None``, all callbacks of the event
will be removed.
"""
if callback:
self.factory.off(event_name, callback)
else:
self.factory.remove_all_listeners(event_name)
def emit(self, event_name, *args):
"""Trigger a named event."""
self.factory.emit(event_name, *args)
def on_ready(self, callback, run_in_thread=True):
"""Add a callback to be executed when the connection is established.
If a connection to ROS is already available, the callback is executed immediately.
Args:
callback: Callable function to be invoked when ROS connection is ready.
run_in_thread (:obj:`bool`): True to run the callback in a separate thread, False otherwise.
"""
def _wrapper_callback(proto):
if run_in_thread:
self.factory.manager.call_in_thread(callback)
else:
callback()
return proto
self.factory.on_ready(_wrapper_callback)
def send_on_ready(self, message):
"""Send message to the ROS Master once the connection is established.
If a connection to ROS is already available, the message is sent immediately.
Args:
message (:class:`.Message`): ROS Bridge Message to send.
"""
def _send_internal(proto):
proto.send_ros_message(message)
return proto
self.factory.on_ready(_send_internal)
def blocking_call_from_thread(self, callback, timeout):
"""Call the given function from a thread, and wait for the result synchronously
for as long as the timeout will allow.
Args:
callback: Callable function to be invoked from the thread.
timeout (:obj: int): Number of seconds to wait for the response before
raising an exception.
Returns:
The results from the callback, or a timeout exception.
"""
return self.factory.manager.blocking_call_from_thread(callback, timeout)
def get_service_request_callback(self, message):
"""Get the callback which, when called, sends the service request.
Args:
message (:class:`.Message`): ROS Bridge Message containing the request.
Returns:
A callable which makes the service request.
"""
def get_call_results(result_placeholder):
inner_callback = self.factory.manager.get_inner_callback(result_placeholder)
inner_errback = self.factory.manager.get_inner_errback(result_placeholder)
self.call_async_service(message, inner_callback, inner_errback)
return result_placeholder
return get_call_results
def call_sync_service(self, message, timeout):
"""Send a blocking service request to the ROS Master once the connection is established,
waiting for the result to be return.
If a connection to ROS is already available, the request is sent immediately.
Args:
message (:class:`.Message`): ROS Bridge Message containing the request.
timeout (:obj: int): Number of seconds to wait for the response before
raising an exception.
Returns:
Either returns the service request results or raises a timeout exception.
"""
get_call_results = self.get_service_request_callback(message)
return self.blocking_call_from_thread(get_call_results, timeout)
def call_async_service(self, message, callback, errback):
"""Send a service request to the ROS Master once the connection is established.
If a connection to ROS is already available, the request is sent immediately.
Args:
message (:class:`.Message`): ROS Bridge Message containing the request.
callback: Callback invoked on successful execution.
errback: Callback invoked on error.
"""
def _send_internal(proto):
proto.send_ros_service_request(message, callback, errback)
return proto
self.factory.on_ready(_send_internal)
def set_status_level(self, level, identifier):
level_message = Message({
'op': 'set_level',
'level': level,
'id': identifier
})
self.send_on_ready(level_message)
def get_topics(self, callback=None, errback=None):
"""Retrieve list of topics in ROS.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
list: List of topics if blocking, otherwise ``None``.
"""
service = Service(self, '/rosapi/topics', 'rosapi/Topics')
result = service.call(ServiceRequest(), callback,
errback, timeout=ROSAPI_TIMEOUT)
if callback:
return
assert 'topics' in result
return result['topics']
def get_topic_type(self, topic, callback=None, errback=None):
"""Retrieve the type of a topic in ROS.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
str: Topic type if blocking, otherwise ``None``.
"""
service = Service(self, '/rosapi/topic_type',
'rosapi/TopicType')
result = service.call(ServiceRequest({'topic': topic}),
callback, errback, timeout=ROSAPI_TIMEOUT)
if callback:
return
assert 'type' in result
return result['type']
def get_topics_for_type(self, topic_type, callback=None, errback=None):
"""Retrieve list of topics in ROS matching the specified type.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
list: List of topics matching the specified type if blocking, otherwise ``None``.
"""
service = Service(self, '/rosapi/topics_for_type',
'rosapi/TopicsForType')
result = service.call(ServiceRequest({'type': topic_type}),
callback, errback, timeout=ROSAPI_TIMEOUT)
if callback:
return
assert 'topics' in result
return result['topics']
def get_services(self, callback=None, errback=None):
"""Retrieve list of active service names in ROS.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
list: List of services if blocking, otherwise ``None``.
"""
service = Service(self, '/rosapi/services',
'rosapi/Services')
result = service.call(ServiceRequest(), callback,
errback, timeout=ROSAPI_TIMEOUT)
if callback:
return
assert 'services' in result
return result['services']
def get_service_type(self, service_name, callback=None, errback=None):
"""Retrieve the type of a service in ROS.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
str: Service type if blocking, otherwise ``None``.
"""
service = Service(self, '/rosapi/service_type',
'rosapi/ServiceType')
result = service.call(ServiceRequest(
{'service': service_name}), callback, errback, timeout=ROSAPI_TIMEOUT)
if callback:
return
assert 'type' in result
return result['type']
def get_services_for_type(self, service_type, callback=None, errback=None):
"""Retrieve list of services in ROS matching the specified type.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
list: List of services matching the specified type if blocking, otherwise ``None``.
"""
service = Service(self, '/rosapi/services_for_type',
'rosapi/ServicesForType')
result = service.call(ServiceRequest({'type': service_type}),
callback, errback, timeout=ROSAPI_TIMEOUT)
if callback:
return
assert 'services' in result
return result['services']
def get_service_request_details(self, type, callback=None, errback=None):
"""Retrieve details of a ROS Service Request.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
Service Request details if blocking, otherwise ``None``.
"""
service = Service(self, '/rosapi/service_request_details',
'rosapi/ServiceRequestDetails')
result = service.call(ServiceRequest({'type': type}),
callback, errback, timeout=ROSAPI_TIMEOUT)
if callback:
return
return result
def get_service_response_details(self, type, callback=None, errback=None):
"""Retrieve details of a ROS Service Response.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
Service Response details if blocking, otherwise ``None``.
"""
service = Service(self, '/rosapi/service_response_details',
'rosapi/ServiceResponseDetails')
result = service.call(ServiceRequest({'type': type}),
callback, errback, timeout=ROSAPI_TIMEOUT)
if callback:
return
return result
def get_message_details(self, message_type, callback=None, errback=None):
"""Retrieve details of a message type in ROS.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
Message type details if blocking, otherwise ``None``.
"""
service = Service(self, '/rosapi/message_details',
'rosapi/MessageDetails')
result = service.call(ServiceRequest(
{'type': message_type}), callback, errback, timeout=ROSAPI_TIMEOUT)
if callback:
return
return result
def get_params(self, callback=None, errback=None):
"""Retrieve list of param names from the ROS Parameter Server.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
list: List of parameters if blocking, otherwise ``None``.
"""
service = Service(self, '/rosapi/get_param_names',
'rosapi/GetParamNames')
result = service.call(ServiceRequest(), callback,
errback, timeout=ROSAPI_TIMEOUT)
if callback:
return
assert 'names' in result
return result['names']
def get_param(self, name, callback=None, errback=None):
"""Get the value of a parameter from the ROS Parameter Server.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
Returns:
Parameter value if blocking, otherwise ``None``.
"""
param = Param(self, name)
return param.get(callback, errback, timeout=ROSAPI_TIMEOUT)
def set_param(self, name, value, callback=None, errback=None):
"""Set the value of a parameter from the ROS Parameter Server.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
"""
param = Param(self, name)
param.set(value, callback, errback, timeout=ROSAPI_TIMEOUT)
def delete_param(self, name, callback=None, errback=None):
"""Delete parameter from the ROS Parameter Server.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
"""
param = Param(self, name)
param.delete(callback, errback, timeout=ROSAPI_TIMEOUT)
def get_action_servers(self, callback, errback=None):
"""Retrieve list of action servers in ROS."""
service = Service(self, '/rosapi/action_servers',
'rosapi/GetActionServers')
service.call(ServiceRequest(), callback, errback)
def get_nodes(self, callback=None, errback=None):
"""Retrieve list of active node names in ROS.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
"""
service = Service(self, '/rosapi/nodes',
'rosapi/Nodes')
result = service.call(ServiceRequest(), callback, errback)
if callback:
return
assert 'nodes' in result
return result['nodes']
def get_node_details(self, node, callback=None, errback=None):
"""Retrieve list subscribed topics, publishing topics and services of a specific node name.
Note:
To make this a blocking call, pass ``None`` to the ``callback`` parameter .
"""
service = Service(self, '/rosapi/node_details',
'rosapi/NodeDetails')
result = service.call(ServiceRequest({'node': node}), callback, errback)
if callback:
return
output = {
'services': result['services'],
'subscribing': result['subscribing'],
'publishing': result['publishing']
}
return output
if __name__ == '__main__':
FORMAT = '%(asctime)-15s [%(levelname)s] %(message)s'
logging.basicConfig(level=logging.DEBUG, format=FORMAT)
ros_client = Ros('127.0.0.1', 9090)
ros_client.on_ready(lambda: ros_client.get_topics(print))
ros_client.call_later(3, ros_client.close)
ros_client.call_later(5, ros_client.terminate)
ros_client.run_forever()