1use std::collections::BTreeMap;
16
17use dfir_rs::scheduled::graph_ext::GraphExt;
18use dfir_rs::scheduled::port::{PortCtx, SEND};
19use itertools::Itertools;
20use snafu::OptionExt;
21
22use crate::compute::render::Context;
23use crate::compute::state::Scheduler;
24use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
25use crate::error::{Error, PlanSnafu};
26use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
27use crate::plan::TypedPlan;
28use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
29use crate::utils::ArrangeHandler;
30
31impl Context<'_, '_> {
32 pub fn render_mfp_batch(
34 &mut self,
35 input: Box<TypedPlan>,
36 mfp: MapFilterProject,
37 _output_type: &RelationType,
38 ) -> Result<CollectionBundle<Batch>, Error> {
39 let input = self.render_plan_batch(*input)?;
40
41 let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff<Batch>>("mfp_batch");
42
43 let mfp_plan = MfpPlan::create_from(mfp)?;
45
46 let err_collector = self.err_collector.clone();
47
48 let scheduler = self.compute_state.get_scheduler();
50
51 let subgraph = self.df.add_subgraph_in_out(
52 "mfp_batch",
53 input.collection.into_inner(),
54 out_send_port,
55 move |_ctx, recv, send| {
56 let src_data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
58
59 let output_batches = src_data
60 .filter_map(|mut input_batch| {
61 err_collector.run(|| {
62 let res_batch = mfp_plan.mfp.eval_batch_into(&mut input_batch)?;
63 Ok(res_batch)
64 })
65 })
66 .collect_vec();
67
68 send.give(output_batches);
69 },
70 );
71
72 scheduler.set_cur_subgraph(subgraph);
74
75 let bundle =
76 CollectionBundle::from_collection(Collection::<Batch>::from_port(out_recv_port));
77 Ok(bundle)
78 }
79
80 #[allow(clippy::mutable_key_type)]
87 pub fn render_mfp(
88 &mut self,
89 input: Box<TypedPlan>,
90 mfp: MapFilterProject,
91 ) -> Result<CollectionBundle, Error> {
92 let input = self.render_plan(*input)?;
93 let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>("mfp");
96
97 let output_arity = mfp.output_arity();
98
99 let arrange_handler = self.compute_state.new_arrange(None);
102 let arrange_handler_inner =
103 arrange_handler
104 .clone_future_only()
105 .with_context(|| PlanSnafu {
106 reason: "No write is expected at this point",
107 })?;
108
109 let mfp_plan = MfpPlan::create_from(mfp)?;
111 let now = self.compute_state.current_time_ref();
112
113 let err_collector = self.err_collector.clone();
114
115 let scheduler = self.compute_state.get_scheduler();
117 let scheduler_inner = scheduler.clone();
118
119 let subgraph = self.df.add_subgraph_in_out(
120 "mfp",
121 input.collection.into_inner(),
122 out_send_port,
123 move |_ctx, recv, send| {
124 let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
126
127 mfp_subgraph(
128 &arrange_handler_inner,
129 data,
130 &mfp_plan,
131 *now.borrow(),
132 &err_collector,
133 &scheduler_inner,
134 send,
135 );
136 },
137 );
138
139 scheduler.set_cur_subgraph(subgraph);
141
142 let arranged = BTreeMap::from([(
143 (0..output_arity).map(ScalarExpr::Column).collect_vec(),
144 Arranged::new(arrange_handler),
145 )]);
146
147 let bundle = CollectionBundle {
148 collection: Collection::from_port(out_recv_port),
149 arranged,
150 };
151 Ok(bundle)
152 }
153}
154
155fn mfp_subgraph(
156 arrange: &ArrangeHandler,
157 input: impl IntoIterator<Item = DiffRow>,
158 mfp_plan: &MfpPlan,
159 now: repr::Timestamp,
160 err_collector: &ErrCollector,
161 scheduler: &Scheduler,
162 send: &PortCtx<SEND, Toff>,
163) {
164 let mut output_now = vec![];
166 let run_mfp = || {
167 let mut all_updates = eval_mfp_core(input, mfp_plan, now, err_collector);
168 all_updates.retain(|(kv, ts, d)| {
169 if *ts > now {
170 true
171 } else {
172 output_now.push((kv.clone(), *ts, *d));
173 false
174 }
175 });
176 let future_updates = all_updates;
177
178 arrange.write().apply_updates(now, future_updates)?;
179 Ok(())
180 };
181 err_collector.run(run_mfp);
182
183 let from = arrange.read().last_compaction_time();
188 let from = from.unwrap_or(repr::Timestamp::MIN);
189 let range = (
190 std::ops::Bound::Excluded(from),
191 std::ops::Bound::Included(now),
192 );
193
194 let output_kv = arrange.read().get_updates_in_range(range);
196
197 err_collector.run(|| {
198 snafu::ensure!(
199 mfp_plan.is_temporal() || output_kv.is_empty(),
200 crate::expr::error::InternalSnafu {
201 reason: "Output from future should be empty since temporal filter is not applied"
202 }
203 );
204 Ok(())
205 });
206
207 let output = output_kv
209 .into_iter()
210 .chain(output_now) .map(|((key, _v), ts, diff)| (key, ts, diff))
212 .collect_vec();
213 send.give(output);
215
216 let run_compaction = || {
217 arrange.write().compact_to(now)?;
218 Ok(())
219 };
220 err_collector.run(run_compaction);
221
222 scheduler.schedule_for_arrange(&arrange.read(), now);
224}
225
226fn eval_mfp_core(
229 input: impl IntoIterator<Item = DiffRow>,
230 mfp_plan: &MfpPlan,
231 now: repr::Timestamp,
232 err_collector: &ErrCollector,
233) -> Vec<KeyValDiffRow> {
234 let mut all_updates = Vec::new();
235 for (mut row, _sys_time, diff) in input.into_iter() {
236 let updates = mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff);
238 let updates = updates
241 .filter_map(|r| match r {
242 Ok((key, ts, diff)) => Some(((key, Row::empty()), ts, diff)),
243 Err((err, _ts, _diff)) => {
244 err_collector.push_err(err);
245 None
246 }
247 })
248 .collect_vec();
249
250 all_updates.extend(updates);
251 }
252 all_updates
253}
254
#[cfg(test)]
mod test {

