1use std::cell::RefCell;
16use std::collections::{BTreeMap, VecDeque};
17use std::rc::Rc;
18use std::sync::Arc;
19
20use common_error::ext::ErrorExt;
21use dfir_rs::scheduled::graph::Dfir;
22use dfir_rs::scheduled::handoff::TeeingHandoff;
23use dfir_rs::scheduled::port::RecvPort;
24use dfir_rs::scheduled::SubgraphId;
25use itertools::Itertools;
26use tokio::sync::Mutex;
27
28use crate::expr::{Batch, EvalError, ScalarExpr};
29use crate::metrics::METRIC_FLOW_ERRORS;
30use crate::repr::DiffRow;
31use crate::utils::ArrangeHandler;
32
/// Shorthand for [`TeeingHandoff`], with the item type defaulting to [`DiffRow`].
pub type Toff<T = DiffRow> = TeeingHandoff<T>;
34
/// A handle around the receiving end of a [`TeeingHandoff`] carrying items of type `T`.
///
/// NOTE(review): presumably this models a stream of updates flowing through the
/// dataflow graph — confirm against the callers.
pub struct Collection<T: 'static> {
    /// The receive port of the handoff this collection reads from.
    stream: RecvPort<TeeingHandoff<T>>,
}
40
41impl<T: 'static + Clone> Collection<T> {
42 pub fn from_port(port: RecvPort<TeeingHandoff<T>>) -> Self {
43 Collection { stream: port }
44 }
45
46 pub fn clone(&self, df: &mut Dfir) -> Self {
50 Collection {
51 stream: self.stream.tee(df),
52 }
53 }
54
55 pub fn into_inner(self) -> RecvPort<TeeingHandoff<T>> {
56 self.stream
57 }
58}
59
/// An arrangement handle together with the ids of the subgraphs that write to
/// and read from it.
///
/// The bookkeeping fields are `Rc<RefCell<..>>`, so they are shared (not copied)
/// between values produced by `try_copy_future`, and the type is confined to a
/// single thread.
pub struct Arranged {
    /// Handle to the underlying arrangement.
    pub arrangement: ArrangeHandler,
    /// Id of the subgraph writing to the arrangement, if one has been recorded.
    /// NOTE(review): the `Option` suggests at most one writer — confirm.
    pub writer: Rc<RefCell<Option<SubgraphId>>>,
    /// Ids of the subgraphs reading from the arrangement.
    /// NOTE(review): presumably kept for scheduling purposes — confirm.
    pub readers: Rc<RefCell<Vec<SubgraphId>>>,
}
67
68impl Arranged {
69 pub fn new(arr: ArrangeHandler) -> Self {
70 Self {
71 arrangement: arr,
72 writer: Default::default(),
73 readers: Default::default(),
74 }
75 }
76
77 pub fn try_copy_future(&self) -> Option<Self> {
79 self.arrangement
80 .clone_future_only()
81 .map(|arrangement| Arranged {
82 arrangement,
83 readers: self.readers.clone(),
84 writer: self.writer.clone(),
85 })
86 }
87}
88
/// A [`Collection`] bundled with the arrangements associated with it.
///
/// The item type defaults to [`DiffRow`].
pub struct CollectionBundle<T: 'static = DiffRow> {
    /// The plain (un-arranged) collection.
    pub collection: Collection<T>,
    // NOTE(review): the lint is silenced because clippy considers the key type
    // potentially interior-mutable; presumably `ScalarExpr` is never mutated
    // while used as a key — confirm.
    #[allow(clippy::mutable_key_type)]
    /// Arrangements of this collection, indexed by a list of scalar expressions.
    /// NOTE(review): the key presumably holds the expressions used to compute
    /// the arrangement key — confirm.
    pub arranged: BTreeMap<Vec<ScalarExpr>, Arranged>,
}
111
/// Common interface over [`CollectionBundle`]s of different item types, letting
/// callers find out which concrete bundle they hold and borrow it as such.
pub trait GenericBundle {
    /// Tells whether this bundle carries [`Batch`] items.
    fn is_batch(&self) -> bool;

    /// Borrows the bundle as a `CollectionBundle<Batch>`.
    ///
    /// The default implementation returns `None`.
    fn try_as_batch(&self) -> Option<&CollectionBundle<Batch>> {
        None
    }

    /// Borrows the bundle as a `CollectionBundle<DiffRow>`.
    ///
    /// The default implementation returns `None`.
    fn try_as_row(&self) -> Option<&CollectionBundle<DiffRow>> {
        None
    }
}
123
/// Batch-typed bundles report themselves as batches; `try_as_row` keeps the
/// trait default and yields `None`.
impl GenericBundle for CollectionBundle<Batch> {
    /// Always `true`: this bundle carries [`Batch`] items.
    fn is_batch(&self) -> bool {
        true
    }

    /// Always `Some(self)`.
    fn try_as_batch(&self) -> Option<&CollectionBundle<Batch>> {
        Some(self)
    }
}
133
/// Row-typed bundles are not batches; `try_as_batch` keeps the trait default
/// and yields `None`.
impl GenericBundle for CollectionBundle<DiffRow> {
    /// Always `false`: this bundle carries [`DiffRow`] items.
    fn is_batch(&self) -> bool {
        false
    }

    /// Always `Some(self)`.
    fn try_as_row(&self) -> Option<&CollectionBundle<DiffRow>> {
        Some(self)
    }
}
143
144impl<T: 'static> CollectionBundle<T> {
145 pub fn from_collection(collection: Collection<T>) -> Self {
146 Self {
147 collection,
148 arranged: BTreeMap::default(),
149 }
150 }
151}
152
153impl<T: 'static + Clone> CollectionBundle<T> {
154 pub fn clone(&self, df: &mut Dfir) -> Self {
155 Self {
156 collection: self.collection.clone(df),
157 arranged: self
158 .arranged
159 .iter()
160 .map(|(k, v)| (k.clone(), v.try_copy_future().unwrap()))
161 .collect(),
162 }
163 }
164}
165
/// A shared queue of [`EvalError`]s.
///
/// Cloning an `ErrCollector` clones only the `Arc`, so every clone pushes to
/// and drains from the same underlying queue. Errors are appended at the back
/// and drained from the front, so they come out in the order they were pushed.
#[derive(Debug, Default, Clone)]
pub struct ErrCollector {
    /// The queue of collected errors, guarded by a tokio [`Mutex`].
    pub inner: Arc<Mutex<VecDeque<EvalError>>>,
}
176
177impl ErrCollector {
178 pub fn get_all_blocking(&self) -> Vec<EvalError> {
179 self.inner.blocking_lock().drain(..).collect_vec()
180 }
181 pub async fn get_all(&self) -> Vec<EvalError> {
182 self.inner.lock().await.drain(..).collect_vec()
183 }
184
185 pub fn is_empty(&self) -> bool {
186 self.inner.blocking_lock().is_empty()
187 }
188
189 pub fn push_err(&self, err: EvalError) {
190 METRIC_FLOW_ERRORS
191 .with_label_values(&[err.status_code().as_ref()])
192 .inc();
193 self.inner.blocking_lock().push_back(err)
194 }
195
196 pub fn run<F, R>(&self, f: F) -> Option<R>
197 where
198 F: FnOnce() -> Result<R, EvalError>,
199 {
200 match f() {
201 Ok(r) => Some(r),
202 Err(e) => {
203 self.push_err(e);
204 None
205 }
206 }
207 }
208}