Files
abomonation
abomonation_derive
ansi_term
async_trait
atty
bincode
bitflags
byteorder
bytes
cfg_if
chrono
clap
dirs
dirs_sys
erdos
fixedbitset
fnv
futures
futures_channel
futures_core
futures_executor
futures_io
futures_macro
futures_sink
futures_task
futures_util
async_await
future
io
lock
sink
stream
task
indexmap
iovec
lazy_static
libc
log
memchr
mio
net2
num_cpus
num_integer
num_traits
petgraph
pin_project_lite
pin_utils
proc_macro2
proc_macro_hack
proc_macro_nested
quote
rand
rand_chacha
rand_core
rand_hc
rand_isaac
rand_jitter
rand_os
rand_pcg
rand_xorshift
serde
serde_derive
sha1
slab
slog
slog_term
strsim
syn
synstructure
term
textwrap
thread_local
time
tokio
future
io
loom
macros
net
park
runtime
stream
sync
task
time
util
tokio_macros
tokio_serde
tokio_serde_bincode
tokio_util
unicode_width
unicode_xid
uuid
vec_map
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::marker::PhantomData;

use serde::Deserialize;

use crate::dataflow::{graph::default_graph, Data};

use super::{ReadStream, StreamId};

/// A placeholder stream that makes it possible to build cycles in the dataflow.
///
/// A `LoopStream` is created before the operator that will eventually feed it,
/// handed to that operator as an input, and afterwards bound to the operator's
/// output with `set`.
///
/// # Example
/// ```ignore
/// let loop_stream = LoopStream::new();
/// let output_stream = erdos::connect_1_write!(MyOperator, OperatorConfig::new(), loop_stream);
/// // Makes sending on output_stream equivalent to sending on loop_stream.
/// loop_stream.set(&output_stream);
/// ```
pub struct LoopStream<D>
where
    for<'a> D: Data + Deserialize<'a>,
{
    /// Identifier of this loop stream.
    id: StreamId,
    /// Name of this loop stream.
    name: String,
    /// Ties the stream to its message type `D` without storing a value of it.
    phantom: PhantomData<D>,
}

impl<D> LoopStream<D>
where
    for<'a> D: Data + Deserialize<'a>,
{
    /// Creates a loop stream whose name is the string form of its newly
    /// generated ID.
    ///
    /// The stream is registered with the default graph as part of construction.
    pub fn new() -> Self {
        let id = StreamId::new_deterministic();
        Self::new_internal(id, id.to_string())
    }

    /// Creates a loop stream with the given `name` and a newly generated ID.
    ///
    /// The stream is registered with the default graph as part of construction.
    pub fn new_with_name(name: &str) -> Self {
        Self::new_internal(StreamId::new_deterministic(), name.to_string())
    }

    /// Builds the stream from its parts and registers it with the default graph
    /// via `default_graph::add_loop_stream`.
    fn new_internal(id: StreamId, name: String) -> Self {
        let loop_stream = Self {
            id,
            name,
            phantom: PhantomData,
        };
        default_graph::add_loop_stream(&loop_stream);
        loop_stream
    }

    /// Returns the ID of this loop stream.
    pub fn get_id(&self) -> StreamId {
        self.id
    }

    /// Returns the name of this loop stream.
    pub fn get_name(&self) -> &str {
        self.name.as_str()
    }

    /// Binds this loop stream to `stream` by recording an alias from this
    /// stream's ID to `stream`'s ID in the default graph.
    ///
    /// # Panics
    ///
    /// Panics if the default graph fails to register the alias.
    pub fn set(&self, stream: &ReadStream<D>) {
        default_graph::add_stream_alias(self.id, stream.get_id())
            .expect("LoopStream::set: failed to add stream alias to the default graph");
    }
}

/// Equivalent to [`LoopStream::new`]; note that this also registers the new
/// stream with the default graph.
impl<D> Default for LoopStream<D>
where
    for<'a> D: Data + Deserialize<'a>,
{
    fn default() -> Self {
        Self::new()
    }
}