    use datatypes::data_type::ConcreteDataType;
    use dfir_rs::scheduled::graph::Dfir;

    use super::*;
    use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
    use crate::compute::state::DataflowState;
    use crate::expr::{self, BinaryFunc, GlobalId};
    use crate::plan::Plan;
    use crate::repr::{ColumnType, RelationType};

    /// Typed plan that reads global `User(1)` as a single nullable int64 column.
    fn user_one_plan() -> Box<TypedPlan> {
        let get_global = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let relation = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        Box::new(get_global.with_types(relation.into_unnamed()))
    }

    /// One-column MFP whose only predicate compares column 0 against the int32
    /// literal `1` with `BinaryFunc::Gt`.
    fn greater_than_one_mfp() -> MapFilterProject {
        let predicate = ScalarExpr::Column(0).call_binary(
            ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()),
            BinaryFunc::Gt,
        );
        MapFilterProject::new(1).filter(vec![predicate]).unwrap()
    }

    /// A temporal filter must emit the rows at time 0 and retract each of them at
    /// a later tick.
    #[test]
    fn test_render_mfp_with_temporal() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let row = |value: i64| Row::new(vec![value.into()]);

        let source_rows = vec![(row(1), 0, 1), (row(2), 0, 1), (row(3), 0, 1)];
        let collection = ctx.render_constant(source_rows);
        ctx.insert_global(GlobalId::User(1), collection);

        let now = || ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now);
        let to_timestamp =
            || expr::UnaryFunc::Cast(ConcreteDataType::timestamp_microsecond_datatype());

        // cast(col0) Gte now()
        let lower_bound = ScalarExpr::Column(0)
            .call_unary(to_timestamp())
            .call_binary(now(), BinaryFunc::Gte);
        // cast(col0 - 4) Lt now()
        let upper_bound = ScalarExpr::Column(0)
            .call_binary(
                ScalarExpr::literal(4i64.into(), ConcreteDataType::int64_datatype()),
                BinaryFunc::SubInt64,
            )
            .call_unary(to_timestamp())
            .call_binary(now(), BinaryFunc::Lt);

        let mfp = MapFilterProject::new(1)
            .filter(vec![lower_bound, upper_bound])
            .unwrap();

        let bundle = ctx.render_mfp(user_one_plan(), mfp).unwrap();
        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);

        // Tick 0 inserts all three rows; ticks 2, 3 and 4 retract rows 1, 2 and 3.
        let expected_output = BTreeMap::from([
            (0, vec![(row(1), 0, 1), (row(2), 0, 1), (row(3), 0, 1)]),
            (2, vec![(row(1), 2, -1)]),
            (3, vec![(row(2), 3, -1)]),
            (4, vec![(row(3), 4, -1)]),
        ]);
        run_and_check(&mut state, &mut df, 0..5, expected_output, output);
    }

    /// A non-temporal filter must pass through only the rows matching the predicate.
    #[test]
    fn test_render_mfp() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let row = |value: i32| Row::new(vec![value.into()]);

        let source_rows = vec![(row(1), 1, 1), (row(2), 2, 1), (row(3), 3, 1)];
        let collection = ctx.render_constant(source_rows);
        ctx.insert_global(GlobalId::User(1), collection);

        let bundle = ctx
            .render_mfp(user_one_plan(), greater_than_one_mfp())
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);

        // Row 1 is filtered out; rows 2 and 3 appear at their own timestamps.
        let expected = BTreeMap::from([
            (2, vec![(row(2), 2, 1)]),
            (3, vec![(row(3), 3, 1)]),
        ]);
        run_and_check(&mut state, &mut df, 1..5, expected, output);
    }

    /// The operator must produce output on each of two consecutive runs fed from a
    /// streaming source.
    #[test]
    fn test_render_mfp_multiple_times() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let (sender, recv) = tokio::sync::broadcast::channel(1000);
        let collection = ctx.render_source(recv).unwrap();
        ctx.insert_global(GlobalId::User(1), collection);

        let bundle = ctx
            .render_mfp(user_one_plan(), greater_than_one_mfp())
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);

        // First run: one matching row in, one output entry out.
        sender.send((Row::new(vec![2.into()]), 0, 1)).unwrap();
        state.run_available_with_schedule(&mut df);
        assert_eq!(output.borrow().len(), 1);

        // Second run starts from an emptied output buffer.
        output.borrow_mut().clear();
        sender.send((Row::new(vec![3.into()]), 0, 1)).unwrap();
        state.run_available_with_schedule(&mut df);
        assert_eq!(output.borrow().len(), 1);
    }
}