Files
abomonation
abomonation_derive
ansi_term
async_trait
atty
bincode
bitflags
byteorder
bytes
cfg_if
chrono
clap
dirs
dirs_sys
erdos
fixedbitset
fnv
futures
futures_channel
futures_core
futures_executor
futures_io
futures_macro
futures_sink
futures_task
futures_util
async_await
future
io
lock
sink
stream
task
indexmap
iovec
lazy_static
libc
log
memchr
mio
net2
num_cpus
num_integer
num_traits
petgraph
pin_project_lite
pin_utils
proc_macro2
proc_macro_hack
proc_macro_nested
quote
rand
rand_chacha
rand_core
rand_hc
rand_isaac
rand_jitter
rand_os
rand_pcg
rand_xorshift
serde
serde_derive
sha1
slab
slog
slog_term
strsim
syn
synstructure
term
textwrap
thread_local
time
tokio
future
io
loom
macros
net
park
runtime
stream
sync
task
time
util
tokio_macros
tokio_serde
tokio_serde_bincode
tokio_util
unicode_width
unicode_xid
uuid
vec_map
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use std::collections::HashMap;
use tokio::sync::mpsc::UnboundedSender;

use crate::{
    communication::{InterProcessMessage, PusherT},
    dataflow::stream::StreamId,
    node::NodeId,
};

/// Wrapper used to update pushers in the TCP receiving.
///
/// Stores `mpsc::Sender`s to receivers on which `PusherT` can be sent to inform
/// the receivers that data should be sent to ne operators.
pub struct ChannelsToReceivers {
    // We do not use a tokio::mpsc::UnboundedSender because that only provides a blocking API.
    // It does not allow us to just check if the channel has a new message. We need this API in
    // the receivers, which regularly check if there are new pushers available.
    senders: Vec<UnboundedSender<(StreamId, Box<dyn PusherT>)>>,
}

impl ChannelsToReceivers {
    pub fn new() -> Self {
        ChannelsToReceivers {
            senders: Vec::new(),
        }
    }

    /// Adds a `mpsc::Sender` to a new receiver thread.
    pub fn add_sender(&mut self, sender: UnboundedSender<(StreamId, Box<dyn PusherT>)>) {
        self.senders.push(sender);
    }

    /// Updates the receivers about the existance of a new operator.
    ///
    /// It sends a `PusherT` to message on all receiving threads.
    pub fn send(&mut self, stream_id: StreamId, pusher: Box<dyn PusherT>) {
        for sender in self.senders.iter_mut() {
            let msg = (stream_id.clone(), pusher.clone());
            sender.send(msg).unwrap();
        }
    }
}

/// Wrapper used to store mappings between node ids and `mpsc::UnboundedSender` to sender threads.
pub struct ChannelsToSenders {
    /// The ith sender corresponds to a TCP connection to the ith node.
    senders: HashMap<NodeId, UnboundedSender<InterProcessMessage>>,
}

impl ChannelsToSenders {
    pub fn new() -> Self {
        ChannelsToSenders {
            senders: HashMap::new(),
        }
    }

    /// Adds a `mpsc::UnboundedSender` to a node.
    pub fn add_sender(
        &mut self,
        node_id: NodeId,
        sender: tokio::sync::mpsc::UnboundedSender<InterProcessMessage>,
    ) {
        self.senders.insert(node_id, sender);
    }

    /// Returns the associated `mpsc::UnboundedSender` for a given node.
    pub fn clone_channel(
        &self,
        node_id: NodeId,
    ) -> Option<tokio::sync::mpsc::UnboundedSender<InterProcessMessage>> {
        self.senders.get(&node_id).map(|c| c.clone())
    }
}