// flow/compute/state.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use get_size2::GetSize;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::SubgraphId;
use crate::compute::types::ErrCollector;
use crate::repr::{self, Timestamp};
use crate::utils::{ArrangeHandler, Arrangement};
/// Input/output and scheduling state of a dataflow.
///
/// One `DataflowState` manages the input/output/schedule of one `Hydroflow`.
#[derive(Debug, Default)]
pub struct DataflowState {
/// Subgraphs waiting to be scheduled, keyed by the timestamp at which they should run.
///
/// It is important to use a deque here to maintain the order of the subgraphs
/// queued under the same timestamp.
/// TODO(discord9): consider dedup? It may not be necessary, since hydroflow itself
/// also dedups when scheduling.
schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
/// Frontier (in sys time) before which updates should not be emitted.
///
/// We *must* apply it to sinks, to ensure correct outputs.
/// We *should* apply it to sources and imported shared state, because it improves performance.
/// This also means it is the current time used by temporal filters to get the
/// current correct result.
as_of: Rc<RefCell<Timestamp>>,
/// Error collector local to this `DataflowState`,
/// useful for distinguishing errors from different `Hydroflow`s.
err_collector: ErrCollector,
/// All arrangements used in this dataflow. Since there is usually no delete operation,
/// we can simply keep track of every arrangement in use and schedule subgraphs when
/// they need to be updated.
arrange_used: Vec<ArrangeHandler>,
/// Time (in milliseconds) after which arrangement data needs to be expired, if set.
expire_after: Option<Timestamp>,
/// The last execution time recorded through `set_last_exec_time`, if any.
last_exec_time: Option<Timestamp>,
}
impl DataflowState {
    /// Create a new arrangement (named when `name` is `Some`) and register it
    /// as used by this dataflow.
    ///
    /// Returns the handler of the freshly created arrangement.
    ///
    /// # Panics
    ///
    /// Panics with "No write happening at this point" if `clone_future_only`
    /// does not yield a handler.
    pub fn new_arrange(&mut self, name: Option<Vec<String>>) -> ArrangeHandler {
        let arrangement = match name {
            Some(name) => Arrangement::new_with_name(name),
            None => Default::default(),
        };
        let handler = ArrangeHandler::from(arrangement);
        // Keep a handle so this dataflow remembers every arrangement it uses.
        let tracked = handler
            .clone_future_only()
            .expect("No write happening at this point");
        self.arrange_used.push(tracked);
        handler
    }

    /// Schedule every subgraph queued for a time <= `as_of`, then call
    /// `run_available()` on `df`.
    ///
    /// Returns whatever `df.run_available()` returns, i.e. `true` if any
    /// subgraph actually executed.
    pub fn run_available_with_schedule(&mut self, df: &mut Hydroflow) -> bool {
        // Take the entries with key <= as_of out of the shared map and leave
        // the strictly later ones in place.
        // NOTE(review): `as_of + 1` would overflow if `as_of` were the largest
        // `Timestamp` value — confirm callers never set it that high.
        let due = {
            let mut pending = self.schedule_subgraph.borrow_mut();
            let later = pending.split_off(&(*self.as_of.borrow() + 1));
            std::mem::replace(&mut *pending, later)
        };
        // Ascending timestamp order, preserving queue order within a timestamp.
        for subgraph in due.into_values().flatten() {
            df.schedule_subgraph(subgraph);
        }
        df.run_available()
    }

    /// Build a `Scheduler` that shares this state's schedule map and starts
    /// with no current subgraph set.
    pub fn get_scheduler(&self) -> Scheduler {
        let schedule_subgraph = Rc::clone(&self.schedule_subgraph);
        let cur_subgraph = Rc::new(RefCell::new(None));
        Scheduler {
            schedule_subgraph,
            cur_subgraph,
        }
    }

    /// Return a shared handle to the current time; it observes later updates
    /// of `as_of`.
    ///
    /// This lets a closure that is called later still read the up-to-date time.
    pub fn current_time_ref(&self) -> Rc<RefCell<Timestamp>> {
        Rc::clone(&self.as_of)
    }

    /// Return a copy of the current `as_of` timestamp.
    pub fn current_ts(&self) -> Timestamp {
        let now = self.as_of.borrow();
        *now
    }

    /// Overwrite the current `as_of` timestamp with `ts`.
    pub fn set_current_ts(&mut self, ts: Timestamp) {
        *self.as_of.borrow_mut() = ts;
    }

    /// Return a clone of this dataflow's error collector.
    pub fn get_err_collector(&self) -> ErrCollector {
        let collector = &self.err_collector;
        collector.clone()
    }

    /// Set (or clear, with `None`) the expiration duration.
    pub fn set_expire_after(&mut self, after: Option<repr::Duration>) {
        self.expire_after = after;
    }

    /// Return the configured expiration duration, if any.
    pub fn expire_after(&self) -> Option<Timestamp> {
        self.expire_after
    }

    /// Return the sum of `get_size()` over every arrangement tracked in
    /// `arrange_used`.
    pub fn get_state_size(&self) -> usize {
        let mut total = 0;
        for handler in &self.arrange_used {
            total += handler.read().get_size();
        }
        total
    }

    /// Record `time` as the last execution time.
    pub fn set_last_exec_time(&mut self, time: Timestamp) {
        self.last_exec_time = Some(time);
    }

    /// Return the last recorded execution time, or `None` if none was recorded.
    pub fn last_exec_time(&self) -> Option<Timestamp> {
        self.last_exec_time
    }
}
/// Handle used to queue the current subgraph to run at a given timestamp.
///
/// Created by `DataflowState::get_scheduler`. Both fields are `Rc`s, so clones
/// of a `Scheduler` share the same schedule map and current-subgraph slot.
#[derive(Debug, Clone)]
pub struct Scheduler {
/// The schedule map shared with `DataflowState`, so that this scheduler can
/// schedule subgraphs for it.
schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
/// The subgraph that `schedule_at` enqueues; `None` until `set_cur_subgraph`
/// has been called.
cur_subgraph: Rc<RefCell<Option<SubgraphId>>>,
}
impl Scheduler {
    /// Queue the current subgraph to run at `next_run_time`.
    ///
    /// The subgraph is appended to the back of the queue stored under that
    /// timestamp, creating the queue if it does not exist yet.
    ///
    /// # Panics
    ///
    /// Panics with "Set SubgraphId before schedule" if no subgraph has been
    /// set through `set_cur_subgraph`.
    pub fn schedule_at(&self, next_run_time: Timestamp) {
        let mut pending = self.schedule_subgraph.borrow_mut();
        let current = self.cur_subgraph.borrow();
        let id = current.as_ref().expect("Set SubgraphId before schedule");
        pending.entry(next_run_time).or_default().push_back(*id);
    }

    /// Queue the current subgraph at the next update time that `arrange`
    /// reports for `now`; does nothing when it reports none.
    pub fn schedule_for_arrange(&self, arrange: &Arrangement, now: Timestamp) {
        let Some(next_update) = arrange.get_next_update_time(&now) else {
            return;
        };
        self.schedule_at(next_update)
    }

    /// Set the subgraph that subsequent `schedule_at` calls will enqueue.
    pub fn set_cur_subgraph(&self, subgraph: SubgraphId) {
        *self.cur_subgraph.borrow_mut() = Some(subgraph);
    }
}