flow/compute/
types.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::{BTreeMap, VecDeque};
17use std::rc::Rc;
18use std::sync::Arc;
19
20use common_error::ext::ErrorExt;
21use dfir_rs::scheduled::graph::Dfir;
22use dfir_rs::scheduled::handoff::TeeingHandoff;
23use dfir_rs::scheduled::port::RecvPort;
24use dfir_rs::scheduled::SubgraphId;
25use itertools::Itertools;
26use tokio::sync::Mutex;
27
28use crate::expr::{Batch, EvalError, ScalarExpr};
29use crate::metrics::METRIC_FLOW_ERRORS;
30use crate::repr::DiffRow;
31use crate::utils::ArrangeHandler;
32
/// Shorthand for a [`TeeingHandoff`] carrying `T`, where `T` defaults to [`DiffRow`].
pub type Toff<T = DiffRow> = TeeingHandoff<T>;
34
35/// A collection, represent a collections of data that is received from a handoff.
36pub struct Collection<T: 'static> {
37    /// represent a stream of updates recv from this port
38    stream: RecvPort<TeeingHandoff<T>>,
39}
40
41impl<T: 'static + Clone> Collection<T> {
42    pub fn from_port(port: RecvPort<TeeingHandoff<T>>) -> Self {
43        Collection { stream: port }
44    }
45
46    /// clone a collection, require a mutable reference to the hydroflow instance
47    ///
48    /// Note: need to be the same hydroflow instance that this collection is created from
49    pub fn clone(&self, df: &mut Dfir) -> Self {
50        Collection {
51            stream: self.stream.tee(df),
52        }
53    }
54
55    pub fn into_inner(self) -> RecvPort<TeeingHandoff<T>> {
56        self.stream
57    }
58}
59
60/// Arranged is a wrapper around `ArrangeHandler` that maintain a list of readers and a writer
61pub struct Arranged {
62    pub arrangement: ArrangeHandler,
63    pub writer: Rc<RefCell<Option<SubgraphId>>>,
64    /// maintain a list of readers for the arrangement for the ease of scheduling
65    pub readers: Rc<RefCell<Vec<SubgraphId>>>,
66}
67
68impl Arranged {
69    pub fn new(arr: ArrangeHandler) -> Self {
70        Self {
71            arrangement: arr,
72            writer: Default::default(),
73            readers: Default::default(),
74        }
75    }
76
77    /// Copy it's future only updates, internally `Rc-ed` so it's cheap to copy
78    pub fn try_copy_future(&self) -> Option<Self> {
79        self.arrangement
80            .clone_future_only()
81            .map(|arrangement| Arranged {
82                arrangement,
83                readers: self.readers.clone(),
84                writer: self.writer.clone(),
85            })
86    }
87}
88
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it contains at least one valid source of data:
/// the collection, one or more arrangements, or both. This makes it convenient to read
/// data from the bundle.
///
// TODO(discord9): make T default to Batch and obsolete the Row Mode
pub struct CollectionBundle<T: 'static = DiffRow> {
    /// Stream of updates, useful for passively reading new updates from the collection.
    ///
    /// Invariant: the timestamp of every update should never be greater than now, since
    /// future updates are expected to be stored in the arrangement instead.
    pub collection: Collection<T>,
    /// Arrangements of the collection, keyed by the [`ScalarExpr`]s that extract each
    /// arrangement's key (itself a `Row`) from the collection's `Row`s. In other words,
    /// the key is the "index" of the arrangement.
    ///
    /// The `Arranged` value is the actual data source. Data can be read from it using the
    /// key described by the corresponding `Vec<ScalarExpr>`.
    ///
    /// Using `Vec<ScalarExpr>` as a key triggers a false positive of
    /// `clippy::mutable_key_type`, because `ScalarExpr::Literal` contains a `Value` that
    /// has a `bytes` variant.
    #[allow(clippy::mutable_key_type)]
    pub arranged: BTreeMap<Vec<ScalarExpr>, Arranged>,
}
111
/// Common interface over [`CollectionBundle`]s of different element types.
///
/// It lets callers check which mode (batch or row) a bundle is in and view it as the
/// concrete bundle type. The `try_as_*` methods default to `None`; each implementation
/// overrides the one matching its own element type.
pub trait GenericBundle {
    /// Returns `true` if this bundle is in batch mode, i.e. carries [`Batch`]es.
    fn is_batch(&self) -> bool;

    /// Views this bundle as a batch-mode bundle, or returns `None` if it is not one.
    fn try_as_batch(&self) -> Option<&CollectionBundle<Batch>> {
        None
    }

    /// Views this bundle as a row-mode bundle, or returns `None` if it is not one.
    fn try_as_row(&self) -> Option<&CollectionBundle<DiffRow>> {
        None
    }
}
123
124impl GenericBundle for CollectionBundle<Batch> {
125    fn is_batch(&self) -> bool {
126        true
127    }
128
129    fn try_as_batch(&self) -> Option<&CollectionBundle<Batch>> {
130        Some(self)
131    }
132}
133
134impl GenericBundle for CollectionBundle<DiffRow> {
135    fn is_batch(&self) -> bool {
136        false
137    }
138
139    fn try_as_row(&self) -> Option<&CollectionBundle<DiffRow>> {
140        Some(self)
141    }
142}
143
144impl<T: 'static> CollectionBundle<T> {
145    pub fn from_collection(collection: Collection<T>) -> Self {
146        Self {
147            collection,
148            arranged: BTreeMap::default(),
149        }
150    }
151}
152
153impl<T: 'static + Clone> CollectionBundle<T> {
154    pub fn clone(&self, df: &mut Dfir) -> Self {
155        Self {
156            collection: self.collection.clone(df),
157            arranged: self
158                .arranged
159                .iter()
160                .map(|(k, v)| (k.clone(), v.try_copy_future().unwrap()))
161                .collect(),
162        }
163    }
164}
165
/// A shared error collector, used to collect errors during the evaluation of the plan.
///
/// The errors live behind an `Arc<Mutex<..>>` and the type derives `Clone`. Every clone
/// therefore shares the same queue, and errors pushed through any clone are visible to
/// all of them.
///
/// Usually only the first error matters, but all of them are stored just in case.
///
/// A `VecDeque` preserves the order in which errors were pushed. This matters when the
/// dataflow runs continuously and errors are needed in order.
#[derive(Debug, Default, Clone)]
pub struct ErrCollector {
    /// Queue of collected errors, oldest first.
    pub inner: Arc<Mutex<VecDeque<EvalError>>>,
}
176
177impl ErrCollector {
178    pub fn get_all_blocking(&self) -> Vec<EvalError> {
179        self.inner.blocking_lock().drain(..).collect_vec()
180    }
181    pub async fn get_all(&self) -> Vec<EvalError> {
182        self.inner.lock().await.drain(..).collect_vec()
183    }
184
185    pub fn is_empty(&self) -> bool {
186        self.inner.blocking_lock().is_empty()
187    }
188
189    pub fn push_err(&self, err: EvalError) {
190        METRIC_FLOW_ERRORS
191            .with_label_values(&[err.status_code().as_ref()])
192            .inc();
193        self.inner.blocking_lock().push_back(err)
194    }
195
196    pub fn run<F, R>(&self, f: F) -> Option<R>
197    where
198        F: FnOnce() -> Result<R, EvalError>,
199    {
200        match f() {
201            Ok(r) => Some(r),
202            Err(e) => {
203                self.push_err(e);
204                None
205            }
206        }
207    }
208}