flow/compute/
state.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::{BTreeMap, VecDeque};
17use std::rc::Rc;
18
19use dfir_rs::scheduled::graph::Dfir;
20use dfir_rs::scheduled::SubgraphId;
21use get_size2::GetSize;
22
23use crate::compute::types::ErrCollector;
24use crate::repr::{self, Timestamp};
25use crate::utils::{ArrangeHandler, Arrangement};
26
27/// input/output of a dataflow
28/// One `ComputeState` manage the input/output/schedule of one `Dfir`
29#[derive(Debug, Default)]
30pub struct DataflowState {
31    /// it is important to use a deque to maintain the order of subgraph here
32    /// TODO(discord9): consider dedup? Also not necessary for hydroflow itself also do dedup when schedule
33    schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
34    /// Frontier (in sys time) before which updates should not be emitted.
35    ///
36    /// We *must* apply it to sinks, to ensure correct outputs.
37    /// We *should* apply it to sources and imported shared state, because it improves performance.
38    /// Which means it's also the current time in temporal filter to get current correct result
39    as_of: Rc<RefCell<Timestamp>>,
40    /// error collector local to this `ComputeState`,
41    /// useful for distinguishing errors from different `Dfir`
42    err_collector: ErrCollector,
43    /// save all used arrange in this dataflow, since usually there is no delete operation
44    /// we can just keep track of all used arrange and schedule subgraph when they need to be updated
45    arrange_used: Vec<ArrangeHandler>,
46    /// the time arrangement need to be expired after a certain time in milliseconds
47    expire_after: Option<Timestamp>,
48    /// the last time each subgraph executed
49    last_exec_time: Option<Timestamp>,
50}
51
52impl DataflowState {
53    pub fn new_arrange(&mut self, name: Option<Vec<String>>) -> ArrangeHandler {
54        let arrange = name.map(Arrangement::new_with_name).unwrap_or_default();
55
56        let arr = ArrangeHandler::from(arrange);
57        // mark this arrange as used in this dataflow
58        self.arrange_used.push(
59            arr.clone_future_only()
60                .expect("No write happening at this point"),
61        );
62        arr
63    }
64
65    /// schedule all subgraph that need to run with time <= `as_of` and run_available()
66    ///
67    /// return true if any subgraph actually executed
68    pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
69        // first split keys <= as_of into another map
70        let mut before = self
71            .schedule_subgraph
72            .borrow_mut()
73            .split_off(&(*self.as_of.borrow() + 1));
74        std::mem::swap(&mut before, &mut self.schedule_subgraph.borrow_mut());
75        for (_, v) in before {
76            for subgraph in v {
77                df.schedule_subgraph(subgraph);
78            }
79        }
80        df.run_available()
81    }
82    pub fn get_scheduler(&self) -> Scheduler {
83        Scheduler {
84            schedule_subgraph: self.schedule_subgraph.clone(),
85            cur_subgraph: Rc::new(RefCell::new(None)),
86        }
87    }
88
89    /// return a handle to the current time, will update when `as_of` is updated
90    ///
91    /// so it can keep track of the current time even in a closure that is called later
92    pub fn current_time_ref(&self) -> Rc<RefCell<Timestamp>> {
93        self.as_of.clone()
94    }
95
96    pub fn current_ts(&self) -> Timestamp {
97        *self.as_of.borrow()
98    }
99
100    pub fn set_current_ts(&mut self, ts: Timestamp) {
101        self.as_of.replace(ts);
102    }
103
104    pub fn get_err_collector(&self) -> ErrCollector {
105        self.err_collector.clone()
106    }
107
108    pub fn set_expire_after(&mut self, after: Option<repr::Duration>) {
109        self.expire_after = after;
110    }
111
112    pub fn expire_after(&self) -> Option<Timestamp> {
113        self.expire_after
114    }
115
116    pub fn get_state_size(&self) -> usize {
117        self.arrange_used.iter().map(|x| x.read().get_size()).sum()
118    }
119
120    pub fn set_last_exec_time(&mut self, time: Timestamp) {
121        self.last_exec_time = Some(time);
122    }
123
124    pub fn last_exec_time(&self) -> Option<Timestamp> {
125        self.last_exec_time
126    }
127}
128
129#[derive(Debug, Clone)]
130pub struct Scheduler {
131    // this scheduler is shared with `DataflowState`, so it can schedule subgraph
132    schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
133    cur_subgraph: Rc<RefCell<Option<SubgraphId>>>,
134}
135
136impl Scheduler {
137    pub fn schedule_at(&self, next_run_time: Timestamp) {
138        let mut schedule_subgraph = self.schedule_subgraph.borrow_mut();
139        let subgraph = self.cur_subgraph.borrow();
140        let subgraph = subgraph.as_ref().expect("Set SubgraphId before schedule");
141        let subgraph_queue = schedule_subgraph.entry(next_run_time).or_default();
142        subgraph_queue.push_back(*subgraph);
143    }
144
145    pub fn schedule_for_arrange(&self, arrange: &Arrangement, now: Timestamp) {
146        if let Some(i) = arrange.get_next_update_time(&now) {
147            self.schedule_at(i)
148        }
149    }
150
151    pub fn set_cur_subgraph(&self, subgraph: SubgraphId) {
152        self.cur_subgraph.replace(Some(subgraph));
153    }
154}