1use std::cell::RefCell;
16use std::collections::{BTreeMap, VecDeque};
17use std::rc::Rc;
18
19use dfir_rs::scheduled::graph::Dfir;
20use dfir_rs::scheduled::SubgraphId;
21use get_size2::GetSize;
22
23use crate::compute::types::ErrCollector;
24use crate::repr::{self, Timestamp};
25use crate::utils::{ArrangeHandler, Arrangement};
26
27#[derive(Debug, Default)]
30pub struct DataflowState {
31 schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
34 as_of: Rc<RefCell<Timestamp>>,
40 err_collector: ErrCollector,
43 arrange_used: Vec<ArrangeHandler>,
46 expire_after: Option<Timestamp>,
48 last_exec_time: Option<Timestamp>,
50}
51
52impl DataflowState {
53 pub fn new_arrange(&mut self, name: Option<Vec<String>>) -> ArrangeHandler {
54 let arrange = name.map(Arrangement::new_with_name).unwrap_or_default();
55
56 let arr = ArrangeHandler::from(arrange);
57 self.arrange_used.push(
59 arr.clone_future_only()
60 .expect("No write happening at this point"),
61 );
62 arr
63 }
64
65 pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
69 let mut before = self
71 .schedule_subgraph
72 .borrow_mut()
73 .split_off(&(*self.as_of.borrow() + 1));
74 std::mem::swap(&mut before, &mut self.schedule_subgraph.borrow_mut());
75 for (_, v) in before {
76 for subgraph in v {
77 df.schedule_subgraph(subgraph);
78 }
79 }
80 df.run_available()
81 }
82 pub fn get_scheduler(&self) -> Scheduler {
83 Scheduler {
84 schedule_subgraph: self.schedule_subgraph.clone(),
85 cur_subgraph: Rc::new(RefCell::new(None)),
86 }
87 }
88
89 pub fn current_time_ref(&self) -> Rc<RefCell<Timestamp>> {
93 self.as_of.clone()
94 }
95
96 pub fn current_ts(&self) -> Timestamp {
97 *self.as_of.borrow()
98 }
99
100 pub fn set_current_ts(&mut self, ts: Timestamp) {
101 self.as_of.replace(ts);
102 }
103
104 pub fn get_err_collector(&self) -> ErrCollector {
105 self.err_collector.clone()
106 }
107
108 pub fn set_expire_after(&mut self, after: Option<repr::Duration>) {
109 self.expire_after = after;
110 }
111
112 pub fn expire_after(&self) -> Option<Timestamp> {
113 self.expire_after
114 }
115
116 pub fn get_state_size(&self) -> usize {
117 self.arrange_used.iter().map(|x| x.read().get_size()).sum()
118 }
119
120 pub fn set_last_exec_time(&mut self, time: Timestamp) {
121 self.last_exec_time = Some(time);
122 }
123
124 pub fn last_exec_time(&self) -> Option<Timestamp> {
125 self.last_exec_time
126 }
127}
128
129#[derive(Debug, Clone)]
130pub struct Scheduler {
131 schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
133 cur_subgraph: Rc<RefCell<Option<SubgraphId>>>,
134}
135
136impl Scheduler {
137 pub fn schedule_at(&self, next_run_time: Timestamp) {
138 let mut schedule_subgraph = self.schedule_subgraph.borrow_mut();
139 let subgraph = self.cur_subgraph.borrow();
140 let subgraph = subgraph.as_ref().expect("Set SubgraphId before schedule");
141 let subgraph_queue = schedule_subgraph.entry(next_run_time).or_default();
142 subgraph_queue.push_back(*subgraph);
143 }
144
145 pub fn schedule_for_arrange(&self, arrange: &Arrangement, now: Timestamp) {
146 if let Some(i) = arrange.get_next_update_time(&now) {
147 self.schedule_at(i)
148 }
149 }
150
151 pub fn set_cur_subgraph(&self, subgraph: SubgraphId) {
152 self.cur_subgraph.replace(Some(subgraph));
153 }
154}