Source code for compas_fab.backends.pybullet.client


from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import shutil
import tempfile
from itertools import combinations

import compas
from compas.geometry import Frame
from compas.robots import RobotModel

from compas_fab.backends.interfaces.client import ClientInterface
from compas_fab.robots import Robot
from compas_fab.utilities import LazyLoader

from . import const
from .conversions import frame_from_pose
from .conversions import pose_from_frame
from .exceptions import CollisionError
from .planner import PyBulletPlanner
from .utils import LOG
from .utils import redirect_stdout

pybullet = LazyLoader('pybullet', globals(), 'pybullet')


__all__ = [
    'PyBulletClient',
]


class PyBulletBase(object):
    def __init__(self, connection_type):
        self.client_id = None
        self.connection_type = connection_type

    def connect(self, shadows=True, color=None, width=None, height=None):
        """Connect from the PyBullet server.

        Parameters
        ----------
        shadows : :obj:`bool`
            Display shadows in the GUI. Defaults to ``True``.
        color : :obj:`tuple` of :obj:`float`
            Set the background color of the GUI. Defaults to ``None``.
        width : :obj:`int`
            Set the width in pixels of the GUI. Defaults to ``None``.
        height : :obj:`int`
            Set the height in pixels of GUI. Defaults to ``None``.

        Returns
        -------
        ``None``
        """
        # Shared Memory: execute the physics simulation and rendering in a separate process
        # https://github.com/bulletphysics/bullet3/blob/master/examples/pybullet/examples/vrminitaur.py#L7
        self._detect_display()
        options = self._compose_options(color, width, height)
        with redirect_stdout():
            self.client_id = pybullet.connect(const.CONNECTION_TYPE[self.connection_type], options=options)
        if self.client_id < 0:
            raise Exception('Error in establishing connection with PyBullet.')
        if self.connection_type == 'gui':
            self._configure_debug_visualizer(shadows)

    def _detect_display(self):
        if self.connection_type == 'gui' and not compas.OSX and not compas.WINDOWS and ('DISPLAY' not in os.environ):
            self.connection_type = 'direct'
            print('No display detected! Continuing without GUI.')

    @staticmethod
    def _compose_options(color, width, height):
        options = ''
        if color is not None:
            options += '--background_color_red={} --background_color_green={} --background_color_blue={}'.format(*color)
        if width is not None:
            options += '--width={}'.format(width)
        if height is not None:
            options += '--height={}'.format(height)
        return options

    def _configure_debug_visualizer(self, shadows):
        pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_GUI, False, physicsClientId=self.client_id)
        pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_TINY_RENDERER, False, physicsClientId=self.client_id)
        pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_RGB_BUFFER_PREVIEW, False, physicsClientId=self.client_id)
        pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_DEPTH_BUFFER_PREVIEW, False, physicsClientId=self.client_id)
        pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_SEGMENTATION_MARK_PREVIEW, False, physicsClientId=self.client_id)
        pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_SHADOWS, shadows, physicsClientId=self.client_id)

    def disconnect(self):
        """Disconnect from the PyBullet server."""
        with redirect_stdout():
            return pybullet.disconnect(physicsClientId=self.client_id)

    @property
    def is_connected(self):
        """Indicates whether the client has an active connection.

        Returns
        -------
        :obj:`bool`
            ``True`` if connected, ``False`` otherwise.
        """
        if self.client_id is None:
            return False
        return pybullet.getConnectionInfo(physicsClientId=self.client_id)['isConnected'] == 1


