Examples

Note

This tutorial assumes that you have already installed compas_eve. If you haven’t, please follow the instructions in the installation section.

The main feature of compas_eve is to allow communication between different parts of a program using messages. These messages are sent around using a publisher/subscriber model, or pub/sub for short. In pub/sub communication, messages are not sent directly from a sender to a receiver, instead, they are sent to a Topic. A topic is like a mailbox, the Publisher sends messages to the topic without the need for a subcriber to be actively listening for messages, and also the Subscriber can start listening for messages on a topic without the need for any publisher to be currently sending anything.

This creates a highly decoupled communication model that facilitates the creation of complex software with simple code.

An additional benefit of pub/sub is that it is not limited to 1-to-1 communication: on a single topic, there can be multiple publishers and multiple subscribers all communicating at the same time without additional coordination.

Hello World

Let’s see a Hello World example of this type of communication using compas_eve. This example is very contrived because both the publisher and the subscriber are defined in the same script and the same thread.

import time

from compas_eve import Publisher
from compas_eve import Subscriber
from compas_eve import Topic


topic = Topic("/compas_eve/hello_world/")

publisher = Publisher(topic)
subcriber = Subscriber(topic, callback=lambda msg: print(f"Received message: {msg}"))
subcriber.subscribe()

for i in range(20):
    msg = dict(text=f"Hello world #{i}")
    print(f"Publishing message: {msg}")
    publisher.publish(msg)
    time.sleep(1)

This example is the simplest possible, and it only shows the main concepts needed to communicate. In particular, compas_eve uses by default an in-memory transport for the messages, this means that messages are can only be received within the same program.

Hello Threaded World

Let’s try to extend this first example and add multiple threads to illustrate multi-threaded communication:

import time
from threading import Thread

from compas_eve import Publisher
from compas_eve import Subscriber
from compas_eve import Topic

topic = Topic("/compas_eve/hello_world/")


def start_publisher():
    publisher = Publisher(topic)

    for i in range(20):
        msg = dict(text=f"Hello world #{i}")
        print(f"Publishing message: {msg}")
        publisher.publish(msg)
        time.sleep(1)


def start_subscriber():
    subcriber = Subscriber(topic, callback=lambda msg: print(f"Received message: {msg}"))
    subcriber.subscribe()


# Define one thread for each
t1 = Thread(target=start_subscriber)
t2 = Thread(target=start_publisher)

# Start both threads
t1.start()
t2.start()

# Wait until both threads complete
t1.join()
t2.join()

This get more interesting! Now the publisher and subscriber are in separate threads. However, the in-memory transport is limited to same-process. This means that if we launch this script twice, the messages will not jump from one process to the other. In other words, if we want to communicate with a subscriber on a different process on the machine, or even on a completely separate machine, we need to take an extra step.

Hello Distributed World

Fortunately, it is very easy to extend our example and enable communication across processes, machines, networks, continents, and anything that is connected to the Internet!

The only difference is that we are going to configure a different Transport implementation for our messages. In this case, we will use the MQTT transport method. MQTT is a network protocol very popular for IoT applications because of its lightweight.

We are going to split the code and create one script for sending messages with a publisher and a different one for receiving. This will enable us to start the two examples at the same time from different windows, or potentially from different machines!

First the publisher example:

import time

from compas_eve import Publisher
from compas_eve import Topic
from compas_eve.mqtt import MqttTransport

topic = Topic("/compas_eve/hello_world/")
tx = MqttTransport("broker.hivemq.com")

publisher = Publisher(topic, transport=tx)

for i in range(20):
    msg = dict(text=f"Hello world #{i}")
    print(f"Publishing message: {msg}")
    publisher.publish(msg)
    time.sleep(1)

And now the subscriber example:

import time

from compas_eve import Subscriber
from compas_eve import Topic
from compas_eve.mqtt import MqttTransport

topic = Topic("/compas_eve/hello_world/")
tx = MqttTransport("broker.hivemq.com")

subcriber = Subscriber(topic, callback=lambda msg: print(f"Received message: {msg}"), transport=tx)
subcriber.subscribe()

print("Waiting for messages, press CTRL+C to cancel")
while True:
    time.sleep(1)

You can start both programs in two completely different terminal windows, or even completely different computers and they will be able to communicate!

And since pub/sub allows any number of publishers and any number of subscriber per topic, you can start the same scripts more than once and the messages will be received and send multiple times!

Add typing information to messages

So far, we have defined our messages as simple dictionaries. It is also possible to define a class that messages need to comform to, in order to get typing information on the messages.

import compas_eve as eve


# Create a custom message type
class CustomMessage(eve.Message):
    def __init__(self, value=None):
        super(CustomMessage, self).__init__()
        self["value"] = value


# Define a default transport using MQTT
from compas_eve.mqtt import MqttTransport

tx = MqttTransport("broker.hivemq.com")
eve.set_default_transport(tx)

# Create publisher and subscriber (using the EchoSubscriber for simplicity)
topic = eve.Topic("/hello_world/", message_type=CustomMessage)
pub = eve.Publisher(topic)
sub = eve.EchoSubscriber(topic)
sub.subscribe()

# Starting publishing messages
import time

for i in range(10):
    pub.publish(CustomMessage(i))
    time.sleep(1)

This example also shows how to set a default transport so that it does not need to be specified on every publisher and subscriber instance.