Examples
Note
This tutorial assumes that you have already installed compas_eve
.
If you haven’t, please follow the instructions in the installation section.
The main feature of compas_eve
is to allow communication between different
parts of a program using messages. These messages are sent around using a
publisher/subscriber model, or pub/sub for short. In pub/sub communication,
messages are not sent directly from a sender to a receiver, instead, they are
sent to a Topic
. A topic is like a mailbox, the Publisher
sends messages to the topic without the need for a subcriber to be
actively listening for messages, and also the Subscriber
can start
listening for messages on a topic without the need for any publisher to be
currently sending anything.
This creates a highly decoupled communication model that facilitates the creation of complex software with simple code.
An additional benefit of pub/sub is that it is not limited to 1-to-1 communication: on a single topic, there can be multiple publishers and multiple subscribers all communicating at the same time without additional coordination.
Hello World
Let’s see a Hello World example of this type of communication using compas_eve
.
This example is very contrived because both the publisher and the subscriber are defined
in the same script and the same thread.
import time
from compas_eve import Publisher
from compas_eve import Subscriber
from compas_eve import Topic
topic = Topic("/compas_eve/hello_world/")
publisher = Publisher(topic)
subcriber = Subscriber(topic, callback=lambda msg: print(f"Received message: {msg}"))
subcriber.subscribe()
for i in range(20):
msg = dict(text=f"Hello world #{i}")
print(f"Publishing message: {msg}")
publisher.publish(msg)
time.sleep(1)
This example is the simplest possible, and it only shows the main concepts needed
to communicate. In particular, compas_eve
uses by default an in-memory transport
for the messages, this means that messages are can only be received within the same program.
Hello Threaded World
Let’s try to extend this first example and add multiple threads to illustrate multi-threaded communication:
import time
from threading import Thread
from compas_eve import Publisher
from compas_eve import Subscriber
from compas_eve import Topic
topic = Topic("/compas_eve/hello_world/")
def start_publisher():
publisher = Publisher(topic)
for i in range(20):
msg = dict(text=f"Hello world #{i}")
print(f"Publishing message: {msg}")
publisher.publish(msg)
time.sleep(1)
def start_subscriber():
subcriber = Subscriber(topic, callback=lambda msg: print(f"Received message: {msg}"))
subcriber.subscribe()
# Define one thread for each
t1 = Thread(target=start_subscriber)
t2 = Thread(target=start_publisher)
# Start both threads
t1.start()
t2.start()
# Wait until both threads complete
t1.join()
t2.join()
This get more interesting! Now the publisher and subscriber are in separate threads. However, the in-memory transport is limited to same-process. This means that if we launch this script twice, the messages will not jump from one process to the other. In other words, if we want to communicate with a subscriber on a different process on the machine, or even on a completely separate machine, we need to take an extra step.
Hello Distributed World
Fortunately, it is very easy to extend our example and enable communication across processes, machines, networks, continents, and anything that is connected to the Internet!
The only difference is that we are going to configure a different Transport
implementation for
our messages. In this case, we will use the MQTT transport method. MQTT
is a network protocol very popular for IoT applications because of its lightweight.
We are going to split the code and create one script for sending messages with a publisher and a different one for receiving. This will enable us to start the two examples at the same time from different windows, or potentially from different machines!
First the publisher example:
import time
from compas_eve import Publisher
from compas_eve import Topic
from compas_eve.mqtt import MqttTransport
topic = Topic("/compas_eve/hello_world/")
tx = MqttTransport("broker.hivemq.com")
publisher = Publisher(topic, transport=tx)
for i in range(20):
msg = dict(text=f"Hello world #{i}")
print(f"Publishing message: {msg}")
publisher.publish(msg)
time.sleep(1)
And now the subscriber example:
import time
from compas_eve import Subscriber
from compas_eve import Topic
from compas_eve.mqtt import MqttTransport
topic = Topic("/compas_eve/hello_world/")
tx = MqttTransport("broker.hivemq.com")
subcriber = Subscriber(topic, callback=lambda msg: print(f"Received message: {msg}"), transport=tx)
subcriber.subscribe()
print("Waiting for messages, press CTRL+C to cancel")
while True:
time.sleep(1)
You can start both programs in two completely different terminal windows, or even completely different computers and they will be able to communicate!
And since pub/sub allows any number of publishers and any number of subscriber per topic, you can start the same scripts more than once and the messages will be received and send multiple times!
Add typing information to messages
So far, we have defined our messages as simple dictionaries. It is also possible to define a class that messages need to comform to, in order to get typing information on the messages.
import compas_eve as eve
# Create a custom message type
class CustomMessage(eve.Message):
def __init__(self, value=None):
super(CustomMessage, self).__init__()
self["value"] = value
# Define a default transport using MQTT
from compas_eve.mqtt import MqttTransport
tx = MqttTransport("broker.hivemq.com")
eve.set_default_transport(tx)
# Create publisher and subscriber (using the EchoSubscriber for simplicity)
topic = eve.Topic("/hello_world/", message_type=CustomMessage)
pub = eve.Publisher(topic)
sub = eve.EchoSubscriber(topic)
sub.subscribe()
# Starting publishing messages
import time
for i in range(10):
pub.publish(CustomMessage(i))
time.sleep(1)
This example also shows how to set a default transport so that it does not need to be specified on every publisher and subscriber instance.