[docs]class PyBulletClient(PyBulletBase, ClientInterface): """Interface to use pybullet as backend. :class:`compasfab.backends.PyBulletClient` is a context manager type, so it's best used in combination with the ``with`` statement to ensure resource deallocation. Thanks to Yijiang Huang and his work in `pybullet_planning <https://github.com/yijiangh/pybullet_planning>`_ for much inspiration. Parameters ---------- connection_type : :obj:`str` Sets the connection type. Defaults to ``'gui'``. verbose : :obj:`bool` Use verbose logging. Defaults to ``False``. Examples -------- >>> from compas_fab.backends import PyBulletClient >>> with PyBulletClient(connection_type='direct') as client: ... print('Connected: %s' % client.is_connected) Connected: True """
[docs] def __init__(self, connection_type='gui', verbose=False): super(PyBulletClient, self).__init__(connection_type) self.planner = PyBulletPlanner(self) self.verbose = verbose self.collision_objects = {} self.attached_collision_objects = {} self.disabled_collisions = set()
def __enter__(self): self.connect() return self def __exit__(self, *args): self.disconnect()
[docs] def step_simulation(self): """By default, the physics server will not step the simulation, unless you explicitly send a ``step_simulation`` command. This method will perform all the actions in a single forward dynamics simulation step such as collision detection, constraint solving and integration. The timestep is 1/240 second. """ pybullet.stepSimulation(physicsClientId=self.client_id)
[docs] def load_robot(self, urdf_file): """Create a pybullet robot using the input urdf file. Parameters ---------- urdf_file : :obj:`str` or file object Absolute file path to the urdf file name or file object. The mesh file can be linked by either `"package::"` or relative path. Notes ----- By default, PyBullet will use the convex hull of any mesh loaded from a URDF for collision detection. Amending the link tag as ``<link concave="yes" name="<name of link>">`` will make the mesh concave for static meshes (see this `example <https://github.com/bulletphysics/bullet3/blob/master/data/samurai.urdf>`_). For non-static concave meshes, replace the OBJ files within the URDF with those generated by ``pybullet.vhacd``. """ robot_model = RobotModel.from_urdf_file(urdf_file) robot = Robot(robot_model, client=self) with redirect_stdout(enabled=not self.verbose): pybullet_uid = pybullet.loadURDF(urdf_file, useFixedBase=True, physicsClientId=self.client_id, flags=pybullet.URDF_USE_SELF_COLLISION) robot.attributes['pybullet_uid'] = pybullet_uid self._add_ids_to_robot_joints(robot) self._add_ids_to_robot_links(robot) return robot
def _add_ids_to_robot_joints(self, robot): joint_ids = self._get_joint_ids(robot.attributes['pybullet_uid']) for joint_id in joint_ids: joint_name = self._get_joint_name(joint_id, robot.attributes['pybullet_uid']) joint = robot.model.get_joint_by_name(joint_name) pybullet_attr = {'id': joint_id} joint.attr.setdefault('pybullet', {}).update(pybullet_attr) def _add_ids_to_robot_links(self, robot): joint_ids = self._get_joint_ids(robot.attributes['pybullet_uid']) for link_id in joint_ids: link_name = self._get_link_name(link_id, robot.attributes['pybullet_uid']) link = robot.model.get_link_by_name(link_name) pybullet_attr = {'id': link_id} link.attr.setdefault('pybullet', {}).update(pybullet_attr) def _get_joint_id_by_name(self, name, robot): return robot.model.get_joint_by_name(name).attr['pybullet']['id'] def _get_joint_ids_by_name(self, names, robot): return tuple(self._get_joint_id_by_name(name, robot) for name in names) def _get_link_id_by_name(self, name, robot): return robot.model.get_link_by_name(name).attr['pybullet']['id'] def _get_link_ids_by_name(self, names, robot): return tuple(self._get_link_id_by_name(name, robot) for name in names)
[docs] def filter_configurations_in_collision(self, robot, configurations): """Filters from a list of configurations those which are in collision. Used for a custom inverse kinematics function. Parameters ---------- robot : :class:`compas_fab.robots.Robot` Robot whose configurations may be in collision. configurations : :obj:`list` of :class:`compas_fab.robots.Configuration` List of configurations to be checked for collisions. Returns ------- :obj:`list` of :class:`compas_fab.robots.Configuration` The same list of configurations with those in collision replaced with ``None``. """ for i, configuration in enumerate(configurations): if not configuration: # if an ik solution was already removed continue try: self.check_collisions(robot, configuration) except CollisionError: configurations[i] = None
# =======================================
[docs] def check_collisions(self, robot, configuration=None): """Checks whether the current or given configuration is in collision. Parameters ---------- robot : :class:`compas_fab.robots.Robot` Robot whose configuration may be in collision. configuration : :class:`compas_fab.robots.Configuration` Configuration to be checked for collisions. If ``None`` is given, the current configuration will be checked. Defaults to ``None``. Raises ------- :class:`compas_fab.backends.pybullet.DetectedCollision` """ if configuration: joint_ids = self._get_joint_ids_by_name(configuration.joint_names, robot) self._set_joint_positions(joint_ids, configuration.values, robot.attributes['pybullet_uid']) self.check_collision_with_objects(robot) self.check_robot_self_collision(robot)
[docs] def check_collision_with_objects(self, robot): """Checks whether the robot and its attached collision objects with its current configuration is is colliding with any collision objects. Parameters ---------- robot : :class:`compas_fab.robots.Robot` Robot whose configuration may be in collision. Raises ------- :class:`compas_fab.backends.pybullet.DetectedCollision` """ for name, body_ids in self.collision_objects.items(): for body_id in body_ids: self._check_collision(robot.attributes['pybullet_uid'], 'robot', body_id, name)
[docs] def check_robot_self_collision(self, robot): """Checks whether the robot and its attached collision objects with its current configuration is colliding with itself. Parameters ---------- robot : :class:`compas_fab.robots.Robot` Robot whose configuration may be in collision. Raises ------- :class:`compas_fab.backends.pybullet.DetectedCollision` """ link_names = robot.get_link_names_with_collision_geometry() # check for collisions between robot links for link_1_name, link_2_name in combinations(link_names, 2): if {link_1_name, link_2_name} in self.disabled_collisions: continue link_1_id = self._get_link_id_by_name(link_1_name, robot) link_2_id = self._get_link_id_by_name(link_2_name, robot) self._check_collision(robot.attributes['pybullet_uid'], link_1_name, robot.attributes['pybullet_uid'], link_2_name, link_1_id, link_2_id) # check for collisions between robot links and attached collision objects for link_name in link_names: link_id = self._get_link_id_by_name(link_name, robot) for name, constraint_info_list in self.attached_collision_objects.items(): for constraint_info in constraint_info_list: self._check_collision(robot.attributes['pybullet_uid'], link_name, constraint_info.body_id, name, link_id)
[docs] def check_collision_objects_for_collision(self): """Checks whether any of the collision objects are colliding. Raises ------- :class:`compas_fab.backends.CollisionError` """ names = self.collision_objects.keys() for name_1, name_2 in combinations(names, 2): for body_1_id in self.collision_objects[name_1]: for body_2_id in self.collision_objects[name_2]: self._check_collision(body_1_id, name_1, body_2_id, name_2)
def _check_collision(self, body_1_id, body_1_name, body_2_id, body_2_name, link_index_1=None, link_index_2=None): kwargs = { 'bodyA': body_1_id, 'bodyB': body_2_id, 'distance': 0, 'physicsClientId': self.client_id, 'linkIndexA': link_index_1, 'linkIndexB': link_index_2, } kwargs = {key: value for key, value in kwargs.items() if value is not None} pts = pybullet.getClosestPoints(**kwargs) if pts: LOG.warning("Collision between '{}' and '{}'".format(body_1_name, body_2_name)) raise CollisionError(body_1_name, body_2_name) # ====================================== def _get_base_frame(self, body_id): pose = pybullet.getBasePositionAndOrientation(body_id, physicsClientId=self.client_id) return frame_from_pose(pose) def _get_base_name(self, body_id): return self._get_body_info(body_id).base_name.decode(encoding='UTF-8') def _get_link_state(self, link_id, body_id): return const.LinkState(*pybullet.getLinkState(body_id, link_id, computeForwardKinematics=True, physicsClientId=self.client_id)) def _get_body_info(self, body_id): return const.BodyInfo(*pybullet.getBodyInfo(body_id, physicsClientId=self.client_id)) def _get_joint_info(self, joint_id, body_id): return const.JointInfo(*pybullet.getJointInfo(body_id, joint_id, physicsClientId=self.client_id)) def _get_num_joints(self, body_id): return pybullet.getNumJoints(body_id, physicsClientId=self.client_id) def _get_joint_ids(self, body_id): return list(range(self._get_num_joints(body_id))) def _get_joint_name(self, joint_id, body_id): return self._get_joint_info(joint_id, body_id).jointName.decode('UTF-8') def _get_link_name(self, link_id, body_id): if link_id == const.BASE_LINK_ID: return self._get_base_name(body_id) return self._get_joint_info(link_id, body_id).linkName.decode('UTF-8') def _get_link_frame(self, link_id, body_id): if link_id == const.BASE_LINK_ID: return self._get_base_frame(body_id) link_state = self._get_link_state(link_id, body_id) pose = (link_state.worldLinkFramePosition, link_state.worldLinkFrameOrientation) return frame_from_pose(pose) # ======================================= def _set_base_frame(self, frame, body_id): point, quaternion = pose_from_frame(frame) pybullet.resetBasePositionAndOrientation(body_id, point, quaternion, physicsClientId=self.client_id) def _set_joint_position(self, joint_id, value, body_id): pybullet.resetJointState(body_id, joint_id, value, targetVelocity=0, physicsClientId=self.client_id) def _set_joint_positions(self, joint_ids, values, body_id): if len(joint_ids) != len(values): raise Exception('Joints and values must have the same length.') for joint_id, value in zip(joint_ids, values): self._set_joint_position(joint_id, value, body_id)
[docs] def set_robot_configuration(self, robot, configuration, group=None): """Sets the robot's pose to the given configuration. Should be followed by `step_simulation` for visualization purposes. Parameters ---------- robot : :class:`compas_fab.robots.Robot` The robot to be configured. configuration : :class:`compas_fab.robots.Configuration` If a full configuration is not given, the values from :meth:`compas_fab.robots.Robot.zero_configuration` will be used for the missing ones. Joint names are expected to be supplied in the configuration. group : :obj:`str`, optional The planning group used for calculation. Defaults to the robot's main planning group. """ default_config = robot.zero_configuration() full_configuration = robot.merge_group_with_full_configuration(configuration, default_config, group) joint_ids = self._get_joint_ids_by_name(full_configuration.joint_names, robot) self._set_joint_positions(joint_ids, full_configuration.values, robot.attributes['pybullet_uid']) return full_configuration
# =======================================
[docs] def convert_mesh_to_body(self, mesh, frame, _name=None, concavity=False, mass=const.STATIC_MASS): """Convert compas mesh and its frame to a pybullet body Parameters ---------- mesh : :class:`compas.datastructures.Mesh` frame : :class:`compas.geometry.Frame` _name : :obj:`str`, optional Name of the mesh for tagging in PyBullet's GUI concavity : :obj:`bool`, optional When ``False`` (the default), the mesh will be loaded as its convex hull for collision checking purposes. When ``True``, a non-static mesh will be decomposed into convex parts using v-HACD. mass : :obj:`float`, optional Mass of the body to be created, in kg. If ``0`` mass is given (the default), the object is static. Returns ------- :obj:`int` """ tmp_dir = tempfile.mkdtemp() tmp_obj_path = os.path.join(tmp_dir, 'temp.obj') try: mesh.to_obj(tmp_obj_path) tmp_obj_path = self._handle_concavity(tmp_obj_path, tmp_dir, concavity, mass) pyb_body_id = self.body_from_obj(tmp_obj_path, concavity=concavity, mass=mass) self._set_base_frame(frame, pyb_body_id) # The following lines are for visual debugging purposes # To be deleted or rewritten later. # if name: # from pybullet_planning import add_body_name # add_body_name(pyb_body, name) finally: shutil.rmtree(tmp_dir) return pyb_body_id
@staticmethod def _handle_concavity(tmp_obj_path, tmp_dir, concavity, mass): if not concavity or mass == const.STATIC_MASS: return tmp_obj_path tmp_vhacd_obj_path = os.path.join(tmp_dir, 'vhacd_temp.obj') tmp_log_path = os.path.join(tmp_dir, 'log.txt') with redirect_stdout(): pybullet.vhacd(tmp_obj_path, tmp_vhacd_obj_path, tmp_log_path) return tmp_vhacd_obj_path
[docs] def body_from_obj(self, path, scale=1., concavity=False, mass=const.STATIC_MASS, collision=True, color=const.GREY): """Create a PyBullet body from an OBJ file. Parameters ---------- path : :obj:`str` Path to the OBJ file. scale : :obj:`float`, optional Factor by which to scale the mesh. Defaults to ``1.`` concavity : :obj:`bool`, optional When ``False`` (the default), the mesh will be loaded as its convex hull for collision checking purposes. Only applicable to static (massless) objects. For non-static meshes, use OBJs preprocessed by ``pybullet.vhacd``. mass : :obj:`float`, optional Mass of the body to be created, in kg. If `0` mass is given (the default), the object is static. collision : :obj:`bool` When ``True``, body will be included in collision checking calculations. Defaults to ``True``. color : :obj:`tuple` of :obj:`float` RGBa color components of the body. Returns ------- :obj:`int` """ concavity &= mass == const.STATIC_MASS geometry_args = self._get_geometry_args(path, concavity=concavity, scale=scale) collision_args = self._get_collision_args(geometry_args) collision_id = self._create_collision_shape(collision_args) if collision else const.NULL_ID visual_args = self._get_visual_args(geometry_args, color=color) visual_id = self._create_visual_shape(visual_args) body_id = self._create_body(collision_id, visual_id, mass=mass) return body_id
def _create_body(self, collision_id=const.NULL_ID, visual_id=const.NULL_ID, mass=const.STATIC_MASS): return pybullet.createMultiBody(baseMass=mass, baseCollisionShapeIndex=collision_id, baseVisualShapeIndex=visual_id, physicsClientId=self.client_id) @staticmethod def _create_collision_shape(collision_args): return pybullet.createCollisionShape(**collision_args) @staticmethod def _create_visual_shape(visual_args): if visual_args.get('rgbaColor') is None: return const.NULL_ID return pybullet.createVisualShape(**visual_args) def _get_visual_args(self, geometry_args, frame=Frame.worldXY(), color=const.RED, specular=None): point, quaternion = pose_from_frame(frame) visual_args = { 'rgbaColor': color, 'visualFramePosition': point, 'visualFrameOrientation': quaternion, 'physicsClientId': self.client_id, } visual_args.update(geometry_args) if specular is not None: visual_args['specularColor'] = specular return visual_args def _get_collision_args(self, geometry_args, frame=Frame.worldXY()): point, quaternion = pose_from_frame(frame) collision_args = { 'collisionFramePosition': point, 'collisionFrameOrientation': quaternion, 'physicsClientId': self.client_id, } collision_args.update(geometry_args) if 'length' in collision_args: # pybullet bug visual => length, collision => height collision_args['height'] = collision_args['length'] del collision_args['length'] return collision_args @staticmethod def _get_geometry_args(path, concavity=False, scale=1.): geometry_args = { 'shapeType': pybullet.GEOM_MESH, 'fileName': path, 'meshScale': [scale] * 3, } if concavity: geometry_args['flags'] = pybullet.GEOM_FORCE_CONCAVE_TRIMESH return geometry_args