flow/compute/
state.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::{BTreeMap, VecDeque};
17use std::rc::Rc;
18
19use dfir_rs::scheduled::graph::Dfir;
20use dfir_rs::scheduled::SubgraphId;
21use get_size2::GetSize;
22
23use crate::compute::types::ErrCollector;
24use crate::repr::{self, Timestamp};
25use crate::utils::{ArrangeHandler, Arrangement};
26
27/// input/output of a dataflow
28/// One `ComputeState` manage the input/output/schedule of one `Dfir`
29#[derive(Debug, Default)]
30pub struct DataflowState {
31    /// it is important to use a deque to maintain the order of subgraph here
32    /// TODO(discord9): consider dedup? Also not necessary for hydroflow itself also do dedup when schedule
33    schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
34    /// Frontier (in sys time) before which updates should not be emitted.
35    ///
36    /// We *must* apply it to sinks, to ensure correct outputs.
37    /// We *should* apply it to sources and imported shared state, because it improves performance.
38    /// Which means it's also the current time in temporal filter to get current correct result
39    as_of: Rc<RefCell<Timestamp>>,
40    /// error collector local to this `ComputeState`,
41    /// useful for distinguishing errors from different `Dfir`
42    err_collector: ErrCollector,
43    /// save all used arrange in this dataflow, since usually there is no delete operation
44    /// we can just keep track of all used arrange and schedule subgraph when they need to be updated
45    arrange_used: Vec<ArrangeHandler>,
46    /// the time arrangement need to be expired after a certain time in milliseconds
47    expire_after: Option<Timestamp>,
48    /// the last time each subgraph executed
49    last_exec_time: Option<Timestamp>,
50}
51
52impl DataflowState {
53    pub fn new_arrange(&mut self, name: Option<Vec<String>>) -> ArrangeHandler {
54        let arrange = name.map(Arrangement::new_with_name).unwrap_or_default();
55
56        let arr = ArrangeHandler::from(arrange);
57        // mark this arrange as used in this dataflow
58        self.arrange_used.push(
59            arr.clone_future_only()
60                .expect("No write happening at this point"),
61        );
62        arr
63    }
64
65    /// schedule all subgraph that need to run with time <= `as_of` and run_available()
66    ///
67    /// return true if any subgraph actually executed
68    #[allow(clippy::swap_with_temporary)]
69    pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
70        // first split keys <= as_of into another map
71        let mut before = self
72            .schedule_subgraph
73            .borrow_mut()
74            .split_off(&(*self.as_of.borrow() + 1));
75        std::mem::swap(&mut before, &mut self.schedule_subgraph.borrow_mut());
76        for (_, v) in before {
77            for subgraph in v {
78                df.schedule_subgraph(subgraph);
79            }
80        }
81        df.run_available()
82    }
83    pub fn get_scheduler(&self) -> Scheduler {
84        Scheduler {
85            schedule_subgraph: self.schedule_subgraph.clone(),
86            cur_subgraph: Rc::new(RefCell::new(None)),
87        }
88    }
89
90    /// return a handle to the current time, will update when `as_of` is updated
91    ///
92    /// so it can keep track of the current time even in a closure that is called later
93    pub fn current_time_ref(&self) -> Rc<RefCell<Timestamp>> {
94        self.as_of.clone()
95    }
96
97    pub fn current_ts(&self) -> Timestamp {
98        *self.as_of.borrow()
99    }
100
101    pub fn set_current_ts(&mut self, ts: Timestamp) {
102        self.as_of.replace(ts);
103    }
104
105    pub fn get_err_collector(&self) -> ErrCollector {
106        self.err_collector.clone()
107    }
108
109    pub fn set_expire_after(&mut self, after: Option<repr::Duration>) {
110        self.expire_after = after;
111    }
112
113    pub fn expire_after(&self) -> Option<Timestamp> {
114        self.expire_after
115    }
116
117    pub fn get_state_size(&self) -> usize {
118        self.arrange_used.iter().map(|x| x.read().get_size()).sum()
119    }
120
121    pub fn set_last_exec_time(&mut self, time: Timestamp) {
122        self.last_exec_time = Some(time);
123    }
124
125    pub fn last_exec_time(&self) -> Option<Timestamp> {
126        self.last_exec_time
127    }
128}
129
130#[derive(Debug, Clone)]
131pub struct Scheduler {
132    // this scheduler is shared with `DataflowState`, so it can schedule subgraph
133    schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
134    cur_subgraph: Rc<RefCell<Option<SubgraphId>>>,
135}
136
137impl Scheduler {
138    pub fn schedule_at(&self, next_run_time: Timestamp) {
139        let mut schedule_subgraph = self.schedule_subgraph.borrow_mut();
140        let subgraph = self.cur_subgraph.borrow();
141        let subgraph = subgraph.as_ref().expect("Set SubgraphId before schedule");
142        let subgraph_queue = schedule_subgraph.entry(next_run_time).or_default();
143        subgraph_queue.push_back(*subgraph);
144    }
145
146    pub fn schedule_for_arrange(&self, arrange: &Arrangement, now: Timestamp) {
147        if let Some(i) = arrange.get_next_update_time(&now) {
148            self.schedule_at(i)
149        }
150    }
151
152    pub fn set_cur_subgraph(&self, subgraph: SubgraphId) {
153        self.cur_subgraph.replace(Some(subgraph));
154    }
155}