1use std::cell::RefCell;
16use std::collections::{BTreeMap, VecDeque};
17use std::rc::Rc;
18
19use dfir_rs::scheduled::SubgraphId;
20use dfir_rs::scheduled::graph::Dfir;
21use get_size2::GetSize;
22
23use crate::compute::types::ErrCollector;
24use crate::repr::{self, Timestamp};
25use crate::utils::{ArrangeHandler, Arrangement};
26
27#[derive(Debug, Default)]
30pub struct DataflowState {
31    schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
34    as_of: Rc<RefCell<Timestamp>>,
40    err_collector: ErrCollector,
43    arrange_used: Vec<ArrangeHandler>,
46    expire_after: Option<Timestamp>,
48    last_exec_time: Option<Timestamp>,
50}
51
52impl DataflowState {
53    pub fn new_arrange(&mut self, name: Option<Vec<String>>) -> ArrangeHandler {
54        let arrange = name.map(Arrangement::new_with_name).unwrap_or_default();
55
56        let arr = ArrangeHandler::from(arrange);
57        self.arrange_used.push(
59            arr.clone_future_only()
60                .expect("No write happening at this point"),
61        );
62        arr
63    }
64
65    #[allow(clippy::swap_with_temporary)]
69    pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
70        let mut before = self
72            .schedule_subgraph
73            .borrow_mut()
74            .split_off(&(*self.as_of.borrow() + 1));
75        std::mem::swap(&mut before, &mut self.schedule_subgraph.borrow_mut());
76        for (_, v) in before {
77            for subgraph in v {
78                df.schedule_subgraph(subgraph);
79            }
80        }
81        df.run_available()
82    }
83    pub fn get_scheduler(&self) -> Scheduler {
84        Scheduler {
85            schedule_subgraph: self.schedule_subgraph.clone(),
86            cur_subgraph: Rc::new(RefCell::new(None)),
87        }
88    }
89
90    pub fn current_time_ref(&self) -> Rc<RefCell<Timestamp>> {
94        self.as_of.clone()
95    }
96
97    pub fn current_ts(&self) -> Timestamp {
98        *self.as_of.borrow()
99    }
100
101    pub fn set_current_ts(&mut self, ts: Timestamp) {
102        self.as_of.replace(ts);
103    }
104
105    pub fn get_err_collector(&self) -> ErrCollector {
106        self.err_collector.clone()
107    }
108
109    pub fn set_expire_after(&mut self, after: Option<repr::Duration>) {
110        self.expire_after = after;
111    }
112
113    pub fn expire_after(&self) -> Option<Timestamp> {
114        self.expire_after
115    }
116
117    pub fn get_state_size(&self) -> usize {
118        self.arrange_used.iter().map(|x| x.read().get_size()).sum()
119    }
120
121    pub fn set_last_exec_time(&mut self, time: Timestamp) {
122        self.last_exec_time = Some(time);
123    }
124
125    pub fn last_exec_time(&self) -> Option<Timestamp> {
126        self.last_exec_time
127    }
128}
129
130#[derive(Debug, Clone)]
131pub struct Scheduler {
132    schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
134    cur_subgraph: Rc<RefCell<Option<SubgraphId>>>,
135}
136
137impl Scheduler {
138    pub fn schedule_at(&self, next_run_time: Timestamp) {
139        let mut schedule_subgraph = self.schedule_subgraph.borrow_mut();
140        let subgraph = self.cur_subgraph.borrow();
141        let subgraph = subgraph.as_ref().expect("Set SubgraphId before schedule");
142        let subgraph_queue = schedule_subgraph.entry(next_run_time).or_default();
143        subgraph_queue.push_back(*subgraph);
144    }
145
146    pub fn schedule_for_arrange(&self, arrange: &Arrangement, now: Timestamp) {
147        if let Some(i) = arrange.get_next_update_time(&now) {
148            self.schedule_at(i)
149        }
150    }
151
152    pub fn set_cur_subgraph(&self, subgraph: SubgraphId) {
153        self.cur_subgraph.replace(Some(subgraph));
154    }
155}