1use std::collections::{BTreeMap, BTreeSet};
16use std::ops::Range;
17use std::sync::Arc;
18
19use arrow::array::new_null_array;
20use common_telemetry::trace;
21use datatypes::data_type::ConcreteDataType;
22use datatypes::prelude::DataType;
23use datatypes::value::{ListValue, Value};
24use datatypes::vectors::{BooleanVector, NullVector};
25use dfir_rs::scheduled::graph_ext::GraphExt;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28
29use crate::compute::render::{Context, SubgraphArg};
30use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
31use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
32use crate::expr::error::{ArrowSnafu, DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
33use crate::expr::{Accum, Accumulator, Batch, EvalError, ScalarExpr, VectorDiff};
34use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan};
35use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
36use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};
37
impl Context<'_, '_> {
    /// Name used for both the output edge and the subgraph created by
    /// `render_reduce_batch`.
    const REDUCE_BATCH: &'static str = "reduce_batch";

    /// Render a reduce operator that consumes and produces [`Batch`]es.
    ///
    /// The input plan is rendered first. A subgraph is then registered which, each time it
    /// runs, drains everything currently on the input port and hands it to
    /// `reduce_batch_subgraph` together with the arrangement created here.
    ///
    /// The returned bundle exposes the output port as its collection and the arrangement
    /// under the key expressions `Column(0)..Column(output_key_arity)`.
    ///
    /// # Errors
    ///
    /// * A "not implemented" error if `reduce_plan` is not [`ReducePlan::Accumulable`], or if
    ///   it contains any distinct aggregate.
    /// * Any error produced while rendering `input`.
    /// * A plan error if the arrangement handle cannot be cloned as a full arrangement.
    #[allow(clippy::mutable_key_type)]
    pub fn render_reduce_batch(
        &mut self,
        input: Box<TypedPlan>,
        key_val_plan: &KeyValPlan,
        reduce_plan: &ReducePlan,
        output_type: &RelationType,
    ) -> Result<CollectionBundle<Batch>, Error> {
        // Batch mode only handles accumulable plans without distinct aggregates.
        let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan {
            if !accum_plan.distinct_aggrs.is_empty() {
                NotImplementedSnafu {
                    reason: "Distinct aggregation is not supported in batch mode",
                }
                .fail()?
            }
            accum_plan.clone()
        } else {
            NotImplementedSnafu {
                reason: "Only accumulable reduce plan is supported in batch mode",
            }
            .fail()?
        };

        let input = self.render_plan_batch(*input)?;

        // Number of leading output columns that form the grouping key.
        let output_key_arity = key_val_plan.key_plan.output_arity();

        // Arrangement holding the per-key accumulator state of this operator.
        let arrange_handler = self.compute_state.new_arrange(None);

        // Key expiration is only configured when the output has a time index AND the
        // compute state defines an `expire_after` duration.
        if let (Some(time_index), Some(expire_after)) =
            (output_type.time_index, self.compute_state.expire_after())
        {
            let expire_man =
                KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
            arrange_handler.write().set_expire_state(expire_man);
        }

        // A `None` from `clone_full_arrange` is reported as a plan error.
        let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
            reason: "No write is expected at this point",
        })?;
        // Owned copies that are moved into the subgraph closure below.
        let key_val_plan = key_val_plan.clone();

        let output_type = output_type.clone();

        let now = self.compute_state.current_time_ref();

        let err_collector = self.err_collector.clone();

        let scheduler = self.compute_state.get_scheduler();

        let scheduler_inner = scheduler.clone();

        let (out_send_port, out_recv_port) =
            self.df.make_edge::<_, Toff<Batch>>(Self::REDUCE_BATCH);

        let subgraph = self.df.add_subgraph_in_out(
            Self::REDUCE_BATCH,
            input.collection.into_inner(),
            out_send_port,
            move |_ctx, recv, send| {
                // Read the current time at the moment the subgraph actually runs.
                let now = *(now.borrow());
                let arrange = arrange_handler_inner.clone();
                // Flatten everything received on the port into one list of batches.
                let src_data = recv
                    .take_inner()
                    .into_iter()
                    .flat_map(|v| v.into_iter())
                    .collect_vec();

                reduce_batch_subgraph(
                    &arrange,
                    src_data,
                    &key_val_plan,
                    &accum_plan,
                    &output_type,
                    SubgraphArg {
                        now,
                        err_collector: &err_collector,
                        scheduler: &scheduler_inner,
                        send,
                    },
                )
            },
        );

        scheduler.set_cur_subgraph(subgraph);

        // Expose the arrangement keyed by the leading `output_key_arity` columns.
        let arranged = BTreeMap::from([(
            (0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
            Arranged::new(arrange_handler),
        )]);

        let bundle = CollectionBundle {
            collection: Collection::from_port(out_recv_port),
            arranged,
        };
        Ok(bundle)
    }

    /// Name used for both the output edge and the subgraph created by `render_reduce`.
    const REDUCE: &'static str = "reduce";

    /// Render a row-based reduce operator.
    ///
    /// The input plan is rendered first. A subgraph is then registered which, each time it
    /// runs, drains the input port and forwards the rows to `reduce_subgraph`, along with the
    /// output arrangement and the distinct-input arrangements (if the plan needs any).
    ///
    /// The returned bundle exposes the output port as its collection and the output
    /// arrangement under the key expressions `Column(0)..Column(output_key_arity)`.
    ///
    /// # Errors
    ///
    /// Propagates errors from rendering `input`, and returns a plan error if the arrangement
    /// handle cannot be cloned as a full arrangement.
    #[allow(clippy::mutable_key_type)]
    pub fn render_reduce(
        &mut self,
        input: Box<TypedPlan>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        output_type: RelationType,
    ) -> Result<CollectionBundle, Error> {
        let input = self.render_plan(*input)?;
        // Number of leading output columns that form the grouping key.
        let output_key_arity = key_val_plan.key_plan.output_arity();

        // Arrangement holding the state of this reduce operator.
        let arrange_handler = self.compute_state.new_arrange(None);

        // Key expiration is only configured when the output has a time index AND the
        // compute state defines an `expire_after` duration.
        if let (Some(time_index), Some(expire_after)) =
            (output_type.time_index, self.compute_state.expire_after())
        {
            let expire_man =
                KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
            arrange_handler.write().set_expire_state(expire_man);
        }

        // A `None` from `clone_full_arrange` is reported as a plan error.
        let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
            reason: "No write is expected at this point",
        })?;

        // Extra arrangements, one per distinct aggregate (`None` if there are none).
        let distinct_input = self.add_accum_distinct_input_arrange(&reduce_plan);

        let reduce_arrange = ReduceArrange {
            output_arrange: arrange_handler_inner,
            distinct_input,
        };

        let now = self.compute_state.current_time_ref();

        let err_collector = self.err_collector.clone();

        let scheduler = self.compute_state.get_scheduler();
        let scheduler_inner = scheduler.clone();

        let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>(Self::REDUCE);

        let subgraph = self.df.add_subgraph_in_out(
            Self::REDUCE,
            input.collection.into_inner(),
            out_send_port,
            move |_ctx, recv, send| {
                // Flatten everything received on the port into one list of diff rows.
                let data = recv
                    .take_inner()
                    .into_iter()
                    .flat_map(|v| v.into_iter())
                    .collect_vec();

                reduce_subgraph(
                    &reduce_arrange,
                    data,
                    &key_val_plan,
                    &reduce_plan,
                    SubgraphArg {
                        // Read the current time at the moment the subgraph actually runs.
                        now: *now.borrow(),
                        err_collector: &err_collector,
                        scheduler: &scheduler_inner,
                        send,
                    },
                );
            },
        );

        scheduler.set_cur_subgraph(subgraph);

        // Expose the arrangement keyed by the leading `output_key_arity` columns.
        let arranged = BTreeMap::from([(
            (0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
            Arranged::new(arrange_handler),
        )]);

        let bundle = CollectionBundle {
            collection: Collection::from_port(out_recv_port),
            arranged,
        };
        Ok(bundle)
    }

    /// Create the arrangements that back the distinct aggregates of `reduce_plan`.
    ///
    /// Returns `None` for [`ReducePlan::Distinct`] and for accumulable plans without distinct
    /// aggregates. Otherwise returns `distinct_aggrs.len()` freshly created arrangements, each
    /// marked as a full arrangement.
    fn add_accum_distinct_input_arrange(
        &mut self,
        reduce_plan: &ReducePlan,
    ) -> Option<Vec<ArrangeHandler>> {
        match reduce_plan {
            ReducePlan::Distinct => None,
            ReducePlan::Accumulable(AccumulablePlan { distinct_aggrs, .. }) => {
                (!distinct_aggrs.is_empty()).then(|| {
                    std::iter::repeat_with(|| {
                        let arr = self.compute_state.new_arrange(None);
                        arr.set_full_arrangement(true);
                        arr
                    })
                    .take(distinct_aggrs.len())
                    .collect()
                })
            }
        }
    }
}
269
270fn from_accum_values_to_live_accums(
271 accums: Vec<Value>,
272 len: usize,
273) -> Result<Vec<Vec<Value>>, EvalError> {
274 let accum_ranges = from_val_to_slice_idx(accums.first().cloned(), len)?;
275 let mut accum_list = vec![];
276 for range in accum_ranges.iter() {
277 accum_list.push(accums.get(range.clone()).unwrap_or_default().to_vec());
278 }
279 Ok(accum_list)
280}
281
/// The arrangements a row-based reduce subgraph works with.
pub struct ReduceArrange {
    /// Arrangement holding the reduce operator's own state; it is the one handed to
    /// `reduce_distinct_subgraph` / `reduce_accum_subgraph` as `arrange`.
    output_arrange: ArrangeHandler,
    /// Arrangements for the inputs of distinct aggregates, `None` when the plan has no
    /// distinct aggregate.
    distinct_input: Option<Vec<ArrangeHandler>>,
}
290
291fn batch_split_by_key_val(
292 batch: &Batch,
293 key_val_plan: &KeyValPlan,
294 err_collector: &ErrCollector,
295) -> (Batch, Batch) {
296 let row_count = batch.row_count();
297 let mut key_batch = Batch::empty();
298 let mut val_batch = Batch::empty();
299
300 err_collector.run(|| {
301 if key_val_plan.key_plan.output_arity() != 0 {
302 key_batch = key_val_plan.key_plan.eval_batch_into(&mut batch.clone())?;
303 }
304
305 if key_val_plan.val_plan.output_arity() != 0 {
306 val_batch = key_val_plan.val_plan.eval_batch_into(&mut batch.clone())?;
307 }
308 Ok(())
309 });
310
311 if key_batch.row_count() == 0 && key_batch.column_count() == 0 {
313 key_batch.set_row_count(row_count);
314 }
315
316 if val_batch.row_count() == 0 && val_batch.column_count() == 0 {
317 val_batch.set_row_count(row_count);
318 }
319
320 (key_batch, val_batch)
321}
322
323fn split_rows_to_key_val(
325 rows: impl IntoIterator<Item = DiffRow>,
326 key_val_plan: KeyValPlan,
327 err_collector: ErrCollector,
328) -> impl IntoIterator<Item = KeyValDiffRow> {
329 let mut row_buf = Row::new(vec![]);
330 rows.into_iter().filter_map(
331 move |(mut row, sys_time, diff): DiffRow| -> Option<KeyValDiffRow> {
332 err_collector.run(|| {
333 let len = row.len();
334 if let Some(key) = key_val_plan
335 .key_plan
336 .evaluate_into(&mut row.inner, &mut row_buf)?
337 {
338 row.inner.resize(len, Value::Null);
340 let val = key_val_plan
342 .val_plan
343 .evaluate_into(&mut row.inner, &mut row_buf)?
344 .context(InternalSnafu {
345 reason: "val_plan should not contain any filter predicate",
346 })?;
347 Ok(Some(((key, val), sys_time, diff)))
348 } else {
349 Ok(None)
350 }
351 })?
352 },
353 )
354}
355
356fn reduce_batch_subgraph(
357 arrange: &ArrangeHandler,
358 src_data: impl IntoIterator<Item = Batch>,
359 key_val_plan: &KeyValPlan,
360 accum_plan: &AccumulablePlan,
361 output_type: &RelationType,
362 SubgraphArg {
363 now,
364 err_collector,
365 scheduler: _,
366 send,
367 }: SubgraphArg<Toff<Batch>>,
368) {
369 let mut key_to_many_vals = BTreeMap::<Row, Vec<Batch>>::new();
370 let mut input_row_count = 0;
371 let mut input_batch_count = 0;
372
373 for batch in src_data {
374 input_batch_count += 1;
375 input_row_count += batch.row_count();
376 err_collector.run(|| {
377 let (key_batch, val_batch) =
378 batch_split_by_key_val(&batch, key_val_plan, err_collector);
379 ensure!(
380 key_batch.row_count() == val_batch.row_count(),
381 InternalSnafu {
382 reason: format!(
383 "Key and val batch should have the same row count, found {} and {}",
384 key_batch.row_count(),
385 val_batch.row_count()
386 )
387 }
388 );
389
390 let mut distinct_keys = BTreeSet::new();
391 for row_idx in 0..key_batch.row_count() {
392 let key_row = key_batch.get_row(row_idx)?;
393 let key_row = Row::new(key_row);
394
395 if distinct_keys.contains(&key_row) {
396 continue;
397 } else {
398 distinct_keys.insert(key_row.clone());
399 }
400 }
401
402 let key_data_types = output_type
403 .column_types
404 .iter()
405 .map(|t| t.scalar_type.clone())
406 .collect_vec();
407
408 for key_row in distinct_keys {
410 let key_scalar_value = {
411 let mut key_scalar_value = Vec::with_capacity(key_row.len());
412 for (key_idx, key) in key_row.iter().enumerate() {
413 let v =
414 key.try_to_scalar_value(&key.data_type())
415 .context(DataTypeSnafu {
416 msg: "can't convert key values to datafusion value",
417 })?;
418
419 let key_data_type = key_data_types.get(key_idx).context(InternalSnafu {
420 reason: format!(
421 "Key index out of bound, expected at most {} but got {}",
422 output_type.column_types.len(),
423 key_idx
424 ),
425 })?;
426
427 if key_data_type.as_arrow_type() != v.data_type()
429 && !v.data_type().is_null()
430 {
431 crate::expr::error::InternalSnafu {
432 reason: format!(
433 "Key data type mismatch, expected {:?} but got {:?}",
434 key_data_type.as_arrow_type(),
435 v.data_type()
436 ),
437 }
438 .fail()?
439 }
440
441 let arrow_value = if v.data_type().is_null() {
443 let ret = new_null_array(&arrow::datatypes::DataType::Null, 1);
444 arrow::array::Scalar::new(ret)
445 } else {
446 v.to_scalar().context(crate::expr::error::DatafusionSnafu {
447 context: "can't convert key values to arrow value",
448 })?
449 };
450 key_scalar_value.push(arrow_value);
451 }
452 key_scalar_value
453 };
454
455 let eq_results = key_scalar_value
457 .into_iter()
458 .zip(key_batch.batch().iter())
459 .map(|(key, col)| {
460 if arrow::array::Datum::get(&key).0.data_type().is_null() {
465 arrow::compute::kernels::boolean::is_null(
466 col.to_arrow_array().as_ref() as _
467 )
468 } else {
469 arrow::compute::kernels::cmp::eq(
470 &key,
471 &col.to_arrow_array().as_ref() as _,
472 )
473 }
474 })
475 .try_collect::<_, Vec<_>, _>()
476 .context(ArrowSnafu {
477 context: "Failed to compare key values",
478 })?;
479
480 let opt_eq_mask = eq_results
482 .into_iter()
483 .fold(None, |acc, v| match acc {
484 Some(Ok(acc)) => Some(arrow::compute::kernels::boolean::and(&acc, &v)),
485 Some(Err(_)) => acc,
486 None => Some(Ok(v)),
487 })
488 .transpose()
489 .context(ArrowSnafu {
490 context: "Failed to combine key comparison results",
491 })?;
492
493 let key_eq_mask = if let Some(eq_mask) = opt_eq_mask {
494 BooleanVector::from(eq_mask)
495 } else {
496 BooleanVector::from(vec![true; key_batch.row_count()])
500 };
501 let cur_val_batch = val_batch.filter(&key_eq_mask)?;
504
505 key_to_many_vals
506 .entry(key_row)
507 .or_default()
508 .push(cur_val_batch);
509 }
510
511 Ok(())
512 });
513 }
514
515 trace!(
516 "Reduce take {} batches, {} rows",
517 input_batch_count, input_row_count
518 );
519
520 let mut arrange = arrange.write();
523 let mut all_arrange_updates = Vec::with_capacity(key_to_many_vals.len());
524
525 let mut all_output_dict = BTreeMap::new();
526
527 for (key, val_batches) in key_to_many_vals {
528 err_collector.run(|| -> Result<(), _> {
529 let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
530 let accum_list =
531 from_accum_values_to_live_accums(accums.unpack(), accum_plan.simple_aggrs.len())?;
532
533 let mut accum_output = AccumOutput::new();
534 for AggrWithIndex {
535 expr,
536 input_idx,
537 output_idx,
538 } in accum_plan.simple_aggrs.iter()
539 {
540 let cur_accum_value = accum_list.get(*output_idx).cloned().unwrap_or_default();
541 let mut cur_accum = if cur_accum_value.is_empty() {
542 Accum::new_accum(&expr.func.clone())?
543 } else {
544 Accum::try_into_accum(&expr.func, cur_accum_value)?
545 };
546
547 for val_batch in val_batches.iter() {
548 let cur_input = val_batch
550 .batch()
551 .get(*input_idx)
552 .cloned()
553 .unwrap_or_else(|| Arc::new(NullVector::new(val_batch.row_count())));
554 let len = cur_input.len();
555 cur_accum.update_batch(&expr.func, VectorDiff::from(cur_input))?;
556
557 trace!("Reduce accum after take {} rows: {:?}", len, cur_accum);
558 }
559 let final_output = cur_accum.eval(&expr.func)?;
560 trace!("Reduce accum final output: {:?}", final_output);
561 accum_output.insert_output(*output_idx, final_output);
562
563 let cur_accum_value = cur_accum.into_state();
564 accum_output.insert_accum(*output_idx, cur_accum_value);
565 }
566
567 let (new_accums, res_val_row) = accum_output.into_accum_output()?;
568
569 let arrange_update = ((key.clone(), Row::new(new_accums)), now, 1);
570 all_arrange_updates.push(arrange_update);
571
572 all_output_dict.insert(key, Row::from(res_val_row));
573
574 Ok(())
575 });
576 }
577
578 err_collector.run(|| {
579 arrange.apply_updates(now, all_arrange_updates)?;
580 arrange.compact_to(now)
581 });
582 drop(arrange);
584
585 let output_types = output_type
589 .column_types
590 .iter()
591 .map(|t| t.scalar_type.clone())
592 .collect_vec();
593
594 err_collector.run(|| {
595 let column_cnt = output_types.len();
596 let row_cnt = all_output_dict.len();
597
598 let mut output_builder = output_types
599 .into_iter()
600 .map(|t| t.create_mutable_vector(row_cnt))
601 .collect_vec();
602
603 for (key, val) in all_output_dict {
604 for (i, v) in key.into_iter().chain(val.into_iter()).enumerate() {
605 output_builder
606 .get_mut(i)
607 .context(InternalSnafu{
608 reason: format!(
609 "Output builder should have the same length as the row, expected at most {} but got {}",
610 column_cnt - 1,
611 i
612 )
613 })?
614 .try_push_value_ref(v.as_value_ref())
615 .context(DataTypeSnafu {
616 msg: "Failed to push value",
617 })?;
618 }
619 }
620
621 let output_columns = output_builder
622 .into_iter()
623 .map(|mut b| b.to_vector())
624 .collect_vec();
625
626 let output_batch = Batch::try_new(output_columns, row_cnt)?;
627
628 trace!("Reduce output batch: {:?}", output_batch);
629
630 send.give(vec![output_batch]);
631
632 Ok(())
633 });
634}
635
636fn reduce_subgraph(
639 ReduceArrange {
640 output_arrange: arrange,
641 distinct_input,
642 }: &ReduceArrange,
643 data: impl IntoIterator<Item = DiffRow>,
644 key_val_plan: &KeyValPlan,
645 reduce_plan: &ReducePlan,
646 SubgraphArg {
647 now,
648 err_collector,
649 scheduler,
650 send,
651 }: SubgraphArg,
652) {
653 let key_val = split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
654 match reduce_plan {
659 ReducePlan::Distinct => reduce_distinct_subgraph(
660 arrange,
661 key_val,
662 SubgraphArg {
663 now,
664 err_collector,
665 scheduler,
666 send,
667 },
668 ),
669 ReducePlan::Accumulable(accum_plan) => reduce_accum_subgraph(
670 arrange,
671 distinct_input,
672 key_val,
673 accum_plan,
674 SubgraphArg {
675 now,
676 err_collector,
677 scheduler,
678 send,
679 },
680 ),
681 };
682}
683
684fn eval_distinct_core(
688 arrange: ArrangeReader,
689 kv: impl IntoIterator<Item = KeyValDiffRow>,
690 now: repr::Timestamp,
691 err_collector: &ErrCollector,
692) -> Vec<KeyValDiffRow> {
693 let _ = err_collector;
694
695 let mut inner_map = BTreeMap::new();
698 kv.into_iter()
699 .filter_map(|((key, val), ts, diff)| {
700 let old_val = inner_map
702 .get(&key)
703 .cloned()
704 .or_else(|| arrange.get(now, &key));
705
706 let new_key_val = match (old_val, diff) {
707 (None, 1) => Some(((key, val), ts, diff)),
709 (Some(old_val), diff) if old_val.0 == val && old_val.2 != diff => {
711 Some(((key, val), ts, diff))
712 }
713 _ => None,
714 };
715
716 if let Some(((k, v), t, d)) = new_key_val.clone() {
717 inner_map.insert(k, (v, t, d));
719 }
720 new_key_val
721 })
722 .collect_vec()
723}
724
725fn update_reduce_distinct_arrange(
729 arrange: &ArrangeHandler,
730 kv: impl IntoIterator<Item = KeyValDiffRow>,
731 now: repr::Timestamp,
732 err_collector: &ErrCollector,
733) -> impl Iterator<Item = DiffRow> {
734 let result_updates = eval_distinct_core(arrange.read(), kv, now, err_collector);
735
736 err_collector.run(|| {
737 arrange.write().apply_updates(now, result_updates)?;
738 Ok(())
739 });
740
741 let from = arrange.read().last_compaction_time();
745 let from = from.unwrap_or(repr::Timestamp::MIN);
746 let range = (
747 std::ops::Bound::Excluded(from),
748 std::ops::Bound::Included(now),
749 );
750 let output_kv = arrange.read().get_updates_in_range(range);
751
752 let run_compaction = || {
754 arrange.write().compact_to(now)?;
755 Ok(())
756 };
757 err_collector.run(run_compaction);
758
759 output_kv.into_iter().map(|((mut key, v), ts, diff)| {
762 key.extend(v.into_iter());
763 (key, ts, diff)
764 })
765}
766
767fn reduce_distinct_subgraph(
772 arrange: &ArrangeHandler,
773 kv: impl IntoIterator<Item = KeyValDiffRow>,
774 SubgraphArg {
775 now,
776 err_collector,
777 scheduler: _,
778 send,
779 }: SubgraphArg,
780) {
781 let ret = update_reduce_distinct_arrange(arrange, kv, now, err_collector).collect_vec();
782
783 if arrange.read().get_next_update_time(&now).is_some() {
785 err_collector.push_err(
786 InternalSnafu {
787 reason: "No future updates should exist in the reduce distinct arrangement",
788 }
789 .build(),
790 );
791 }
792
793 send.give(ret);
794}
795
/// Run one tick of an accumulable reduce over key/value diff rows.
///
/// Steps, in order:
///
/// 1. Group the incoming `(value, diff)` pairs by key.
/// 2. For every key that is not reported as expired by the arrangement's expire state:
///    transpose its rows into per-column diffs, load the stored accumulators from `arrange`,
///    and evaluate the simple and the distinct aggregates into an `AccumOutput`.
/// 3. Apply all new accumulator rows to `arrange`, compact it to `now`, verify that no
///    arrangement used here holds updates after `now`, and send `key ++ results` rows
///    downstream with diff `1`.
///
/// Errors are pushed to `err_collector`. A key whose column transposition or accumulator
/// decoding fails is skipped.
fn reduce_accum_subgraph(
    arrange: &ArrangeHandler,
    distinct_input: &Option<Vec<ArrangeHandler>>,
    kv: impl IntoIterator<Item = KeyValDiffRow>,
    accum_plan: &AccumulablePlan,
    SubgraphArg {
        now,
        err_collector,
        scheduler,
        send,
    }: SubgraphArg,
) {
    let AccumulablePlan {
        full_aggrs,
        simple_aggrs,
        distinct_aggrs,
    } = accum_plan;
    // All (value row, diff) pairs received for each key during this tick.
    let mut key_to_vals = BTreeMap::<Row, Vec<(Row, repr::Diff)>>::new();

    for ((key, val), _tick, diff) in kv {
        let vals = key_to_vals.entry(key).or_default();
        vals.push((val, diff));
    }

    let mut all_updates = Vec::with_capacity(key_to_vals.len());
    let mut all_outputs = Vec::with_capacity(key_to_vals.len());
    // The write guard is held for the whole loop and handed to the final check below.
    let mut arrange = arrange.write();
    for (key, value_diffs) in key_to_vals {
        // Skip keys the expire state reports as expired; only a warning is logged for them.
        if let Some(expire_man) = &arrange.get_expire_state() {
            let mut is_expired = false;
            err_collector.run(|| {
                if let Some(expired) = expire_man.get_expire_duration(now, &key)? {
                    is_expired = true;
                    common_telemetry::warn!(
                        "Data already expired: {}",
                        DataAlreadyExpiredSnafu {
                            expired_by: expired,
                        }
                        .build()
                    );
                    Ok(())
                } else {
                    Ok(())
                }
            });
            if is_expired {
                continue;
            }
        }
        // Transpose rows into one list of (value, diff) per column.
        let col_diffs = {
            // `value_diffs` has at least one element: an entry only exists after a push above.
            let row_len = value_diffs[0].0.len();
            let res = err_collector.run(|| get_col_diffs(value_diffs, row_len));
            match res {
                Some(res) => res,
                None => continue,
            }
        };
        // Stored accumulator row for this key, or the default row when none is stored.
        let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();

        let accums = accums.inner;

        // Decode the first stored value into one slice range per aggregate.
        let accum_ranges = {
            let res = err_collector
                .run(|| from_val_to_slice_idx(accums.first().cloned(), full_aggrs.len()));
            if let Some(res) = res {
                res
            } else {
                continue;
            }
        };

        let mut accum_output = AccumOutput::new();
        eval_simple_aggrs(
            simple_aggrs,
            &accums,
            &accum_ranges,
            &col_diffs,
            &mut accum_output,
            err_collector,
        );

        eval_distinct_aggrs(
            distinct_aggrs,
            distinct_input,
            &accums,
            &accum_ranges,
            &col_diffs,
            &mut accum_output,
            SubgraphArg {
                now,
                err_collector,
                scheduler,
                send,
            },
        );

        err_collector.run(|| {
            let (new_accums, res_val_row) = accum_output.into_accum_output()?;

            // New accumulator state for the arrangement ...
            all_updates.push(((key.clone(), Row::new(new_accums)), now, 1));
            // ... and the `key ++ results` row for downstream.
            let mut key_val = key;
            key_val.extend(res_val_row);
            all_outputs.push((key_val, now, 1));
            Ok(())
        });
    }
    err_collector.run(|| {
        arrange.apply_updates(now, all_updates)?;
        arrange.compact_to(now)
    });

    // Check every arrangement touched here: the distinct-input ones plus the output one.
    let all_arrange_used = distinct_input
        .iter()
        .flatten()
        .map(|d| d.write())
        .chain(std::iter::once(arrange));
    check_no_future_updates(all_arrange_used, err_collector, now);

    send.give(all_outputs);
}
947
948fn get_col_diffs(
949 value_diffs: Vec<(Row, repr::Diff)>,
950 row_len: usize,
951) -> Result<Vec<Vec<(Value, i64)>>, EvalError> {
952 ensure!(
953 value_diffs.iter().all(|(row, _)| row.len() == row_len),
954 InternalSnafu {
955 reason: "value_diffs should have rows with equal length"
956 }
957 );
958 let ret = (0..row_len)
959 .map(|i| {
960 value_diffs
961 .iter()
962 .map(|(row, diff)| (row.get(i).cloned().unwrap(), *diff))
963 .collect_vec()
964 })
965 .collect_vec();
966 Ok(ret)
967}
968
969fn eval_simple_aggrs(
971 simple_aggrs: &Vec<AggrWithIndex>,
972 accums: &[Value],
973 accum_ranges: &[Range<usize>],
974 col_diffs: &[Vec<(Value, i64)>],
975 accum_output: &mut AccumOutput,
976 err_collector: &ErrCollector,
977) {
978 for AggrWithIndex {
979 expr,
980 input_idx,
981 output_idx,
982 } in simple_aggrs
983 {
984 let cur_accum_range = accum_ranges[*output_idx].clone(); let cur_old_accum = accums
986 .get(cur_accum_range)
987 .unwrap_or_default()
988 .iter()
989 .cloned();
990 let cur_col_diff = col_diffs[*input_idx].iter().cloned();
991
992 if let Some((res, new_accum)) =
994 err_collector.run(|| expr.func.eval_diff_accumulable(cur_old_accum, cur_col_diff))
995 {
996 accum_output.insert_accum(*output_idx, new_accum);
997 accum_output.insert_output(*output_idx, res);
998 } }
1000}
1001
/// Collects the per-aggregate results of one reduce evaluation, keyed by the aggregate's
/// output index.
#[derive(Debug)]
struct AccumOutput {
    /// New accumulator state of each aggregate, by output index.
    accum: BTreeMap<usize, Vec<Value>>,
    /// Evaluated result of each aggregate, by output index.
    output: BTreeMap<usize, Value>,
}
1012
1013impl AccumOutput {
1014 fn new() -> Self {
1015 Self {
1016 accum: BTreeMap::new(),
1017 output: BTreeMap::new(),
1018 }
1019 }
1020
1021 fn insert_accum(&mut self, idx: usize, v: Vec<Value>) {
1022 self.accum.insert(idx, v);
1023 }
1024
1025 fn insert_output(&mut self, idx: usize, v: Value) {
1026 self.output.insert(idx, v);
1027 }
1028
1029 fn into_accum_output(self) -> Result<(Vec<Value>, Vec<Value>), EvalError> {
1031 if self.accum.is_empty() && self.output.is_empty() {
1032 return Ok((vec![], vec![]));
1033 }
1034 ensure!(
1035 !self.accum.is_empty() && self.accum.len() == self.output.len(),
1036 InternalSnafu {
1037 reason: format!(
1038 "Accum and output should have the non-zero and same length, found accum.len() = {}, output.len() = {}",
1039 self.accum.len(),
1040 self.output.len()
1041 )
1042 }
1043 );
1044 if let Some(kv) = self.accum.last_key_value() {
1046 ensure!(
1047 *kv.0 == self.accum.len() - 1,
1048 InternalSnafu {
1049 reason: "Accum should be a continuous range"
1050 }
1051 );
1052 }
1053 if let Some(kv) = self.output.last_key_value() {
1054 ensure!(
1055 *kv.0 == self.output.len() - 1,
1056 InternalSnafu {
1057 reason: "Output should be a continuous range"
1058 }
1059 );
1060 }
1061
1062 let accums = self.accum.into_values().collect_vec();
1063 let new_accums = from_accums_to_offsetted_accum(accums);
1064 let output = self.output.into_values().collect_vec();
1065 Ok((new_accums, output))
1066 }
1067}
1068
1069fn eval_distinct_aggrs(
1071 distinct_aggrs: &Vec<AggrWithIndex>,
1072 distinct_input: &Option<Vec<ArrangeHandler>>,
1073 accums: &[Value],
1074 accum_ranges: &[Range<usize>],
1075 col_diffs: &[Vec<(Value, i64)>],
1076 accum_output: &mut AccumOutput,
1077 SubgraphArg {
1078 now,
1079 err_collector,
1080 scheduler: _,
1081 send: _,
1082 }: SubgraphArg,
1083) {
1084 for AggrWithIndex {
1085 expr,
1086 input_idx,
1087 output_idx,
1088 } in distinct_aggrs
1089 {
1090 let cur_accum_range = accum_ranges[*output_idx].clone(); let cur_old_accum = accums
1092 .get(cur_accum_range)
1093 .unwrap_or_default()
1094 .iter()
1095 .cloned();
1096 let cur_col_diff = col_diffs[*input_idx].iter().cloned();
1097 let input_arrange = distinct_input
1099 .as_ref()
1100 .and_then(|v| v[*input_idx].clone_full_arrange())
1101 .expect("A full distinct input arrangement should exist");
1102 let kv = cur_col_diff.map(|(v, d)| ((Row::new(vec![v]), Row::empty()), now, d));
1103 let col_diff_distinct =
1104 update_reduce_distinct_arrange(&input_arrange, kv, now, err_collector).map(
1105 |(row, _ts, diff)| (row.get(0).expect("Row should not be empty").clone(), diff),
1106 );
1107 let col_diff_distinct = {
1108 let res = col_diff_distinct.collect_vec();
1109 res.into_iter()
1110 };
1111 let (res, new_accum) = expr
1113 .func
1114 .eval_diff_accumulable(cur_old_accum, col_diff_distinct)
1115 .unwrap();
1116 accum_output.insert_accum(*output_idx, new_accum);
1117 accum_output.insert_output(*output_idx, res);
1118 }
1119}
1120
1121fn check_no_future_updates<'a>(
1122 all_arrange_used: impl IntoIterator<Item = ArrangeWriter<'a>>,
1123 err_collector: &ErrCollector,
1124 now: repr::Timestamp,
1125) {
1126 for arrange in all_arrange_used {
1127 if arrange.get_next_update_time(&now).is_some() {
1128 err_collector.push_err(
1129 InternalSnafu {
1130 reason: "No future updates should exist in the reduce distinct arrangement",
1131 }
1132 .build(),
1133 );
1134 }
1135 }
1136}
1137
1138fn from_accums_to_offsetted_accum(new_accums: Vec<Vec<Value>>) -> Vec<Value> {
1140 let offset = new_accums
1141 .iter()
1142 .map(|v| v.len() as u64)
1143 .scan(1, |state, x| {
1144 *state += x;
1145 Some(*state)
1146 })
1147 .map(Value::from)
1148 .collect::<Vec<_>>();
1149 let first = ListValue::new(offset, ConcreteDataType::uint64_datatype());
1150 let first = Value::List(first);
1151 std::iter::once(first)
1154 .chain(new_accums.into_iter().flatten())
1155 .collect::<Vec<_>>()
1156}
1157
1158fn from_val_to_slice_idx(
1160 value: Option<Value>,
1161 expected_len: usize,
1162) -> Result<Vec<Range<usize>>, EvalError> {
1163 let offset_end = if let Some(value) = value {
1164 let list = value
1165 .as_list()
1166 .with_context(|_| DataTypeSnafu {
1167 msg: "Accum's first element should be a list",
1168 })?
1169 .context(InternalSnafu {
1170 reason: "Accum's first element should be a list",
1171 })?;
1172 let ret: Vec<usize> = list
1173 .items()
1174 .iter()
1175 .map(|v| {
1176 v.as_u64().map(|j| j as usize).context(InternalSnafu {
1177 reason: "End offset should be a list of u64",
1178 })
1179 })
1180 .try_collect()?;
1181 ensure!(
1182 ret.len() == expected_len,
1183 InternalSnafu {
1184 reason: "Offset List should have the same length as full_aggrs"
1185 }
1186 );
1187 Ok(ret)
1188 } else {
1189 Ok(vec![1usize; expected_len])
1190 }?;
1191 let accum_ranges = (0..expected_len)
1192 .map(|idx| {
1193 if idx == 0 {
1194 debug_assert!(
1196 offset_end[0] >= 1,
1197 "Offset should be at least 1: {:?}",
1198 &offset_end
1199 );
1200 1..offset_end[0]
1201 } else {
1202 offset_end[idx - 1]..offset_end[idx]
1203 }
1204 })
1205 .collect_vec();
1206 Ok(accum_ranges)
1207}
1208
1209#[cfg(test)]
1212mod test {
1213
1214 use std::time::Duration;
1215
1216 use common_time::Timestamp;
1217 use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
1218 use dfir_rs::scheduled::graph::Dfir;
1219
1220 use super::*;
1221 use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
1222 use crate::compute::state::DataflowState;
1223 use crate::expr::{
1224 self, AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc,
1225 };
1226 use crate::plan::Plan;
1227 use crate::repr::{ColumnType, RelationType};
1228
1229 #[test]
1233 fn test_tumble_group_by() {
1234 let mut df = Dfir::new();
1235 let mut state = DataflowState::default();
1236 let mut ctx = harness_test_ctx(&mut df, &mut state);
1237 const START: i64 = 1625097600000;
1238 let rows = vec![
1239 (1u32, START + 1000),
1240 (2u32, START + 1500),
1241 (3u32, START + 2000),
1242 (1u32, START + 2500),
1243 (2u32, START + 3000),
1244 (3u32, START + 3500),
1245 ];
1246 let rows = rows
1247 .into_iter()
1248 .map(|(number, ts)| {
1249 (
1250 Row::new(vec![number.into(), Timestamp::new_millisecond(ts).into()]),
1251 1,
1252 1,
1253 )
1254 })
1255 .collect_vec();
1256
1257 let collection = ctx.render_constant(rows.clone());
1258 ctx.insert_global(GlobalId::User(1), collection);
1259
1260 let aggr_expr = AggregateExpr {
1261 func: AggregateFunc::SumUInt32,
1262 expr: ScalarExpr::Column(0),
1263 distinct: false,
1264 };
1265 let expected = TypedPlan {
1266 schema: RelationType::new(vec![
1267 ColumnType::new(CDT::uint64_datatype(), true), ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ])
1271 .into_unnamed(),
1272 plan: Plan::Mfp {
1277 input: Box::new(
1278 Plan::Reduce {
1279 input: Box::new(
1280 Plan::Get {
1281 id: crate::expr::Id::Global(GlobalId::User(1)),
1282 }
1283 .with_types(
1284 RelationType::new(vec![
1285 ColumnType::new(ConcreteDataType::uint32_datatype(), false),
1286 ColumnType::new(
1287 ConcreteDataType::timestamp_millisecond_datatype(),
1288 false,
1289 ),
1290 ])
1291 .into_unnamed(),
1292 ),
1293 ),
1294 key_val_plan: KeyValPlan {
1295 key_plan: MapFilterProject::new(2)
1296 .map(vec![
1297 ScalarExpr::Column(1).call_unary(
1298 UnaryFunc::TumbleWindowFloor {
1299 window_size: Duration::from_nanos(1_000_000_000),
1300 start_time: Some(Timestamp::new_millisecond(
1301 1625097600000,
1302 )),
1303 },
1304 ),
1305 ScalarExpr::Column(1).call_unary(
1306 UnaryFunc::TumbleWindowCeiling {
1307 window_size: Duration::from_nanos(1_000_000_000),
1308 start_time: Some(Timestamp::new_millisecond(
1309 1625097600000,
1310 )),
1311 },
1312 ),
1313 ])
1314 .unwrap()
1315 .project(vec![2, 3])
1316 .unwrap()
1317 .into_safe(),
1318 val_plan: MapFilterProject::new(2)
1319 .project(vec![0, 1])
1320 .unwrap()
1321 .into_safe(),
1322 },
1323 reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
1324 full_aggrs: vec![aggr_expr.clone()],
1325 simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
1326 distinct_aggrs: vec![],
1327 }),
1328 }
1329 .with_types(
1330 RelationType::new(vec![
1331 ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ColumnType::new(CDT::uint64_datatype(), true), ])
1335 .with_key(vec![1])
1336 .with_time_index(Some(0))
1337 .into_unnamed(),
1338 ),
1339 ),
1340 mfp: MapFilterProject::new(3)
1341 .map(vec![
1342 ScalarExpr::Column(2),
1343 ScalarExpr::Column(3),
1344 ScalarExpr::Column(0),
1345 ScalarExpr::Column(1),
1346 ])
1347 .unwrap()
1348 .project(vec![4, 5, 6])
1349 .unwrap(),
1350 },
1351 };
1352
1353 let bundle = ctx.render_plan(expected).unwrap();
1354
1355 let output = get_output_handle(&mut ctx, bundle);
1356 drop(ctx);
1357 let expected = BTreeMap::from([(
1358 1,
1359 vec![
1360 (
1361 Row::new(vec![
1362 3u64.into(),
1363 Timestamp::new_millisecond(START + 1000).into(),
1364 Timestamp::new_millisecond(START + 2000).into(),
1365 ]),
1366 1,
1367 1,
1368 ),
1369 (
1370 Row::new(vec![
1371 4u64.into(),
1372 Timestamp::new_millisecond(START + 2000).into(),
1373 Timestamp::new_millisecond(START + 3000).into(),
1374 ]),
1375 1,
1376 1,
1377 ),
1378 (
1379 Row::new(vec![
1380 5u64.into(),
1381 Timestamp::new_millisecond(START + 3000).into(),
1382 Timestamp::new_millisecond(START + 4000).into(),
1383 ]),
1384 1,
1385 1,
1386 ),
1387 ],
1388 )]);
1389 run_and_check(&mut state, &mut df, 1..2, expected, output);
1390 }
1391
/// Renders a hand-written `Mfp(Reduce(Get))` plan that derives an average from
/// a `SumUInt32` and a `Count` aggregate over the same input column, then
/// checks the single row reported at ts = 1.
#[test]
fn test_avg_eval() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // Six one-column rows (1, 2, 3, 1, 2, 3), every one with ts = 1 and diff = +1.
    let rows: Vec<_> = [1u32, 2, 3, 1, 2, 3]
        .iter()
        .map(|&v| (Row::new(vec![v.into()]), 1, 1))
        .collect();
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);

    // The two accumulators the average is assembled from.
    let sum_aggr = AggregateExpr {
        func: AggregateFunc::SumUInt32,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };
    let count_aggr = AggregateExpr {
        func: AggregateFunc::Count,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };
    let accum_plan = AccumulablePlan {
        full_aggrs: vec![sum_aggr.clone(), count_aggr.clone()],
        simple_aggrs: vec![
            AggrWithIndex::new(sum_aggr, 0, 0),
            AggrWithIndex::new(count_aggr, 0, 1),
        ],
        distinct_aggrs: vec![],
    };

    // avg = if count != 0 { sum / cast(count as u64) } else { NULL }
    let count_is_nonzero = ScalarExpr::Column(1).call_binary(
        ScalarExpr::Literal(Value::from(0u32), CDT::int64_datatype()),
        BinaryFunc::NotEq,
    );
    let sum_over_count = ScalarExpr::Column(0).call_binary(
        ScalarExpr::Column(1).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
        BinaryFunc::DivUInt64,
    );
    let avg_expr = ScalarExpr::If {
        cond: Box::new(count_is_nonzero),
        then: Box::new(sum_over_count),
        els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
    };

    // Innermost node: read the constant collection registered above.
    // NOTE(review): the declared column type is int64 although the rows hold
    // u32 values; kept exactly as the test was written.
    let source = Plan::Get {
        id: crate::expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(
        RelationType::new(vec![ColumnType::new(
            ConcreteDataType::int64_datatype(),
            false,
        )])
        .into_unnamed(),
    );

    // Middle node: empty key, column 0 as value, both accumulators applied.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1)
            .project(vec![])
            .unwrap()
            .into_safe(),
        val_plan: MapFilterProject::new(1)
            .project(vec![0])
            .unwrap()
            .into_safe(),
    };
    let reduce = Plan::Reduce {
        input: Box::new(source),
        key_val_plan,
        reduce_plan: ReducePlan::Accumulable(accum_plan),
    }
    .with_types(
        RelationType::new(vec![
            ColumnType::new(ConcreteDataType::uint32_datatype(), true),
            ColumnType::new(ConcreteDataType::int64_datatype(), true),
        ])
        .into_unnamed(),
    );

    // Outermost node: evaluate the average and keep only projected column 3.
    let mfp = MapFilterProject::new(2)
        .map(vec![avg_expr, ScalarExpr::Column(2)])
        .unwrap()
        .project(vec![3])
        .unwrap();
    let plan = TypedPlan {
        schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
            .into_unnamed(),
        plan: Plan::Mfp {
            input: Box::new(reduce),
            mfp,
        },
    };

    let bundle = ctx.render_plan(plan).unwrap();
    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    // sum = 12 over 6 rows, so the expected average is 2.
    let expected = BTreeMap::from([(1, vec![(Row::new(vec![2u64.into()]), 1, 1)])]);
    run_and_check(&mut state, &mut df, 1..2, expected, output);
}
1497
/// Renders `ReducePlan::Distinct` keyed on the only input column (with an
/// empty value projection) and checks the rows reported when the dataflow is
/// run at ts = 6.
#[test]
fn test_basic_distinct() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // Values 1, 2, 3, 1, 2, 3 stamped with ts 1 through 6, each with diff = +1.
    let rows: Vec<_> = [1i64, 2, 3, 1, 2, 3]
        .iter()
        .zip(1..)
        .map(|(&v, ts)| (Row::new(vec![v.into()]), ts, 1))
        .collect();
    // The expectation equals the first three input rows: one row per distinct
    // value, carrying the ts of that value's first occurrence.
    let first_seen = rows[..3].to_vec();

    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);

    let input = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(
        RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )])
        .into_unnamed(),
    );
    // Whole row is the key; nothing is carried as value.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
    };

    let bundle = ctx
        .render_reduce(
            Box::new(input),
            key_val_plan,
            ReducePlan::Distinct,
            RelationType::empty(),
        )
        .unwrap();

    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    let expected = BTreeMap::from([(6, first_seen)]);
    run_and_check(&mut state, &mut df, 6..7, expected, output);
}
1552
/// Drives `render_reduce_batch` with a constant input, an empty key and a
/// single `SumInt64` aggregate, and asserts from inside a sink subgraph that
/// the first batch received at each tick matches the value expected for it.
#[test]
fn test_basic_batch_reduce_accum() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    // Taken before `state` is handed to the context as `&mut`.
    let now = state.current_time_ref();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // NULL, 1, NULL, 2, 3, 1, 2, 3 stamped with ts -1 through 6, diff = +1.
    let rows: Vec<_> = [
        None,
        Some(1i64),
        None,
        Some(2),
        Some(3),
        Some(1),
        Some(2),
        Some(3),
    ]
    .iter()
    .zip(-1..)
    .map(|(v, ts)| (Row::new(vec![v.map_or(Value::Null, Value::from)]), ts, 1))
    .collect();
    let input = Plan::Constant { rows }.with_types(
        RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )])
        .into_unnamed(),
    );

    // Empty key: every row contributes to the same accumulator.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    let sum_expr = AggregateExpr {
        func: AggregateFunc::SumInt64,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };
    let reduce_plan = ReducePlan::Accumulable(AccumulablePlan {
        full_aggrs: vec![sum_expr.clone()],
        simple_aggrs: vec![AggrWithIndex::new(sum_expr, 0, 0)],
        distinct_aggrs: vec![],
    });

    let output_type = RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]);
    let bundle = ctx
        .render_reduce_batch(Box::new(input), &key_val_plan, &reduce_plan, &output_type)
        .unwrap();

    // Expected value per tick. The driver loop below only visits ticks 1..7,
    // so the entries for -1 and 0 are presumably never consulted.
    let expected = BTreeMap::<i64, Vec<i64>>::from([
        (-1, vec![]),
        (0, vec![1i64]),
        (1, vec![1i64]),
        (2, vec![3i64]),
        (3, vec![6i64]),
        (4, vec![7i64]),
        (5, vec![9i64]),
        (6, vec![12i64]),
    ]);
    let clock = now.clone();
    ctx.df.add_subgraph_sink(
        "test_sink",
        bundle.collection.into_inner(),
        move |_ctx, recv| {
            let tick = *clock.borrow();
            // Always drain the handoff, even for ticks without an expectation.
            let received = recv.take_inner().into_iter().flatten().collect_vec();

            if let Some(values) = expected.get(&tick) {
                let row = values.iter().copied().map(Value::from).collect_vec();
                let want =
                    Batch::try_from_rows_with_types(vec![row.into()], &[CDT::int64_datatype()])
                        .unwrap();
                assert_eq!(received.first(), Some(&want));
            }
        },
    );
    drop(ctx);

    for tick in 1..7 {
        state.set_current_ts(tick);
        state.run_available_with_schedule(&mut df);
        assert!(
            state.get_err_collector().is_empty(),
            "Errors occur: {:?}",
            state.get_err_collector().get_all_blocking()
        );
    }
}
1659
/// Renders an accumulable reduce with an empty key and one `SumInt64`
/// aggregate, then checks the running sum reported at every tick from 1 to 6.
#[test]
fn test_basic_reduce_accum() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // Values 1, 2, 3, 1, 2, 3 stamped with ts 1 through 6, each with diff = +1.
    let rows: Vec<_> = [1i64, 2, 3, 1, 2, 3]
        .iter()
        .zip(1..)
        .map(|(&v, ts)| (Row::new(vec![v.into()]), ts, 1))
        .collect();
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);

    let input = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(
        RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )])
        .into_unnamed(),
    );
    // Empty key: every row feeds the same accumulator.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    let sum_expr = AggregateExpr {
        func: AggregateFunc::SumInt64,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };
    let reduce_plan = ReducePlan::Accumulable(AccumulablePlan {
        full_aggrs: vec![sum_expr.clone()],
        simple_aggrs: vec![AggrWithIndex::new(sum_expr, 0, 0)],
        distinct_aggrs: vec![],
    });

    let bundle = ctx
        .render_reduce(
            Box::new(input),
            key_val_plan,
            reduce_plan,
            RelationType::empty(),
        )
        .unwrap();

    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    // Running sums of the input, one single-row output per tick.
    let expected: BTreeMap<_, _> = [1i64, 3, 6, 7, 9, 12]
        .iter()
        .zip(1..)
        .map(|(&sum, ts)| (ts, vec![(Row::new(vec![sum.into()]), ts, 1)]))
        .collect();
    run_and_check(&mut state, &mut df, 1..7, expected, output);
}
1734
/// Renders an accumulable reduce whose only aggregate is a distinct
/// `SumInt64`, feeds it insertions and retractions of the single value 1, and
/// checks the sum reported at ticks 1 to 4 (nothing is expected at 5 and 6).
#[test]
fn test_delete_reduce_distinct_accum() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // (ts, diff) pairs, all for the value 1: a positive diff inserts the value,
    // a negative one retracts it.
    let rows: Vec<_> = [(1, 1), (1, -1), (2, 1), (3, -1), (4, 1), (4, -1), (4, 1)]
        .iter()
        .map(|&(ts, diff)| (Row::new(vec![1i64.into()]), ts, diff))
        .collect();
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);

    let input = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(
        RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )])
        .into_unnamed(),
    );
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    // NOTE(review): the entry registered under `distinct_aggrs` carries
    // `distinct: false` while its `full_aggrs` counterpart says `distinct: true`.
    // Kept exactly as written; confirm whether the flag is consulted at all.
    let accum_plan = AccumulablePlan {
        full_aggrs: vec![AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: true,
        }],
        simple_aggrs: vec![],
        distinct_aggrs: vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )],
    };

    let bundle = ctx
        .render_reduce(
            Box::new(input),
            key_val_plan,
            ReducePlan::Accumulable(accum_plan),
            RelationType::empty(),
        )
        .unwrap();

    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    // Net state after each tick: removed, present, removed, present.
    let expected: BTreeMap<_, _> = [0i64, 1, 0, 1]
        .iter()
        .zip(1..)
        .map(|(&sum, ts)| (ts, vec![(Row::new(vec![sum.into()]), ts, 1)]))
        .collect();
    run_and_check(&mut state, &mut df, 1..7, expected, output);
}
1813
/// Renders an accumulable reduce whose only aggregate is a distinct
/// `SumInt64` and checks the sum reported at every tick from 1 to 7.
///
/// The input ends with a row at ts = 7 and the expectation map has an entry
/// for ts = 7, so the driver range has to be `1..8`: `Range` is half-open and
/// the previous `1..7` stopped after tick 6, leaving that last case unchecked.
#[test]
fn test_basic_reduce_distinct_accum() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // (value, ts, diff): the value 1 is inserted and retracted within tick 1,
    // after which 2, 3, 1, 2, 3, 1 arrive one per tick.
    let rows: Vec<_> = [
        (1i64, 1, 1),
        (1, 1, -1),
        (2, 2, 1),
        (3, 3, 1),
        (1, 4, 1),
        (2, 5, 1),
        (3, 6, 1),
        (1, 7, 1),
    ]
    .iter()
    .map(|&(v, ts, diff)| (Row::new(vec![v.into()]), ts, diff))
    .collect();
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);

    let input = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(
        RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )])
        .into_unnamed(),
    );
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    // NOTE(review): the entry registered under `distinct_aggrs` carries
    // `distinct: false` while its `full_aggrs` counterpart says `distinct: true`.
    // Left untouched here; confirm whether the flag is consulted at all.
    let accum_plan = AccumulablePlan {
        full_aggrs: vec![AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: true,
        }],
        simple_aggrs: vec![],
        distinct_aggrs: vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )],
    };

    let bundle = ctx
        .render_reduce(
            Box::new(input),
            key_val_plan,
            ReducePlan::Accumulable(accum_plan),
            RelationType::empty(),
        )
        .unwrap();

    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    // Distinct sum after each tick: {} -> {2} -> {2,3} -> {1,2,3}, then
    // repeated values no longer change it.
    let expected: BTreeMap<_, _> = [0i64, 2, 5, 6, 6, 6, 6]
        .iter()
        .zip(1..)
        .map(|(&sum, ts)| (ts, vec![(Row::new(vec![sum.into()]), ts, 1)]))
        .collect();
    // Upper bound is exclusive: 1..8 covers ticks 1 through 7.
    run_and_check(&mut state, &mut df, 1..8, expected, output);
}
1893
/// Renders an accumulable reduce that combines a plain `SumInt64` with a
/// distinct `SumInt64` over the same column and checks both results at every
/// tick from 1 to 7.
///
/// The input ends with a row at ts = 7 and the expectation map has an entry
/// for ts = 7, so the driver range has to be `1..8`: `Range` is half-open and
/// the previous `1..7` stopped after tick 6, leaving that last case unchecked.
#[test]
fn test_composite_reduce_distinct_accum() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // Values 1, 2, 3, 1, 2, 3, 1 stamped with ts 1 through 7, each with diff = +1.
    let rows: Vec<_> = [1i64, 2, 3, 1, 2, 3, 1]
        .iter()
        .zip(1..)
        .map(|(&v, ts)| (Row::new(vec![v.into()]), ts, 1))
        .collect();
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);

    let input = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(
        RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )])
        .into_unnamed(),
    );
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    // Same function and input column; only the `distinct` flag differs.
    let sum_all = AggregateExpr {
        func: AggregateFunc::SumInt64,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };
    let sum_distinct = AggregateExpr {
        func: AggregateFunc::SumInt64,
        expr: ScalarExpr::Column(0),
        distinct: true,
    };
    let accum_plan = AccumulablePlan {
        full_aggrs: vec![sum_all.clone(), sum_distinct.clone()],
        simple_aggrs: vec![AggrWithIndex::new(sum_all, 0, 0)],
        distinct_aggrs: vec![AggrWithIndex::new(sum_distinct, 0, 1)],
    };

    let bundle = ctx
        .render_reduce(
            Box::new(input),
            key_val_plan,
            ReducePlan::Accumulable(accum_plan),
            RelationType::empty(),
        )
        .unwrap();

    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    // (running sum, distinct sum) after each tick.
    let expected: BTreeMap<_, _> = [
        (1i64, 1i64),
        (3, 3),
        (6, 6),
        (7, 6),
        (9, 6),
        (12, 6),
        (13, 6),
    ]
    .iter()
    .zip(1..)
    .map(|(&(sum, distinct_sum), ts)| {
        (
            ts,
            vec![(Row::new(vec![sum.into(), distinct_sum.into()]), ts, 1)],
        )
    })
    .collect();
    // Upper bound is exclusive: 1..8 covers ticks 1 through 7.
    run_and_check(&mut state, &mut df, 1..8, expected, output);
}
1985}