Operators

An ERDOS operator receives data on ReadStreams and sends processed data on WriteStreams. We provide a standard library of operators for common dataflow patterns under erdos.operators. While the standard operators are general and versatile, some applications may implement custom operators to optimize performance and take fine-grained control over execution.

All operators must inherit from the Operator base class and implement the __init__() and connect() methods.

While ERDOS manages the execution of callbacks, some operators require more fine-grained control. Operators can take manual control over the thread of execution by implementing Operator.run() and pulling data from their ReadStreams. Callbacks are not invoked while run() executes.
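The difference between the callback (push) and run() (pull) execution models can be sketched without ERDOS itself. ToyReadStream below is a hypothetical stand-in, not erdos.ReadStream: it only mimics the add_callback() and read() calls used in the examples on this page, and the rule in deliver() (invoke callbacks if any are registered, otherwise buffer the message for read()) is an assumption made for illustration.

```python
import queue


class ToyReadStream:
    """Hypothetical stand-in for erdos.ReadStream (not the real class)."""

    def __init__(self):
        self._messages = queue.Queue()
        self._callbacks = []

    def add_callback(self, callback):
        # Push model: the toy framework calls this for every message.
        self._callbacks.append(callback)

    def deliver(self, msg):
        """Called by the toy framework when a message arrives."""
        if self._callbacks:
            for callback in self._callbacks:
                callback(msg)
        else:
            # No callbacks registered: buffer the message for read().
            self._messages.put(msg)

    def read(self):
        # Pull model: the operator asks for the next message itself
        # (queue.Queue.get blocks while the buffer is empty).
        return self._messages.get()


# Callback-driven consumption: the framework pushes each message.
pushed = []
push_stream = ToyReadStream()
push_stream.add_callback(pushed.append)
for i in range(3):
    push_stream.deliver(i)

# Pull-driven consumption: a run() loop would pull messages like this.
pull_stream = ToyReadStream()
for i in range(3):
    pull_stream.deliver(i)
pulled = [pull_stream.read() for _ in range(3)]

print(pushed, pulled)  # [0, 1, 2] [0, 1, 2]
```

In both cases the operator sees the same messages in the same order; what changes is who drives the loop.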

Operator API

class erdos.Operator(*args, **kwargs)

Operator abstract base class.

Inherit from this class when creating an operator.

__init__(*streams)

Instantiates the operator.

ERDOS passes the read streams followed by the write streams as arguments, matching the read streams passed to connect() and the write streams it returns.

Invoked automatically during erdos.run().

ERDOS operators never need to call super().__init__() because setup is handled by the ERDOS backend code.

static connect(*read_streams)

Connects the operator to its read streams and returns its write streams.

This method should return all of the write streams the operator intends to use.

Invoked automatically during erdos.connect().

run()

Runs the operator.

Invoked automatically during erdos.run().
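The calling convention described for connect(), __init__(), and run() can be modeled with a toy driver: connect() declares the write streams, the operator is then constructed with the read streams followed by those write streams, and finally run() executes. toy_run, ToyWriteStream, and DoubleOp are hypothetical names used only for illustration. In ERDOS the connect step happens during erdos.connect() and instantiation during erdos.run(), whereas this sketch performs both at once and uses a plain list as the read stream.

```python
class ToyWriteStream:
    """Hypothetical stand-in for erdos.WriteStream that records sent data."""

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


class DoubleOp:
    """Example operator with one read stream and one write stream."""

    def __init__(self, read_stream, write_stream):
        # Read streams come first, then the write streams from connect().
        self.read_stream = read_stream
        self.write_stream = write_stream

    @staticmethod
    def connect(read_stream):
        # Return every write stream the operator will use.
        return [ToyWriteStream()]

    def run(self):
        # Here the "read stream" is just an iterable of values.
        for value in self.read_stream:
            self.write_stream.send(2 * value)


def toy_run(op_cls, *read_streams):
    """Hypothetical driver that mimics the documented call order."""
    write_streams = op_cls.connect(*read_streams)  # erdos.connect() step
    op = op_cls(*read_streams, *write_streams)  # erdos.run() step
    op.run()
    return write_streams


(out,) = toy_run(DoubleOp, [1, 2, 3])
print(out.sent)  # [2, 4, 6]
```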

property id

Returns the operator’s ID.

property config

Returns the operator’s config.

class erdos.OperatorConfig(name=None, flow_watermarks=True, log_file_name=None, csv_log_file_name=None, profile_file_name=None)

Configuration details required by ERDOS Operators.

property name

Name of the operator.

property flow_watermarks

Whether to automatically pass on the low watermark to the operator's write streams.

property log_file_name

File name used for logging.

property csv_log_file_name

File name used for logging to CSV.

property profile_file_name

File name used for profiling an operator’s performance.
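As a rough illustration of what flow_watermarks automates, the sketch below assumes the common dataflow definition of a low watermark: the minimum of the latest watermarks received on the operator's input streams. The function names, the dict of per-stream watermarks, the tuple encoding of timestamp coordinates, and the use of plain lists as write streams are all hypothetical; ERDOS's actual watermark handling lives in its backend and may differ in detail.

```python
def low_watermark(input_watermarks):
    """Minimum of the latest watermark seen on each input stream.

    Assumes watermarks are tuples of timestamp coordinates, which Python
    compares lexicographically.
    """
    if not input_watermarks:
        raise ValueError("no input watermarks")
    return min(input_watermarks.values())


def flow_watermark(input_watermarks, write_streams, flow_watermarks=True):
    """Hypothetical sketch of forwarding the low watermark downstream."""
    if not flow_watermarks:
        return None  # watermarks are not forwarded automatically
    wm = low_watermark(input_watermarks)
    for stream in write_streams:
        stream.append(("watermark", wm))  # lists stand in for write streams
    return wm


out = []
wm = flow_watermark({"camera": (5,), "lidar": (3,)}, [out])
print(wm, out)  # (3,) [('watermark', (3,))]
```

Taking the minimum means downstream operators are only told that time has advanced once every input has advanced that far.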

Examples

Full example at python/examples/simple_pipeline.py.

Periodically Publishing Data

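import time

import erdos


class SendOp(erdos.Operator):
    def __init__(self, write_stream):
        # ERDOS passes the write stream returned by connect().
        self.write_stream = write_stream

    @staticmethod
    def connect():
        # No read streams; declare a single write stream.
        return [erdos.WriteStream()]

    def run(self):
        count = 0
        while True:
            # Timestamp each message with the current count.
            msg = erdos.Message(erdos.Timestamp(coordinates=[count]), count)
            print("SendOp: sending {msg}".format(msg=msg))
            self.write_stream.send(msg)

            count += 1
            time.sleep(1)  # publish one message per second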

Processing Data via Callbacks

class CallbackOp(erdos.Operator):
    def __init__(self, read_stream):
        print("CallbackOp: initializing")
        # ERDOS invokes the callback for each message on read_stream.
        read_stream.add_callback(CallbackOp.callback)

    @staticmethod
    def callback(msg):
        print("CallbackOp: received {msg}".format(msg=msg))

    @staticmethod
    def connect(read_stream):
        # Consumes one read stream and produces no write streams.
        return []

Processing Data by Pulling Messages

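class PullOp(erdos.Operator):
    def __init__(self, read_stream):
        self.read_stream = read_stream

    @staticmethod
    def connect(read_stream):
        # Consumes one read stream and produces no write streams.
        return []

    def run(self):
        # Pull messages manually; callbacks are not invoked while run() executes.
        while True:
            data = self.read_stream.read()
            print("PullOp: received {data}".format(data=data))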