Streams

Streams are used to send data in ERDOS applications.

ERDOS streams are similar to ROS topics, but have a few additional desirable properties. Streams facilitate one-to-many communication, so only 1 operator sends messages on a stream. ERDOS broadcasts messages sent on a stream to all connected operators. In addition, streams are typed when using the Rust API.

Streams expose 2 classes of interfaces that access the underlying stream:

  1. Read-interfaces expose methods to receive and process data. They allow pulling data by calling read() and try_read(). Often, they also support a push data model accessed by registering callbacks (e.g. add_callback and add_watermark_callback). Structures that implement read interfaces include:

  • ReadStream: used by operators to read data and register callbacks.

  • ExtractStream: used by the driver to read data.

  1. Write-interfaces expose the send method to send data on a stream. Structures that implement write interfaces include:

Some applications may want to introduce loops in their dataflow graphs which is possible using the LoopStream.

Sending Messages

Operators use Write Streams to send data.

class erdos.WriteStream(_py_write_stream=None)

Sends data and invokes callbacks when messages are received.

Handlese serialization of messages, and wraps an internal.PyWriteStream.

_py_write_stream is set during erdos.run(), and should never be set manually.

send(msg)

Sends a message on the stream.

Parameters

msg (Message) – the message to send. This may be a Watermark or a Message.

Receiving Messages

Operators receive data by registering callbacks or manually reading messages from Read Streams.

Callbacks are functions which take an ERDOS message and any necessary write streams as arguments. Generally, callbacks process received messages and publish the results on write streams.

class erdos.ReadStream(_py_read_stream=None)

Reads data and invokes callbacks when messages are received.

Handles deserialization of messages, and wraps an internal.PyReadStream.

Currently, no callbacks are invoked while Operator.run is executing.

_py_read_stream is set during erdos.run(), and should never be set manually.

read()

Blocks until a message is read from the stream.

try_read()

Tries to read a mesage from the stream.

Returns None if no messages are available at the moment.

add_callback(callback, write_streams=None)

Adds a callback to the stream.

Parameters
  • callback (Message, list of WriteStream -> None) – callback that takes a message.

  • write_streams (list of WriteStream) – write streams passed to the callback.

add_watermark_callback(callback, write_streams=None)

Adds a watermark callback to the stream.

Parameters
  • callback (Message, list of WriteStream -> None) – callback that takes a message.

  • write_streams (list of WriteStream) – write streams passed to the callback.

Ingesting and Extracting Data

Some applications have trouble placing all of the data processing logic inside operators. For these applications, ERDOS provides special stream interfaces to ingest and extract data.

A comprehensive example is available here.

class erdos.IngestStream

Used to send messages from outside of operators.

send(msg)

Sends a message on the stream.

Parameters

msg (Message) – the message to send. This may be a Watermark or a Message.

class erdos.ExtractStream(read_stream)

Used to receive messages outside of an operator.

Parameters

read_stream (ReadStream) – the stream from which to read messages.

read()

Blocks until a message is read from the stream.

try_read()

Tries to read a mesage from the stream.

Returns None if no messages are available at the moment.

Loops

Certain applications require feedback in the dataflow. To support this use case, ERDOS provides the LoopStream interface to support loops in the dataflow.

A comprehensive example is available here.

class erdos.LoopStream

Stream placeholder used to construct loops in the dataflow graph.

Must call set on a ReadStream to complete the loop.