/// Registers a watermark callback over the given read streams that sends a
/// watermark carrying the callback's timestamp on every given write stream.
///
/// Invoked as `flow_watermarks!((rs1, rs2, ...), (ws1, ws2, ...))`, where both
/// lists contain identifiers. Each read stream is given unit state via
/// `add_state(())` before being handed to `add_watermark_callback!`.
///
/// If either list is empty the macro expands to nothing.
///
/// A failed send is reported on stderr and otherwise ignored, so one failing
/// write stream does not stop the remaining ones from being tried.
#[doc(hidden)]
#[macro_export]
macro_rules! flow_watermarks {
    (($($rs:ident),+), ($($ws:ident),+)) => {
        $crate::add_watermark_callback!(
            ($($rs.add_state(())),+),
            ($($ws),+),
            (|timestamp, $($rs),+, $($ws),+| {
                $(
                    // `Message` is spelled through `$crate` so that the expansion
                    // does not depend on the caller having imported it.
                    if $ws
                        .send($crate::dataflow::Message::new_watermark(timestamp.clone()))
                        .is_err()
                    {
                        eprintln!("Error flowing watermark");
                    }
                )+
            })
        );
    };
    // Reached only when at least one of the lists is empty (the arm above takes
    // every other input): there is nothing to flow, so expand to nothing.
    (($($rs:ident),*), ($($ws:ident),*)) => {};
}
/// Constructs an operator of type `$t` by calling `<$t>::new` with a clone of
/// the config, followed by a clone of every read stream and then a clone of
/// every write stream, in the order given.
///
/// Invoked as `make_operator!(Type, config, (rs1, ...), (ws1, ...))`. Either
/// stream list may be empty, in which case the corresponding arguments are
/// simply omitted from the call.
#[doc(hidden)]
#[macro_export]
macro_rules! make_operator {
    // One arm handles every combination of empty and non-empty lists: each
    // stream argument carries its own leading comma, so an empty list leaves no
    // separator behind and the matcher/transcriber repetition kinds agree.
    ($t:ty, $config:expr, ($($rs:ident),*), ($($ws:ident),*)) => {
        <$t>::new($config.clone() $(, $rs.clone())* $(, $ws.clone())*)
    };
}
/// Builds the closure that sets up an operator and returns its executor.
///
/// Invoked as `make_operator_executor!(Type, config, (rs1, ...), (ws1, ...))`.
/// Each stream identifier is first shadowed by the result of its `get_id()`,
/// so the returned `move` closure captures those IDs (plus whatever `$config`
/// refers to) rather than the streams themselves.
///
/// When called with the channel manager and the two control-channel endpoints,
/// the closure:
///
/// 1. takes the receive endpoint for every read-stream ID, rebuilds a
///    `ReadStream` from it and records an `OperatorExecutorStream` for it;
/// 2. fetches the send endpoints for every write-stream ID and rebuilds a
///    `WriteStream` from them;
/// 3. clones the config and sets its `node_id` from the channel manager;
/// 4. constructs the operator through `make_operator!`;
/// 5. expands `flow_watermarks!` when `config.flow_watermarks` is set;
/// 6. sends `ControlMessage::OperatorInitialized` on the control channel,
///    logging (not propagating) a send failure;
/// 7. returns the `OperatorExecutor`.
///
/// The expansion refers to several types unqualified (`Arc`, `Mutex`,
/// `ChannelManager`, `ReadStream`, ...), so the names provided by `imports!()`
/// must be in scope at the call site.
///
/// # Panics
///
/// The closure panics if locking the channel manager fails, or if
/// `take_recv_endpoint` / `get_send_endpoints` do not yield a value (all three
/// are unwrapped).
#[doc(hidden)]
#[macro_export]
macro_rules! make_operator_executor {
    ($t:ty, $config:expr, ($($rs:ident),*), ($($ws:ident),*)) => {{
        // Shadow every stream with its ID; the closure below uses these IDs to
        // look the endpoints up again.
        $(
            let $rs = $rs.get_id();
        )*
        $(
            let $ws = $ws.get_id();
        )*
        move |channel_manager: Arc<Mutex<ChannelManager>>,
              control_sender: UnboundedSender<ControlMessage>,
              control_receiver: UnboundedReceiver<ControlMessage>| {
            let mut op_ex_streams: Vec<Box<dyn OperatorExecutorStreamT>> = Vec::new();
            // Rebind each read-stream ID to a stream built from its endpoint.
            $(
                let $rs = {
                    let recv_endpoint = channel_manager.lock().unwrap().take_recv_endpoint($rs).unwrap();
                    let read_stream = ReadStream::from(InternalReadStream::from_endpoint(recv_endpoint, $rs));
                    op_ex_streams.push(Box::new(OperatorExecutorStream::from(&read_stream)));
                    read_stream
                };
            )*
            // Rebind each write-stream ID to a stream built from its endpoints.
            $(
                let $ws = {
                    let send_endpoints = channel_manager.lock().unwrap().get_send_endpoints($ws).unwrap();
                    WriteStream::from_endpoints(send_endpoints, $ws)
                };
            )*
            let mut config = $config.clone();
            config.node_id = channel_manager.lock().unwrap().node_id();
            // Read the flag before `config` is moved into the executor.
            let flow_watermarks = config.flow_watermarks;
            let logger = $crate::get_terminal_logger();
            let op = $crate::make_operator!($t, config.clone(), ($($rs),*), ($($ws),*));
            if flow_watermarks {
                $crate::flow_watermarks!(($($rs),*), ($($ws),*));
            }
            if let Err(e) = control_sender.send(ControlMessage::OperatorInitialized(config.id)) {
                slog::error!(
                    logger,
                    "Error sending OperatorInitialized message to control handler: {:?}", e
                );
            }
            OperatorExecutor::new(op, config, op_ex_streams, control_receiver, logger)
        }
    }};
}
/// Expands to the `use` declarations that the other helper macros in this
/// file rely on.
///
/// Macros such as `register!` and `make_operator_executor!` refer to names
/// like `Arc`, `Mutex`, `ChannelManager`, `ReadStream` or `OperatorId` without
/// qualification; invoking `imports!()` in the same scope brings those names
/// in from `std` and from this crate.
#[doc(hidden)]
#[macro_export]
macro_rules! imports {
    () => {
        // Standard library.
        use std::cell::RefCell;
        use std::rc::Rc;
        use std::sync::{Arc, Mutex};
        use std::thread;
        use std::time::Duration;

        // Third-party crates, reached through this crate's paths.
        use $crate::slog;
        use $crate::tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

        // This crate.
        use $crate::{
            self,
            communication::ControlMessage,
            dataflow::{
                graph::default_graph,
                stream::{InternalReadStream, WriteStreamT},
                Message, Operator, ReadStream, WriteStream,
            },
            node::operator_executor::{
                OperatorExecutor, OperatorExecutorStream, OperatorExecutorStreamT,
            },
            scheduler::channel_manager::ChannelManager,
            OperatorId,
        };
    };
}
/// Adds an operator of type `$t` to the default graph and yields read streams
/// for its write streams.
///
/// Invoked as `register!(Type, config, (rs1, ...), (ws1, ...))`. The expansion:
///
/// 1. pulls in the required names with `imports!()`;
/// 2. clones the config and assigns it a fresh `OperatorId::new_deterministic()`;
/// 3. collects the IDs of the read and write streams;
/// 4. builds the executor-producing closure with `make_operator_executor!`;
/// 5. calls `default_graph::add_operator` and then
///    `default_graph::add_operator_stream` once per write stream.
///
/// The value of the block is `ReadStream::from(&ws)` for each write stream,
/// parenthesised: `()` when there are none, the bare `ReadStream` when there is
/// exactly one, and a tuple otherwise.
#[doc(hidden)]
#[macro_export]
macro_rules! register {
    ($t:ty, $config:expr, ($($rs:ident),*), ($($ws:ident),*)) => {{
        $crate::imports!();

        let mut op_config = $config.clone();
        op_config.id = OperatorId::new_deterministic();
        // Separate copy for the executor closure, which takes ownership of it.
        let executor_config = op_config.clone();

        // This branch can never run. It exists only so that the operator's
        // construction and the `Operator::run` call are compiled (and therefore
        // type-checked) right here.
        if false {
            let mut unused_op = $crate::make_operator!($t, op_config.clone(), ($($rs),*), ($($ws),*));
            Operator::run(&mut unused_op)
        }

        let read_stream_ids = vec![$($rs.get_id()),*];
        let write_stream_ids = vec![$($ws.get_id()),*];
        let executor_factory =
            $crate::make_operator_executor!($t, executor_config, ($($rs),*), ($($ws),*));

        default_graph::add_operator(
            op_config.id,
            op_config.name.clone(),
            op_config.node_id,
            read_stream_ids,
            write_stream_ids,
            executor_factory,
        );
        $(
            default_graph::add_operator_stream(op_config.id, &$ws);
        )*

        ($(ReadStream::from(&$ws)),*)
    }};
}
/// Connects and registers an operator that has no write streams.
///
/// Invoked as `connect_0_write!(Type, config, stream1, stream2, ...)` with zero
/// or more stream identifiers. Each stream is converted with `(&stream).into()`
/// and rebound under the same name, `<Type>::connect` is called with references
/// to the converted streams (its result is discarded), and the operator is then
/// handed to `register!` with an empty write-stream list.
#[macro_export]
macro_rules! connect_0_write {
    ($t:ty, $config:expr $(, $stream:ident)*) => {{
        // Shadow each input with its converted form; the target type is left to
        // inference.
        $(
            let $stream = (&$stream).into();
        )*
        <$t>::connect($(&$stream),*);
        $crate::register!($t, $config, ($($stream),*), ())
    }};
}
/// Connects and registers an operator that has exactly one write stream.
///
/// Invoked as `connect_1_write!(Type, config, stream1, stream2, ...)` with zero
/// or more stream identifiers. Each stream is converted with `(&stream).into()`
/// and rebound under the same name, `<Type>::connect` is called with references
/// to the converted streams, and its result is passed to `register!` as the
/// operator's single write stream.
#[macro_export]
macro_rules! connect_1_write {
    ($t:ty, $config:expr $(, $stream:ident)*) => {{
        // Shadow each input with its converted form; the target type is left to
        // inference.
        $(
            let $stream = (&$stream).into();
        )*
        let write_stream = <$t>::connect($(&$stream),*);
        $crate::register!($t, $config, ($($stream),*), (write_stream))
    }};
}
/// Connects and registers an operator that has exactly two write streams.
///
/// Invoked as `connect_2_write!(Type, config, stream1, stream2, ...)` with zero
/// or more stream identifiers. Each stream is converted with `(&stream).into()`
/// and rebound under the same name, `<Type>::connect` is called with references
/// to the converted streams, and the pair it returns is passed to `register!`
/// as the operator's write streams.
#[macro_export]
macro_rules! connect_2_write {
    ($t:ty, $config:expr $(, $stream:ident)*) => {{
        // Shadow each input with its converted form; the target type is left to
        // inference.
        $(
            let $stream = (&$stream).into();
        )*
        let (first_ws, second_ws) = <$t>::connect($(&$stream),*);
        $crate::register!($t, $config, ($($stream),*), (first_ws, second_ws))
    }};
}
/// Connects and registers an operator that has exactly three write streams.
///
/// Invoked as `connect_3_write!(Type, config, stream1, stream2, ...)` with zero
/// or more stream identifiers. Each stream is converted with `(&stream).into()`
/// and rebound under the same name, `<Type>::connect` is called with references
/// to the converted streams, and the triple it returns is passed to `register!`
/// as the operator's write streams.
#[macro_export]
macro_rules! connect_3_write {
    ($t:ty, $config:expr $(, $stream:ident)*) => {{
        // Shadow each input with its converted form; the target type is left to
        // inference.
        $(
            let $stream = (&$stream).into();
        )*
        let (first_ws, second_ws, third_ws) = <$t>::connect($(&$stream),*);
        $crate::register!($t, $config, ($($stream),*), (first_ws, second_ws, third_ws))
    }};
}
/// Assembles a callback builder from read streams, write streams and an
/// optional state value.
///
/// Accepted forms (every element is an expression):
///
/// * `make_callback_builder!((rs), (), state)`: one read stream, no write
///   streams, with state. Yields `Rc<RefCell<_>>` around `rs.add_state(state)`.
/// * `make_callback_builder!((rs), (ws, ...))`: one read stream, no state.
///   Wraps `rs` in `Rc<RefCell<_>>`, then chains `add_write_stream(&ws)` for
///   each write stream, rebinding the builder to each call's result.
/// * `make_callback_builder!((rs, ...), (ws, ...), state)`: any other stateful
///   form. Builds the stateless builder for the same streams and calls
///   `add_state(state)` on it, yielding that call's result unchanged.
/// * `make_callback_builder!((rs1, rs2, ...), (ws, ...))`: two or more read
///   streams, no state. Wraps `rs1` in `Rc<RefCell<_>>`, then chains
///   `add_read_stream(&rs)` for the remaining read streams followed by
///   `add_write_stream(&ws)` for each write stream.
///
/// Arms are tried in the order listed, so the single-read-stream forms take
/// precedence over the general ones.
#[doc(hidden)]
#[macro_export]
macro_rules! make_callback_builder {
    // One read stream, no write streams, with state.
    (($rs_head:expr), (), $state:expr) => {{
        use std::{cell::RefCell, rc::Rc};
        Rc::new(RefCell::new($rs_head.add_state($state)))
    }};
    // One read stream, any number of write streams, no state.
    (($rs_head:expr), ($($ws:expr),*)) => {{
        use std::{cell::RefCell, rc::Rc};
        use $crate::dataflow::callback_builder::MultiStreamEventMaker;
        let cb_builder = Rc::new(RefCell::new($rs_head));
        $(
            let cb_builder = cb_builder.borrow_mut().add_write_stream(&$ws);
        )*
        cb_builder
    }};
    // Every remaining stateful form: build the stateless builder, then attach
    // the state.
    (($($rs:expr),+), ($($ws:expr),*), $state:expr) => {{
        use $crate::dataflow::callback_builder::MultiStreamEventMaker;
        // The recursive call goes through `$crate` so that it resolves even when
        // this exported macro is invoked by path from another crate that has not
        // brought `make_callback_builder` into scope.
        $crate::make_callback_builder!(($($rs),+), ($($ws),*)).borrow_mut().add_state($state)
    }};
    // Two or more read streams, any number of write streams, no state.
    (($rs_head:expr, $($rs:expr),*), ($($ws:expr),*)) => {{
        use std::{cell::RefCell, rc::Rc};
        let cb_builder = Rc::new(RefCell::new($rs_head));
        $(
            let cb_builder = cb_builder.borrow_mut().add_read_stream(&$rs);
        )*
        $(
            let cb_builder = cb_builder.borrow_mut().add_write_stream(&$ws);
        )*
        cb_builder
    }};
}
/// Registers one or more watermark callbacks on a callback builder assembled
/// from the given streams.
///
/// Two forms are accepted (every element is an expression):
///
/// * `add_watermark_callback!((rs, ...), (ws, ...), (cb, ...), state)`
/// * `add_watermark_callback!((rs, ...), (ws, ...), (cb, ...))`
///
/// At least one read stream and one callback are required; the write-stream
/// list may be empty. The builder comes from `make_callback_builder!` (with
/// `state` forwarded when present), and `add_watermark_callback` is then called
/// on it once per callback, in the order given.
///
/// The expansion is a sequence of statements rather than an expression, so the
/// macro has to be invoked in statement position.
#[macro_export]
macro_rules! add_watermark_callback {
    // Stateful form.
    (($($rs:expr),+), ($($ws:expr),*), ($($callback:expr),+), $state:expr) => {
        let builder = $crate::make_callback_builder!(($($rs),+), ($($ws),*), $state);
        $(
            builder.borrow_mut().add_watermark_callback($callback);
        )+
    };
    // Stateless form.
    (($($rs:expr),+), ($($ws:expr),*), ($($callback:expr),+)) => {
        let builder = $crate::make_callback_builder!(($($rs),+), ($($ws),*));
        $(
            builder.borrow_mut().add_watermark_callback($callback);
        )+
    };
}