from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import binascii
import logging
import os
import tempfile
from collections import OrderedDict
import roslibpy
from compas.datastructures import Mesh
from compas.datastructures import mesh_transform
from compas.files import XML
from compas.geometry import Transformation
from compas.robots.resources.basic import _get_file_format
from compas.robots.resources.basic import _mesh_import
from compas.utilities import geometric_key
LOGGER = logging.getLogger('compas_fab.backends.ros')
TIMEOUT = 10
__all__ = [
'RosFileServerLoader',
]
def _cache_file_exists(filename):
return os.path.isfile(filename)
def _read_file(filename, mode='r'):
LOGGER.debug('Loading file %s from local cache dir', filename)
with open(filename, mode) as f:
return f.read()
def _write_file(filename, file_contents, mode='w'):
LOGGER.debug('Saving file to %s', filename)
dirname = os.path.dirname(filename)
if not os.path.isdir(dirname):
os.makedirs(dirname)
with open(filename, mode) as f:
f.write(file_contents)
[docs]class RosFileServerLoader(object):
"""Allows to retrieve the mesh files specified in the robot model from the
ROS File Server. Optionally, it stores them on the local file system,
allowing for faster re-loads as well as enabling them to be loaded by
the local package loaders afterwards.
Parameters
----------
ros : :class:`compas_fab.backends.RosClient`
The ROS client.
local_cache : bool
``True`` to store a local copy of the ROS files, otherwise ``False``.
Defaults to ``False``.
local_cache_directory : str, optional
Directory name to store the cached files. Only used if
``local_cache`` is ``True``. Defaults to ``~/robot_description``.
precision : float
Defines precision for importing/loading meshes. Defaults to ``compas.PRECISION``.
"""
[docs] def __init__(self, ros=None, local_cache=False, local_cache_directory=None, precision=None):
self.robot_name = None
self.schema_prefix = 'package://'
self.ros = ros
self.local_cache_directory = None
self.local_cache_enabled = local_cache
self.precision = precision
if self.local_cache_enabled:
self.local_cache_directory = local_cache_directory or os.path.join(
os.path.expanduser('~'), 'robot_description')
@property
def _robot_resource_path(self):
if not self.robot_name:
raise Exception('Robot name is not assigned, make sure you loaded URDF first')
if not self.local_cache_directory:
raise ValueError('local_cache_directory not set')
return os.path.join(self.local_cache_directory, self.robot_name)
@property
def _urdf_filename(self):
return os.path.join(self._robot_resource_path, 'urdf', 'robot_description.urdf')
@property
def _srdf_filename(self):
return os.path.join(self._robot_resource_path, 'robot_description_semantic.srdf')
[docs] def load_urdf(self, parameter_name='/robot_description'):
"""Loads a URDF model from the specified ROS parameter.
Parameters
----------
parameter_name : str, optional
Name of the ROS parameter containing the robot description.
Defaults to ``/robot_description``.
Returns
-------
str
URDF model of the robot currently loaded in ROS.
"""
if self.local_cache_enabled and self.robot_name:
filename = self._urdf_filename
if _cache_file_exists(filename):
return _read_file(filename)
param = roslibpy.Param(self.ros, parameter_name)
urdf = param.get(timeout=TIMEOUT)
self.robot_name = self._read_robot_name(urdf)
if self.local_cache_enabled:
_write_file(self._urdf_filename, urdf)
return urdf
[docs] def load_srdf(self, parameter_name='/robot_description_semantic'):
"""Loads an SRDF model from the specified ROS parameter.
Parameters
----------
parameter_name : str, optional
Name of the ROS parameter containing the robot semantics.
Defaults to ``/robot_description_semantic``.
Returns
-------
str
SRDF model of the robot currently loaded in ROS.
"""
if self.local_cache_enabled and self.robot_name:
filename = self._srdf_filename
if _cache_file_exists(filename):
return _read_file(filename)
param = roslibpy.Param(self.ros, parameter_name)
srdf = param.get(timeout=TIMEOUT)
if self.local_cache_enabled:
_write_file(self._srdf_filename, srdf)
return srdf
def _read_robot_name(self, robot_description):
xml = XML.from_string(robot_description)
return xml.root.attrib['name']
[docs] def can_load_mesh(self, url):
"""Determine whether this loader can load a given mesh URL.
Parameters
----------
url : str
Mesh URL.
Returns
-------
bool
``True`` if the URL uses the ``package://` scheme,
otherwise ``False``.
"""
return url.startswith(self.schema_prefix)
[docs] def load_mesh(self, url):
"""Loads a mesh from local storage.
Parameters
----------
url : str
Mesh location
Returns
-------
:class:`Mesh` or list of :class:`Mesh`
Instance of a mesh.
"""
use_local_file = False
file_extension = _get_file_format(url)
if self.local_cache_enabled:
local_filename = self._local_mesh_filename(url)
use_local_file = _cache_file_exists(local_filename)
else:
_, local_filename = tempfile.mkstemp(suffix='.' + file_extension, prefix='ros_fileserver_')
if not use_local_file:
service = roslibpy.Service(self.ros, '/file_server/get_file', 'file_server/GetBinaryFile')
request = roslibpy.ServiceRequest(dict(name=url))
response = service.call(request, timeout=TIMEOUT)
file_content = binascii.a2b_base64(response.data['value'])
if file_extension == 'dae':
file_content = file_content.replace(b'xmlns="http://www.collada.org/2005/11/COLLADASchema"', b'')
_write_file(local_filename, file_content, 'wb')
else:
LOGGER.debug('Loading mesh file %s from local cache dir', local_filename)
return _fileserver_mesh_import(url, local_filename, self.precision)
def _local_mesh_filename(self, url):
return os.path.abspath(os.path.join(self._robot_resource_path, url[len('package://'):]))
def _dae_mesh_importer(filename, precision):
"""This is a very simple implementation of a DAE/Collada parser.
Collada specification: https://www.khronos.org/files/collada_spec_1_5.pdf
"""
dae = XML.from_file(filename)
meshes = []
visual_scenes = dae.root.find('library_visual_scenes')
materials = dae.root.find('library_materials')
effects = dae.root.find('library_effects')
for geometry in dae.root.findall('library_geometries/geometry'):
mesh_xml = geometry.find('mesh')
mesh_id = geometry.attrib['id']
matrix_node = visual_scenes.find('visual_scene/node/instance_geometry[@url="#{}"]/../matrix'.format(mesh_id))
transform = None
if matrix_node is not None:
M = [float(i) for i in matrix_node.text.split()]
if M != [1., 0., 0., 0.,
0., 1., 0., 0.,
0., 0., 1., 0.,
0., 0., 0., 1.]:
M = M[0:4], M[4:8], M[8:12], M[12:16]
transform = Transformation.from_matrix(M)
primitive_element_sets = []
primitive_element_sets.extend(mesh_xml.findall('triangles'))
primitive_element_sets.extend(mesh_xml.findall('polylist'))
if len(primitive_element_sets) == 0:
raise Exception('No primitive elements found (currently only triangles and polylist are supported)')
for primitive_element_set in primitive_element_sets:
primitive_tag = primitive_element_set.tag
primitive_set_data = primitive_element_set.find('p').text.split()
mesh_colors = {}
if materials is not None and effects is not None:
try:
instance_effect = None
material_id = primitive_element_set.attrib.get('material')
primitive_count = int(primitive_element_set.attrib['count'])
if material_id is not None:
instance_effect = materials.find('material[@id="{}"]/instance_effect'.format(material_id))
if instance_effect is not None:
instance_effect_id = instance_effect.attrib['url'][1:]
colors = effects.findall('effect[@id="{}"]/profile_COMMON/technique/phong/*/color'.format(instance_effect_id))
for color_node in colors:
rgba = [float(i) for i in color_node.text.split()]
mesh_colors['mesh_color.{}'.format(color_node.attrib['sid'])] = rgba
except Exception:
LOGGER.exception('Exception while loading materials, all materials of mesh file %s will be ignored ', filename)
all_offsets = sorted([int(i.attrib['offset']) for i in primitive_element_set.findall('input[@offset]')])
if not all_offsets:
raise Exception('Primitive element node does not contain offset information! Primitive tag={}'.format(primitive_tag))
vertices_input = primitive_element_set.find('input[@semantic="VERTEX"]')
vertices_id = vertices_input.attrib['source'][1:]
vertices_link = mesh_xml.find('vertices[@id="{}"]/input'.format(vertices_id))
positions = mesh_xml.find('source[@id="{}"]/float_array'.format(vertices_link.attrib['source'][1:]))
positions = positions.text.split()
vertices = [[float(p) for p in positions[i:i + 3]] for i in range(0, len(positions), 3)]
skip_step = 1 + all_offsets[-1]
if primitive_tag == 'triangles':
vcount = [3] * primitive_count
elif primitive_tag == 'polylist':
vcount = [int(v) for v in primitive_element_set.find('vcount').text.split()]
if len(vcount) != primitive_count:
raise Exception('Primitive count does not match vertex per face count, vertex input id={}'.format(vertices_id))
fkeys = [int(f) for f in primitive_set_data[::skip_step]]
faces = []
for i in range(primitive_count):
a = i * vcount[i]
b = a + vcount[i]
faces.append(fkeys[a:b])
index_key = OrderedDict()
vertex = OrderedDict()
for i, xyz in enumerate(vertices):
key = geometric_key(xyz, precision)
index_key[i] = key
vertex[key] = xyz
key_index = {key: index for index, key in enumerate(vertex)}
index_index = {index: key_index[key] for index, key in iter(index_key.items())}
vertices = [xyz for xyz in iter(vertex.values())]
faces = [[index_index[index] for index in face] for face in faces]
mesh = Mesh.from_vertices_and_faces(vertices, faces)
if mesh_colors:
mesh.attributes.update(mesh_colors)
if transform:
mesh_transform(mesh, transform)
meshes.append(mesh)
return meshes
def _fileserver_mesh_import(url, filename, precision=None):
"""Internal function that adds primitive support for DAE files
to the _mesh_import function of compas.robots."""
file_extension = _get_file_format(url)
if file_extension == 'dae':
return _dae_mesh_importer(filename, precision)
else:
return _mesh_import(url, filename)
if __name__ == "__main__":
"""
Start following processes on client side:
roslaunch YOUR_ROBOT_moveit_config demo.launch rviz_tutorial:=true
roslaunch rosbridge_server rosbridge_websocket.launch
roslaunch file_server.launch
"""
import logging
from compas.robots import RobotModel
from compas_fab.backends import RosClient
FORMAT = '%(asctime)-15s [%(levelname)s] %(message)s'
logging.basicConfig(level=logging.DEBUG, format=FORMAT)
with RosClient() as ros:
local_directory = os.path.join(os.path.expanduser('~'), 'workspace', 'robot_description')
importer = RosFileServerLoader(ros, local_cache=True, local_cache_directory=local_directory)
importer.robot_name = 'abb_irb1600_6_12'
urdf = importer.load_urdf()
srdf = importer.load_srdf()
model = RobotModel.from_urdf_string(urdf)
model.load_geometry(importer)
print(model)