1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::marker::PhantomData;

use serde::Deserialize;

use crate::dataflow::{graph::default_graph, Data};

use super::{ReadStream, StreamId};

/// A stream handle that makes it possible to build loops in the dataflow.
///
/// As the example shows, a `LoopStream` is created first, handed to an
/// operator, and afterwards bound to a concrete stream with `set`.
///
/// # Example
/// ```ignore
/// let loop_stream = LoopStream::new();
/// let output_stream = erdos::connect_1_write!(MyOperator, OperatorConfig::new(), loop_stream);
/// // Makes sending on output_stream equivalent to sending on loop_stream.
/// loop_stream.set(&output_stream);
/// ```
pub struct LoopStream<D>
where
    D: Data + for<'a> Deserialize<'a>,
{
    /// Identifier of this loop stream.
    id: StreamId,
    /// Name of this loop stream.
    name: String,
    /// Ties the struct to the data type `D` without storing a value of it.
    phantom: PhantomData<D>,
}

impl<D> LoopStream<D>
where
    D: Data + for<'a> Deserialize<'a>,
{
    /// Creates a loop stream with a newly generated ID, using the string
    /// form of that ID as the stream's name.
    pub fn new() -> Self {
        let stream_id = StreamId::new_deterministic();
        let default_name = stream_id.to_string();
        Self::new_internal(stream_id, default_name)
    }

    /// Creates a loop stream with a newly generated ID and the given `name`.
    pub fn new_with_name(name: &str) -> Self {
        let stream_id = StreamId::new_deterministic();
        Self::new_internal(stream_id, String::from(name))
    }

    /// Builds the loop stream from its parts and passes it to
    /// `default_graph::add_loop_stream` before returning it.
    fn new_internal(id: StreamId, name: String) -> Self {
        let stream = Self {
            id,
            name,
            phantom: PhantomData,
        };
        default_graph::add_loop_stream(&stream);
        stream
    }

    /// Returns the ID of this loop stream.
    pub fn get_id(&self) -> StreamId {
        self.id
    }

    /// Returns the name of this loop stream.
    pub fn get_name(&self) -> &str {
        self.name.as_str()
    }

    /// Connects this loop stream to `stream` by passing both stream IDs to
    /// `default_graph::add_stream_alias`.
    ///
    /// # Panics
    ///
    /// Panics if `default_graph::add_stream_alias` returns an error.
    pub fn set(&self, stream: &ReadStream<D>) {
        let target_id = stream.get_id();
        default_graph::add_stream_alias(self.id, target_id).unwrap();
    }
}