Operators
An ERDOS operator receives data on ReadStreams and sends processed data on WriteStreams. We provide a standard library of operators for common dataflow patterns under erdos.operators.
While the standard operators are general and versatile, some applications may implement custom operators to better optimize performance and take fine-grained control over execution.
All operators must inherit from the Operator base class and implement the __init__() and connect() methods.
__init__() takes all ReadStreams from which the operator receives data, all WriteStreams on which the operator sends data, and any other arguments passed when calling erdos.connect(). Within __init__(), the operator should initialize its state and may register callbacks across its ReadStreams.

The connect() method takes ReadStreams and returns WriteStreams, all of which ERDOS later passes to __init__(). The ReadStreams and WriteStreams must appear in the same order as in __init__().
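The ordering contract between connect() and __init__() can be illustrated with a toy harness. This is a hypothetical sketch in plain Python, not ERDOS code: toy_wire(), ToyWriteStream and MergeOp are illustrative names, extra arguments are left out, and the real runtime does considerably more work. It only shows the documented order: read streams first, then the write streams that connect() returned.

```python
from typing import Any, List, Sequence, Tuple


class ToyWriteStream:
    """Hypothetical stand-in for erdos.WriteStream; it just records messages."""

    def __init__(self) -> None:
        self.sent: List[Any] = []

    def send(self, msg: Any) -> None:
        self.sent.append(msg)


def toy_wire(op_type: type, read_streams: Sequence[Any]) -> Tuple[Any, List[Any]]:
    """Hypothetical harness that mimics the documented wiring order."""
    # connect() receives the read streams and returns the write streams.
    write_streams = list(op_type.connect(*read_streams))
    # __init__() then receives the read streams followed by the write
    # streams, in the same order connect() received and returned them.
    op = op_type(*read_streams, *write_streams)
    return op, write_streams


class MergeOp:
    """Example operator with two read streams and one write stream."""

    def __init__(self, left_stream, right_stream, write_stream):
        self.left_stream = left_stream
        self.right_stream = right_stream
        self.write_stream = write_stream

    @staticmethod
    def connect(left_stream, right_stream):
        return [ToyWriteStream()]


op, write_streams = toy_wire(MergeOp, ["left", "right"])
print(op.left_stream, op.right_stream, op.write_stream is write_streams[0])  # left right True
```

Swapping the parameter order in either method would silently bind the wrong stream, which is why the two signatures must agree.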
While ERDOS manages the execution of callbacks, some operators require more fine-grained control. Operators can take manual control over the thread of execution by implementing Operator.run() and pulling data from ReadStreams. Callbacks are not invoked while run() executes.
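The two execution styles can be contrasted with a toy read stream. This is a hypothetical sketch, not ERDOS's implementation: ToyReadStream, deliver() and the operator_has_run flag are illustrative names, and the real runtime's dispatch logic may differ. Only add_callback() and read() mirror names used elsewhere on this page.

```python
import queue
from typing import Any, Callable, List


class ToyReadStream:
    """Hypothetical stand-in for erdos.ReadStream (not the real class)."""

    def __init__(self) -> None:
        self._pending: "queue.Queue[Any]" = queue.Queue()
        self._callbacks: List[Callable[[Any], None]] = []

    def add_callback(self, callback: Callable[[Any], None]) -> None:
        self._callbacks.append(callback)

    def deliver(self, msg: Any, operator_has_run: bool) -> None:
        """Called by the toy runtime when a message arrives."""
        if operator_has_run:
            # Manual control: the operator pulls messages itself via read(),
            # so registered callbacks are not invoked.
            self._pending.put(msg)
        else:
            # Managed execution: the runtime invokes every registered callback.
            for callback in self._callbacks:
                callback(msg)

    def read(self) -> Any:
        """Block until the next message is available, then return it."""
        return self._pending.get()
```

In the managed style the runtime pushes each message into callbacks; in the manual style the operator's own loop decides when to pull the next message.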
Operator API
class erdos.Operator(*args, **kwargs)
Operator abstract base class. Inherit from this class when creating an operator.
- __init__(*streams): Instantiates the operator. ERDOS passes the read streams followed by the write streams as arguments, matching the read streams and write streams in connect(). Invoked automatically during erdos.run(). ERDOS operators never need to call super().__init__() because setup is handled by the ERDOS backend code.
- static connect(*read_streams): Connects the operator to its read streams and returns its write streams. This method should return all write streams the operator intends to use. Invoked automatically during erdos.connect().
- run(): Runs the operator. Invoked automatically during erdos.run().
- property id: Returns the operator's ID.
- property config: Returns the operator's config.

class erdos.OperatorConfig(name=None, flow_watermarks=True, log_file_name=None, csv_log_file_name=None, profile_file_name=None)
Configuration details required by ERDOS operators.
- property name: Name of the operator.
- property flow_watermarks: Whether to automatically pass on the low watermark.
- property log_file_name: File name used for logging.
- property csv_log_file_name: File name used for logging to CSV.
- property profile_file_name: File name used for profiling an operator's performance.
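The flow_watermarks option refers to passing on the operator's low watermark. Assuming the usual dataflow definition, where the low watermark is the minimum of the latest watermarks received on each read stream, the computation can be sketched in plain Python. This is not ERDOS's implementation: modeling a timestamp as a tuple of integer coordinates and the low_watermark() helper are both hypothetical.

```python
from typing import Dict, Optional, Tuple

# Hypothetical model of a timestamp: a tuple of integer coordinates.
# Python tuples compare lexicographically, so min() picks the earliest one.
Timestamp = Tuple[int, ...]


def low_watermark(latest: Dict[str, Optional[Timestamp]]) -> Optional[Timestamp]:
    """Return the smallest watermark across all read streams.

    `latest` maps each read stream's (hypothetical) name to the most recent
    watermark received on it, or None if none has arrived yet. Returns None
    while any stream is still missing a watermark.
    """
    if not latest or any(ts is None for ts in latest.values()):
        return None
    return min(ts for ts in latest.values() if ts is not None)


print(low_watermark({"left": (3,), "right": (5,)}))  # (3,)
```

Taking the minimum is what makes the forwarded watermark safe: the operator cannot promise downstream that a timestamp is complete until every input has reached it.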
Examples
Full example at python/examples/simple_pipeline.py.
Periodically Publishing Data
import time

import erdos


class SendOp(erdos.Operator):
    def __init__(self, write_stream):
        self.write_stream = write_stream

    @staticmethod
    def connect():
        # SendOp has no read streams and a single write stream.
        return [erdos.WriteStream()]

    def run(self):
        # Send one message per second, timestamped with a running counter.
        count = 0
        while True:
            msg = erdos.Message(erdos.Timestamp(coordinates=[count]), count)
            print("SendOp: sending {msg}".format(msg=msg))
            self.write_stream.send(msg)
            count += 1
            time.sleep(1)
Processing Data via Callbacks
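import erdos


class CallbackOp(erdos.Operator):
    def __init__(self, read_stream):
        print("initializing op")
        # Register a callback that is invoked for each received message.
        read_stream.add_callback(CallbackOp.callback)

    @staticmethod
    def callback(msg):
        print("CallbackOp: received {msg}".format(msg=msg))

    @staticmethod
    def connect(read_streams):
        # CallbackOp sends no data, so it returns no write streams.
        return []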
Processing Data by Pulling Messages
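import erdos


class PullOp(erdos.Operator):
    def __init__(self, read_stream):
        self.read_stream = read_stream

    @staticmethod
    def connect(read_streams):
        # PullOp sends no data, so it returns no write streams.
        return []

    def run(self):
        # Take manual control: pull each message from the read stream.
        while True:
            data = self.read_stream.read()
            print("PullOp: received {data}".format(data=data))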