1use std::cell::RefCell;
16use std::collections::{BTreeMap, VecDeque};
17use std::rc::Rc;
18
19use dfir_rs::scheduled::graph::Dfir;
20use dfir_rs::scheduled::SubgraphId;
21use get_size2::GetSize;
22
23use crate::compute::types::ErrCollector;
24use crate::repr::{self, Timestamp};
25use crate::utils::{ArrangeHandler, Arrangement};
26
/// State shared between a dataflow and the `Scheduler` handles created from it.
#[derive(Debug, Default)]
pub struct DataflowState {
    /// Subgraphs waiting to be scheduled, keyed by the timestamp at which they were
    /// requested to run. The same map is shared (through the `Rc`) with every
    /// `Scheduler` returned by `get_scheduler`; the `VecDeque` keeps the order in
    /// which subgraphs were queued for a given timestamp.
    schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
    /// Current timestamp of the dataflow. `run_available_with_schedule` uses it as
    /// the cut-off for which queued subgraphs are released, and `current_time_ref`
    /// hands out shared references to it.
    as_of: Rc<RefCell<Timestamp>>,
    /// Error collector belonging to this state; `get_err_collector` returns a clone.
    err_collector: ErrCollector,
    /// One future-only handle per arrangement created through `new_arrange`;
    /// `get_state_size` sums the sizes of these.
    arrange_used: Vec<ArrangeHandler>,
    /// Optional expiry setting, stored by `set_expire_after`.
    /// NOTE(review): unit and exact meaning are not visible in this file — confirm.
    expire_after: Option<Timestamp>,
    /// Last execution time recorded through `set_last_exec_time`, if any.
    last_exec_time: Option<Timestamp>,
}
51
52impl DataflowState {
53 pub fn new_arrange(&mut self, name: Option<Vec<String>>) -> ArrangeHandler {
54 let arrange = name.map(Arrangement::new_with_name).unwrap_or_default();
55
56 let arr = ArrangeHandler::from(arrange);
57 self.arrange_used.push(
59 arr.clone_future_only()
60 .expect("No write happening at this point"),
61 );
62 arr
63 }
64
65 #[allow(clippy::swap_with_temporary)]
69 pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
70 let mut before = self
72 .schedule_subgraph
73 .borrow_mut()
74 .split_off(&(*self.as_of.borrow() + 1));
75 std::mem::swap(&mut before, &mut self.schedule_subgraph.borrow_mut());
76 for (_, v) in before {
77 for subgraph in v {
78 df.schedule_subgraph(subgraph);
79 }
80 }
81 df.run_available()
82 }
83 pub fn get_scheduler(&self) -> Scheduler {
84 Scheduler {
85 schedule_subgraph: self.schedule_subgraph.clone(),
86 cur_subgraph: Rc::new(RefCell::new(None)),
87 }
88 }
89
90 pub fn current_time_ref(&self) -> Rc<RefCell<Timestamp>> {
94 self.as_of.clone()
95 }
96
97 pub fn current_ts(&self) -> Timestamp {
98 *self.as_of.borrow()
99 }
100
101 pub fn set_current_ts(&mut self, ts: Timestamp) {
102 self.as_of.replace(ts);
103 }
104
105 pub fn get_err_collector(&self) -> ErrCollector {
106 self.err_collector.clone()
107 }
108
109 pub fn set_expire_after(&mut self, after: Option<repr::Duration>) {
110 self.expire_after = after;
111 }
112
113 pub fn expire_after(&self) -> Option<Timestamp> {
114 self.expire_after
115 }
116
117 pub fn get_state_size(&self) -> usize {
118 self.arrange_used.iter().map(|x| x.read().get_size()).sum()
119 }
120
121 pub fn set_last_exec_time(&mut self, time: Timestamp) {
122 self.last_exec_time = Some(time);
123 }
124
125 pub fn last_exec_time(&self) -> Option<Timestamp> {
126 self.last_exec_time
127 }
128}
129
/// Handle used to queue a subgraph for execution at a given timestamp.
///
/// Cloning a `Scheduler` clones the `Rc`s, so all clones share the same schedule
/// map and the same current-subgraph slot.
#[derive(Debug, Clone)]
pub struct Scheduler {
    /// Schedule map shared with the `DataflowState` that created this scheduler:
    /// timestamp -> subgraphs queued for that timestamp.
    schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
    /// Subgraph that `schedule_at` enqueues; `None` until `set_cur_subgraph` is called.
    cur_subgraph: Rc<RefCell<Option<SubgraphId>>>,
}
136
137impl Scheduler {
138 pub fn schedule_at(&self, next_run_time: Timestamp) {
139 let mut schedule_subgraph = self.schedule_subgraph.borrow_mut();
140 let subgraph = self.cur_subgraph.borrow();
141 let subgraph = subgraph.as_ref().expect("Set SubgraphId before schedule");
142 let subgraph_queue = schedule_subgraph.entry(next_run_time).or_default();
143 subgraph_queue.push_back(*subgraph);
144 }
145
146 pub fn schedule_for_arrange(&self, arrange: &Arrangement, now: Timestamp) {
147 if let Some(i) = arrange.get_next_update_time(&now) {
148 self.schedule_at(i)
149 }
150 }
151
152 pub fn set_cur_subgraph(&self, subgraph: SubgraphId) {
153 self.cur_subgraph.replace(Some(subgraph));
154 }
155}