1use std::collections::{BTreeMap, BTreeSet};
16use std::ops::Range;
17use std::sync::Arc;
18
19use arrow::array::new_null_array;
20use common_telemetry::trace;
21use datatypes::data_type::ConcreteDataType;
22use datatypes::prelude::DataType;
23use datatypes::value::{ListValue, Value};
24use datatypes::vectors::{BooleanVector, NullVector};
25use dfir_rs::scheduled::graph_ext::GraphExt;
26use itertools::Itertools;
27use snafu::{ensure, OptionExt, ResultExt};
28
29use crate::compute::render::{Context, SubgraphArg};
30use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
31use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
32use crate::expr::error::{ArrowSnafu, DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
33use crate::expr::{Accum, Accumulator, Batch, EvalError, ScalarExpr, VectorDiff};
34use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan};
35use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
36use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};
37
38impl Context<'_, '_> {
39 const REDUCE_BATCH: &'static str = "reduce_batch";
40 #[allow(clippy::mutable_key_type)]
44 pub fn render_reduce_batch(
45 &mut self,
46 input: Box<TypedPlan>,
47 key_val_plan: &KeyValPlan,
48 reduce_plan: &ReducePlan,
49 output_type: &RelationType,
50 ) -> Result<CollectionBundle<Batch>, Error> {
51 let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan {
52 if !accum_plan.distinct_aggrs.is_empty() {
53 NotImplementedSnafu {
54 reason: "Distinct aggregation is not supported in batch mode",
55 }
56 .fail()?
57 }
58 accum_plan.clone()
59 } else {
60 NotImplementedSnafu {
61 reason: "Only accumulable reduce plan is supported in batch mode",
62 }
63 .fail()?
64 };
65
66 let input = self.render_plan_batch(*input)?;
67
68 let output_key_arity = key_val_plan.key_plan.output_arity();
73
74 let arrange_handler = self.compute_state.new_arrange(None);
76
77 if let (Some(time_index), Some(expire_after)) =
78 (output_type.time_index, self.compute_state.expire_after())
79 {
80 let expire_man =
81 KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
82 arrange_handler.write().set_expire_state(expire_man);
83 }
84
85 let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
87 reason: "No write is expected at this point",
88 })?;
89 let key_val_plan = key_val_plan.clone();
90
91 let output_type = output_type.clone();
92
93 let now = self.compute_state.current_time_ref();
94
95 let err_collector = self.err_collector.clone();
96
97 let scheduler = self.compute_state.get_scheduler();
99
100 let scheduler_inner = scheduler.clone();
101
102 let (out_send_port, out_recv_port) =
103 self.df.make_edge::<_, Toff<Batch>>(Self::REDUCE_BATCH);
104
105 let subgraph = self.df.add_subgraph_in_out(
106 Self::REDUCE_BATCH,
107 input.collection.into_inner(),
108 out_send_port,
109 move |_ctx, recv, send| {
110 let now = *(now.borrow());
111 let arrange = arrange_handler_inner.clone();
112 let src_data = recv
114 .take_inner()
115 .into_iter()
116 .flat_map(|v| v.into_iter())
117 .collect_vec();
118
119 reduce_batch_subgraph(
120 &arrange,
121 src_data,
122 &key_val_plan,
123 &accum_plan,
124 &output_type,
125 SubgraphArg {
126 now,
127 err_collector: &err_collector,
128 scheduler: &scheduler_inner,
129 send,
130 },
131 )
132 },
133 );
134
135 scheduler.set_cur_subgraph(subgraph);
136
137 let arranged = BTreeMap::from([(
139 (0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
140 Arranged::new(arrange_handler),
141 )]);
142
143 let bundle = CollectionBundle {
144 collection: Collection::from_port(out_recv_port),
145 arranged,
146 };
147 Ok(bundle)
148 }
149
150 const REDUCE: &'static str = "reduce";
151 #[allow(clippy::mutable_key_type)]
154 pub fn render_reduce(
155 &mut self,
156 input: Box<TypedPlan>,
157 key_val_plan: KeyValPlan,
158 reduce_plan: ReducePlan,
159 output_type: RelationType,
160 ) -> Result<CollectionBundle, Error> {
161 let input = self.render_plan(*input)?;
162 let output_key_arity = key_val_plan.key_plan.output_arity();
167
168 let arrange_handler = self.compute_state.new_arrange(None);
170
171 if let (Some(time_index), Some(expire_after)) =
172 (output_type.time_index, self.compute_state.expire_after())
173 {
174 let expire_man =
175 KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
176 arrange_handler.write().set_expire_state(expire_man);
177 }
178
179 let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
181 reason: "No write is expected at this point",
182 })?;
183
184 let distinct_input = self.add_accum_distinct_input_arrange(&reduce_plan);
185
186 let reduce_arrange = ReduceArrange {
187 output_arrange: arrange_handler_inner,
188 distinct_input,
189 };
190
191 let now = self.compute_state.current_time_ref();
192
193 let err_collector = self.err_collector.clone();
194
195 let scheduler = self.compute_state.get_scheduler();
197 let scheduler_inner = scheduler.clone();
198
199 let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>(Self::REDUCE);
200
201 let subgraph = self.df.add_subgraph_in_out(
202 Self::REDUCE,
203 input.collection.into_inner(),
204 out_send_port,
205 move |_ctx, recv, send| {
206 let data = recv
208 .take_inner()
209 .into_iter()
210 .flat_map(|v| v.into_iter())
211 .collect_vec();
212
213 reduce_subgraph(
214 &reduce_arrange,
215 data,
216 &key_val_plan,
217 &reduce_plan,
218 SubgraphArg {
219 now: *now.borrow(),
220 err_collector: &err_collector,
221 scheduler: &scheduler_inner,
222 send,
223 },
224 );
225 },
226 );
227
228 scheduler.set_cur_subgraph(subgraph);
229
230 let arranged = BTreeMap::from([(
232 (0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
233 Arranged::new(arrange_handler),
234 )]);
235
236 let bundle = CollectionBundle {
237 collection: Collection::from_port(out_recv_port),
238 arranged,
239 };
240 Ok(bundle)
241 }
242
243 fn add_accum_distinct_input_arrange(
250 &mut self,
251 reduce_plan: &ReducePlan,
252 ) -> Option<Vec<ArrangeHandler>> {
253 match reduce_plan {
254 ReducePlan::Distinct => None,
255 ReducePlan::Accumulable(AccumulablePlan { distinct_aggrs, .. }) => {
256 (!distinct_aggrs.is_empty()).then(|| {
257 std::iter::repeat_with(|| {
258 let arr = self.compute_state.new_arrange(None);
259 arr.set_full_arrangement(true);
260 arr
261 })
262 .take(distinct_aggrs.len())
263 .collect()
264 })
265 }
266 }
267 }
268}
269
270fn from_accum_values_to_live_accums(
271 accums: Vec<Value>,
272 len: usize,
273) -> Result<Vec<Vec<Value>>, EvalError> {
274 let accum_ranges = from_val_to_slice_idx(accums.first().cloned(), len)?;
275 let mut accum_list = vec![];
276 for range in accum_ranges.iter() {
277 accum_list.push(accums.get(range.clone()).unwrap_or_default().to_vec());
278 }
279 Ok(accum_list)
280}
281
/// Arrangements handed to the row-mode reduce subgraph.
pub struct ReduceArrange {
    /// Arrangement the reduce subgraph reads its previous state from and writes its updates
    /// to. `render_reduce` fills it with a full-arrangement clone of the handler it exposes
    /// in the returned bundle.
    output_arrange: ArrangeHandler,
    /// One arrangement per distinct aggregate, as built by
    /// `add_accum_distinct_input_arrange`; `None` when the plan is `ReducePlan::Distinct` or
    /// has no distinct aggregates.
    distinct_input: Option<Vec<ArrangeHandler>>,
}
290
291fn batch_split_by_key_val(
292 batch: &Batch,
293 key_val_plan: &KeyValPlan,
294 err_collector: &ErrCollector,
295) -> (Batch, Batch) {
296 let row_count = batch.row_count();
297 let mut key_batch = Batch::empty();
298 let mut val_batch = Batch::empty();
299
300 err_collector.run(|| {
301 if key_val_plan.key_plan.output_arity() != 0 {
302 key_batch = key_val_plan.key_plan.eval_batch_into(&mut batch.clone())?;
303 }
304
305 if key_val_plan.val_plan.output_arity() != 0 {
306 val_batch = key_val_plan.val_plan.eval_batch_into(&mut batch.clone())?;
307 }
308 Ok(())
309 });
310
311 if key_batch.row_count() == 0 && key_batch.column_count() == 0 {
313 key_batch.set_row_count(row_count);
314 }
315
316 if val_batch.row_count() == 0 && val_batch.column_count() == 0 {
317 val_batch.set_row_count(row_count);
318 }
319
320 (key_batch, val_batch)
321}
322
323fn split_rows_to_key_val(
325 rows: impl IntoIterator<Item = DiffRow>,
326 key_val_plan: KeyValPlan,
327 err_collector: ErrCollector,
328) -> impl IntoIterator<Item = KeyValDiffRow> {
329 let mut row_buf = Row::new(vec![]);
330 rows.into_iter().filter_map(
331 move |(mut row, sys_time, diff): DiffRow| -> Option<KeyValDiffRow> {
332 err_collector.run(|| {
333 let len = row.len();
334 if let Some(key) = key_val_plan
335 .key_plan
336 .evaluate_into(&mut row.inner, &mut row_buf)?
337 {
338 row.inner.resize(len, Value::Null);
340 let val = key_val_plan
342 .val_plan
343 .evaluate_into(&mut row.inner, &mut row_buf)?
344 .context(InternalSnafu {
345 reason: "val_plan should not contain any filter predicate",
346 })?;
347 Ok(Some(((key, val), sys_time, diff)))
348 } else {
349 Ok(None)
350 }
351 })?
352 },
353 )
354}
355
356fn reduce_batch_subgraph(
357 arrange: &ArrangeHandler,
358 src_data: impl IntoIterator<Item = Batch>,
359 key_val_plan: &KeyValPlan,
360 accum_plan: &AccumulablePlan,
361 output_type: &RelationType,
362 SubgraphArg {
363 now,
364 err_collector,
365 scheduler: _,
366 send,
367 }: SubgraphArg<Toff<Batch>>,
368) {
369 let mut key_to_many_vals = BTreeMap::<Row, Vec<Batch>>::new();
370 let mut input_row_count = 0;
371 let mut input_batch_count = 0;
372
373 for batch in src_data {
374 input_batch_count += 1;
375 input_row_count += batch.row_count();
376 err_collector.run(|| {
377 let (key_batch, val_batch) =
378 batch_split_by_key_val(&batch, key_val_plan, err_collector);
379 ensure!(
380 key_batch.row_count() == val_batch.row_count(),
381 InternalSnafu {
382 reason: format!(
383 "Key and val batch should have the same row count, found {} and {}",
384 key_batch.row_count(),
385 val_batch.row_count()
386 )
387 }
388 );
389
390 let mut distinct_keys = BTreeSet::new();
391 for row_idx in 0..key_batch.row_count() {
392 let key_row = key_batch.get_row(row_idx)?;
393 let key_row = Row::new(key_row);
394
395 if distinct_keys.contains(&key_row) {
396 continue;
397 } else {
398 distinct_keys.insert(key_row.clone());
399 }
400 }
401
402 let key_data_types = output_type
403 .column_types
404 .iter()
405 .map(|t| t.scalar_type.clone())
406 .collect_vec();
407
408 for key_row in distinct_keys {
410 let key_scalar_value = {
411 let mut key_scalar_value = Vec::with_capacity(key_row.len());
412 for (key_idx, key) in key_row.iter().enumerate() {
413 let v =
414 key.try_to_scalar_value(&key.data_type())
415 .context(DataTypeSnafu {
416 msg: "can't convert key values to datafusion value",
417 })?;
418
419 let key_data_type = key_data_types.get(key_idx).context(InternalSnafu {
420 reason: format!(
421 "Key index out of bound, expected at most {} but got {}",
422 output_type.column_types.len(),
423 key_idx
424 ),
425 })?;
426
427 if key_data_type.as_arrow_type() != v.data_type()
429 && !v.data_type().is_null()
430 {
431 crate::expr::error::InternalSnafu {
432 reason: format!(
433 "Key data type mismatch, expected {:?} but got {:?}",
434 key_data_type.as_arrow_type(),
435 v.data_type()
436 ),
437 }
438 .fail()?
439 }
440
441 let arrow_value = if v.data_type().is_null() {
443 let ret = new_null_array(&arrow::datatypes::DataType::Null, 1);
444 arrow::array::Scalar::new(ret)
445 } else {
446 v.to_scalar().context(crate::expr::error::DatafusionSnafu {
447 context: "can't convert key values to arrow value",
448 })?
449 };
450 key_scalar_value.push(arrow_value);
451 }
452 key_scalar_value
453 };
454
455 let eq_results = key_scalar_value
457 .into_iter()
458 .zip(key_batch.batch().iter())
459 .map(|(key, col)| {
460 if arrow::array::Datum::get(&key).0.data_type().is_null() {
465 arrow::compute::kernels::boolean::is_null(
466 col.to_arrow_array().as_ref() as _
467 )
468 } else {
469 arrow::compute::kernels::cmp::eq(
470 &key,
471 &col.to_arrow_array().as_ref() as _,
472 )
473 }
474 })
475 .try_collect::<_, Vec<_>, _>()
476 .context(ArrowSnafu {
477 context: "Failed to compare key values",
478 })?;
479
480 let opt_eq_mask = eq_results
482 .into_iter()
483 .fold(None, |acc, v| match acc {
484 Some(Ok(acc)) => Some(arrow::compute::kernels::boolean::and(&acc, &v)),
485 Some(Err(_)) => acc,
486 None => Some(Ok(v)),
487 })
488 .transpose()
489 .context(ArrowSnafu {
490 context: "Failed to combine key comparison results",
491 })?;
492
493 let key_eq_mask = if let Some(eq_mask) = opt_eq_mask {
494 BooleanVector::from(eq_mask)
495 } else {
496 BooleanVector::from(vec![true; key_batch.row_count()])
500 };
501 let cur_val_batch = val_batch.filter(&key_eq_mask)?;
504
505 key_to_many_vals
506 .entry(key_row)
507 .or_default()
508 .push(cur_val_batch);
509 }
510
511 Ok(())
512 });
513 }
514
515 trace!(
516 "Reduce take {} batches, {} rows",
517 input_batch_count,
518 input_row_count
519 );
520
521 let mut arrange = arrange.write();
524 let mut all_arrange_updates = Vec::with_capacity(key_to_many_vals.len());
525
526 let mut all_output_dict = BTreeMap::new();
527
528 for (key, val_batches) in key_to_many_vals {
529 err_collector.run(|| -> Result<(), _> {
530 let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
531 let accum_list =
532 from_accum_values_to_live_accums(accums.unpack(), accum_plan.simple_aggrs.len())?;
533
534 let mut accum_output = AccumOutput::new();
535 for AggrWithIndex {
536 expr,
537 input_idx,
538 output_idx,
539 } in accum_plan.simple_aggrs.iter()
540 {
541 let cur_accum_value = accum_list.get(*output_idx).cloned().unwrap_or_default();
542 let mut cur_accum = if cur_accum_value.is_empty() {
543 Accum::new_accum(&expr.func.clone())?
544 } else {
545 Accum::try_into_accum(&expr.func, cur_accum_value)?
546 };
547
548 for val_batch in val_batches.iter() {
549 let cur_input = val_batch
551 .batch()
552 .get(*input_idx)
553 .cloned()
554 .unwrap_or_else(|| Arc::new(NullVector::new(val_batch.row_count())));
555 let len = cur_input.len();
556 cur_accum.update_batch(&expr.func, VectorDiff::from(cur_input))?;
557
558 trace!("Reduce accum after take {} rows: {:?}", len, cur_accum);
559 }
560 let final_output = cur_accum.eval(&expr.func)?;
561 trace!("Reduce accum final output: {:?}", final_output);
562 accum_output.insert_output(*output_idx, final_output);
563
564 let cur_accum_value = cur_accum.into_state();
565 accum_output.insert_accum(*output_idx, cur_accum_value);
566 }
567
568 let (new_accums, res_val_row) = accum_output.into_accum_output()?;
569
570 let arrange_update = ((key.clone(), Row::new(new_accums)), now, 1);
571 all_arrange_updates.push(arrange_update);
572
573 all_output_dict.insert(key, Row::from(res_val_row));
574
575 Ok(())
576 });
577 }
578
579 err_collector.run(|| {
580 arrange.apply_updates(now, all_arrange_updates)?;
581 arrange.compact_to(now)
582 });
583 drop(arrange);
585
586 let output_types = output_type
590 .column_types
591 .iter()
592 .map(|t| t.scalar_type.clone())
593 .collect_vec();
594
595 err_collector.run(|| {
596 let column_cnt = output_types.len();
597 let row_cnt = all_output_dict.len();
598
599 let mut output_builder = output_types
600 .into_iter()
601 .map(|t| t.create_mutable_vector(row_cnt))
602 .collect_vec();
603
604 for (key, val) in all_output_dict {
605 for (i, v) in key.into_iter().chain(val.into_iter()).enumerate() {
606 output_builder
607 .get_mut(i)
608 .context(InternalSnafu{
609 reason: format!(
610 "Output builder should have the same length as the row, expected at most {} but got {}",
611 column_cnt - 1,
612 i
613 )
614 })?
615 .try_push_value_ref(v.as_value_ref())
616 .context(DataTypeSnafu {
617 msg: "Failed to push value",
618 })?;
619 }
620 }
621
622 let output_columns = output_builder
623 .into_iter()
624 .map(|mut b| b.to_vector())
625 .collect_vec();
626
627 let output_batch = Batch::try_new(output_columns, row_cnt)?;
628
629 trace!("Reduce output batch: {:?}", output_batch);
630
631 send.give(vec![output_batch]);
632
633 Ok(())
634 });
635}
636
637fn reduce_subgraph(
640 ReduceArrange {
641 output_arrange: arrange,
642 distinct_input,
643 }: &ReduceArrange,
644 data: impl IntoIterator<Item = DiffRow>,
645 key_val_plan: &KeyValPlan,
646 reduce_plan: &ReducePlan,
647 SubgraphArg {
648 now,
649 err_collector,
650 scheduler,
651 send,
652 }: SubgraphArg,
653) {
654 let key_val = split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
655 match reduce_plan {
660 ReducePlan::Distinct => reduce_distinct_subgraph(
661 arrange,
662 key_val,
663 SubgraphArg {
664 now,
665 err_collector,
666 scheduler,
667 send,
668 },
669 ),
670 ReducePlan::Accumulable(accum_plan) => reduce_accum_subgraph(
671 arrange,
672 distinct_input,
673 key_val,
674 accum_plan,
675 SubgraphArg {
676 now,
677 err_collector,
678 scheduler,
679 send,
680 },
681 ),
682 };
683}
684
685fn eval_distinct_core(
689 arrange: ArrangeReader,
690 kv: impl IntoIterator<Item = KeyValDiffRow>,
691 now: repr::Timestamp,
692 err_collector: &ErrCollector,
693) -> Vec<KeyValDiffRow> {
694 let _ = err_collector;
695
696 let mut inner_map = BTreeMap::new();
699 kv.into_iter()
700 .filter_map(|((key, val), ts, diff)| {
701 let old_val = inner_map
703 .get(&key)
704 .cloned()
705 .or_else(|| arrange.get(now, &key));
706
707 let new_key_val = match (old_val, diff) {
708 (None, 1) => Some(((key, val), ts, diff)),
710 (Some(old_val), diff) if old_val.0 == val && old_val.2 != diff => {
712 Some(((key, val), ts, diff))
713 }
714 _ => None,
715 };
716
717 if let Some(((k, v), t, d)) = new_key_val.clone() {
718 inner_map.insert(k, (v, t, d));
720 }
721 new_key_val
722 })
723 .collect_vec()
724}
725
726fn update_reduce_distinct_arrange(
730 arrange: &ArrangeHandler,
731 kv: impl IntoIterator<Item = KeyValDiffRow>,
732 now: repr::Timestamp,
733 err_collector: &ErrCollector,
734) -> impl Iterator<Item = DiffRow> {
735 let result_updates = eval_distinct_core(arrange.read(), kv, now, err_collector);
736
737 err_collector.run(|| {
738 arrange.write().apply_updates(now, result_updates)?;
739 Ok(())
740 });
741
742 let from = arrange.read().last_compaction_time();
746 let from = from.unwrap_or(repr::Timestamp::MIN);
747 let range = (
748 std::ops::Bound::Excluded(from),
749 std::ops::Bound::Included(now),
750 );
751 let output_kv = arrange.read().get_updates_in_range(range);
752
753 let run_compaction = || {
755 arrange.write().compact_to(now)?;
756 Ok(())
757 };
758 err_collector.run(run_compaction);
759
760 output_kv.into_iter().map(|((mut key, v), ts, diff)| {
763 key.extend(v.into_iter());
764 (key, ts, diff)
765 })
766}
767
768fn reduce_distinct_subgraph(
773 arrange: &ArrangeHandler,
774 kv: impl IntoIterator<Item = KeyValDiffRow>,
775 SubgraphArg {
776 now,
777 err_collector,
778 scheduler: _,
779 send,
780 }: SubgraphArg,
781) {
782 let ret = update_reduce_distinct_arrange(arrange, kv, now, err_collector).collect_vec();
783
784 if arrange.read().get_next_update_time(&now).is_some() {
786 err_collector.push_err(
787 InternalSnafu {
788 reason: "No future updates should exist in the reduce distinct arrangement",
789 }
790 .build(),
791 );
792 }
793
794 send.give(ret);
795}
796
/// Evaluates an accumulable reduce for one tick of key/value diffs.
///
/// The diffs are grouped by key. For each key the stored accumulators are read from
/// `arrange`, the simple and the distinct aggregates are evaluated against the key's diffs,
/// and the resulting accumulator row and output row are collected. Once all keys are done,
/// the accumulator updates are applied to the arrangement, the arrangement is compacted to
/// `now`, all used arrangements are checked for updates after `now`, and the output rows are
/// sent downstream.
///
/// Errors are pushed to `err_collector`. A key is skipped when it is reported as expired,
/// when its value rows cannot be turned into column diffs, or when its stored accumulator
/// offsets cannot be decoded.
fn reduce_accum_subgraph(
    arrange: &ArrangeHandler,
    distinct_input: &Option<Vec<ArrangeHandler>>,
    kv: impl IntoIterator<Item = KeyValDiffRow>,
    accum_plan: &AccumulablePlan,
    SubgraphArg {
        now,
        err_collector,
        scheduler,
        send,
    }: SubgraphArg,
) {
    let AccumulablePlan {
        full_aggrs,
        simple_aggrs,
        distinct_aggrs,
    } = accum_plan;
    // Group all (value, diff) pairs by key; the tick carried by each input is ignored.
    let mut key_to_vals = BTreeMap::<Row, Vec<(Row, repr::Diff)>>::new();

    for ((key, val), _tick, diff) in kv {
        let vals = key_to_vals.entry(key).or_default();
        vals.push((val, diff));
    }

    let mut all_updates = Vec::with_capacity(key_to_vals.len());
    let mut all_outputs = Vec::with_capacity(key_to_vals.len());
    // The write guard on the output arrangement is held for the whole loop below.
    let mut arrange = arrange.write();
    for (key, value_diffs) in key_to_vals {
        // When an expire state is configured, keys reported as expired are only logged
        // with a warning and then skipped.
        if let Some(expire_man) = &arrange.get_expire_state() {
            let mut is_expired = false;
            err_collector.run(|| {
                if let Some(expired) = expire_man.get_expire_duration(now, &key)? {
                    is_expired = true;
                    common_telemetry::warn!(
                        "Data already expired: {}",
                        DataAlreadyExpiredSnafu {
                            expired_by: expired,
                        }
                        .build()
                    );
                    Ok(())
                } else {
                    Ok(())
                }
            });
            if is_expired {
                continue;
            }
        }
        // Transpose the value rows into per-column lists of (value, diff).
        let col_diffs = {
            // `value_diffs` is never empty: an entry only exists because a pair was pushed.
            let row_len = value_diffs[0].0.len();
            let res = err_collector.run(|| get_col_diffs(value_diffs, row_len));
            match res {
                Some(res) => res,
                None => continue,
            }
        };
        // A key without stored state starts from the default (empty) accumulator row.
        let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();

        let accums = accums.inner;

        // Decode the offset list stored as the first accumulator value into one index
        // range per aggregate.
        let accum_ranges = {
            let res = err_collector
                .run(|| from_val_to_slice_idx(accums.first().cloned(), full_aggrs.len()));
            if let Some(res) = res {
                res
            } else {
                continue;
            }
        };

        let mut accum_output = AccumOutput::new();
        eval_simple_aggrs(
            simple_aggrs,
            &accums,
            &accum_ranges,
            &col_diffs,
            &mut accum_output,
            err_collector,
        );

        eval_distinct_aggrs(
            distinct_aggrs,
            distinct_input,
            &accums,
            &accum_ranges,
            &col_diffs,
            &mut accum_output,
            SubgraphArg {
                now,
                err_collector,
                scheduler,
                send,
            },
        );

        err_collector.run(|| {
            let (new_accums, res_val_row) = accum_output.into_accum_output()?;

            // The new accumulator row is stored under the key with a diff of 1 at `now`.
            all_updates.push(((key.clone(), Row::new(new_accums)), now, 1));
            // The output row is the key followed by the aggregate results.
            let mut key_val = key;
            key_val.extend(res_val_row);
            all_outputs.push((key_val, now, 1));
            Ok(())
        });
    }
    err_collector.run(|| {
        arrange.apply_updates(now, all_updates)?;
        arrange.compact_to(now)
    });

    // Check the distinct input arrangements (if any) followed by the output arrangement;
    // the output arrangement's write guard is moved into the iterator here.
    let all_arrange_used = distinct_input
        .iter()
        .flatten()
        .map(|d| d.write())
        .chain(std::iter::once(arrange));
    check_no_future_updates(all_arrange_used, err_collector, now);

    send.give(all_outputs);
}
948
949fn get_col_diffs(
950 value_diffs: Vec<(Row, repr::Diff)>,
951 row_len: usize,
952) -> Result<Vec<Vec<(Value, i64)>>, EvalError> {
953 ensure!(
954 value_diffs.iter().all(|(row, _)| row.len() == row_len),
955 InternalSnafu {
956 reason: "value_diffs should have rows with equal length"
957 }
958 );
959 let ret = (0..row_len)
960 .map(|i| {
961 value_diffs
962 .iter()
963 .map(|(row, diff)| (row.get(i).cloned().unwrap(), *diff))
964 .collect_vec()
965 })
966 .collect_vec();
967 Ok(ret)
968}
969
970fn eval_simple_aggrs(
972 simple_aggrs: &Vec<AggrWithIndex>,
973 accums: &[Value],
974 accum_ranges: &[Range<usize>],
975 col_diffs: &[Vec<(Value, i64)>],
976 accum_output: &mut AccumOutput,
977 err_collector: &ErrCollector,
978) {
979 for AggrWithIndex {
980 expr,
981 input_idx,
982 output_idx,
983 } in simple_aggrs
984 {
985 let cur_accum_range = accum_ranges[*output_idx].clone(); let cur_old_accum = accums
987 .get(cur_accum_range)
988 .unwrap_or_default()
989 .iter()
990 .cloned();
991 let cur_col_diff = col_diffs[*input_idx].iter().cloned();
992
993 if let Some((res, new_accum)) =
995 err_collector.run(|| expr.func.eval_diff_accumulable(cur_old_accum, cur_col_diff))
996 {
997 accum_output.insert_accum(*output_idx, new_accum);
998 accum_output.insert_output(*output_idx, res);
999 } }
1001}
1002
/// Collects, per aggregate output index, the new accumulator state and the aggregate result
/// produced while evaluating one key.
#[derive(Debug)]
struct AccumOutput {
    /// New accumulator values, keyed by aggregate output index.
    accum: BTreeMap<usize, Vec<Value>>,
    /// Aggregate results, keyed by aggregate output index.
    output: BTreeMap<usize, Value>,
}
1013
1014impl AccumOutput {
1015 fn new() -> Self {
1016 Self {
1017 accum: BTreeMap::new(),
1018 output: BTreeMap::new(),
1019 }
1020 }
1021
1022 fn insert_accum(&mut self, idx: usize, v: Vec<Value>) {
1023 self.accum.insert(idx, v);
1024 }
1025
1026 fn insert_output(&mut self, idx: usize, v: Value) {
1027 self.output.insert(idx, v);
1028 }
1029
1030 fn into_accum_output(self) -> Result<(Vec<Value>, Vec<Value>), EvalError> {
1032 if self.accum.is_empty() && self.output.is_empty() {
1033 return Ok((vec![], vec![]));
1034 }
1035 ensure!(
1036 !self.accum.is_empty() && self.accum.len() == self.output.len(),
1037 InternalSnafu {
1038 reason: format!("Accum and output should have the non-zero and same length, found accum.len() = {}, output.len() = {}", self.accum.len(), self.output.len())
1039 }
1040 );
1041 if let Some(kv) = self.accum.last_key_value() {
1043 ensure!(
1044 *kv.0 == self.accum.len() - 1,
1045 InternalSnafu {
1046 reason: "Accum should be a continuous range"
1047 }
1048 );
1049 }
1050 if let Some(kv) = self.output.last_key_value() {
1051 ensure!(
1052 *kv.0 == self.output.len() - 1,
1053 InternalSnafu {
1054 reason: "Output should be a continuous range"
1055 }
1056 );
1057 }
1058
1059 let accums = self.accum.into_values().collect_vec();
1060 let new_accums = from_accums_to_offsetted_accum(accums);
1061 let output = self.output.into_values().collect_vec();
1062 Ok((new_accums, output))
1063 }
1064}
1065
1066fn eval_distinct_aggrs(
1068 distinct_aggrs: &Vec<AggrWithIndex>,
1069 distinct_input: &Option<Vec<ArrangeHandler>>,
1070 accums: &[Value],
1071 accum_ranges: &[Range<usize>],
1072 col_diffs: &[Vec<(Value, i64)>],
1073 accum_output: &mut AccumOutput,
1074 SubgraphArg {
1075 now,
1076 err_collector,
1077 scheduler: _,
1078 send: _,
1079 }: SubgraphArg,
1080) {
1081 for AggrWithIndex {
1082 expr,
1083 input_idx,
1084 output_idx,
1085 } in distinct_aggrs
1086 {
1087 let cur_accum_range = accum_ranges[*output_idx].clone(); let cur_old_accum = accums
1089 .get(cur_accum_range)
1090 .unwrap_or_default()
1091 .iter()
1092 .cloned();
1093 let cur_col_diff = col_diffs[*input_idx].iter().cloned();
1094 let input_arrange = distinct_input
1096 .as_ref()
1097 .and_then(|v| v[*input_idx].clone_full_arrange())
1098 .expect("A full distinct input arrangement should exist");
1099 let kv = cur_col_diff.map(|(v, d)| ((Row::new(vec![v]), Row::empty()), now, d));
1100 let col_diff_distinct =
1101 update_reduce_distinct_arrange(&input_arrange, kv, now, err_collector).map(
1102 |(row, _ts, diff)| (row.get(0).expect("Row should not be empty").clone(), diff),
1103 );
1104 let col_diff_distinct = {
1105 let res = col_diff_distinct.collect_vec();
1106 res.into_iter()
1107 };
1108 let (res, new_accum) = expr
1110 .func
1111 .eval_diff_accumulable(cur_old_accum, col_diff_distinct)
1112 .unwrap();
1113 accum_output.insert_accum(*output_idx, new_accum);
1114 accum_output.insert_output(*output_idx, res);
1115 }
1116}
1117
1118fn check_no_future_updates<'a>(
1119 all_arrange_used: impl IntoIterator<Item = ArrangeWriter<'a>>,
1120 err_collector: &ErrCollector,
1121 now: repr::Timestamp,
1122) {
1123 for arrange in all_arrange_used {
1124 if arrange.get_next_update_time(&now).is_some() {
1125 err_collector.push_err(
1126 InternalSnafu {
1127 reason: "No future updates should exist in the reduce distinct arrangement",
1128 }
1129 .build(),
1130 );
1131 }
1132 }
1133}
1134
1135fn from_accums_to_offsetted_accum(new_accums: Vec<Vec<Value>>) -> Vec<Value> {
1137 let offset = new_accums
1138 .iter()
1139 .map(|v| v.len() as u64)
1140 .scan(1, |state, x| {
1141 *state += x;
1142 Some(*state)
1143 })
1144 .map(Value::from)
1145 .collect::<Vec<_>>();
1146 let first = ListValue::new(offset, ConcreteDataType::uint64_datatype());
1147 let first = Value::List(first);
1148 std::iter::once(first)
1151 .chain(new_accums.into_iter().flatten())
1152 .collect::<Vec<_>>()
1153}
1154
1155fn from_val_to_slice_idx(
1157 value: Option<Value>,
1158 expected_len: usize,
1159) -> Result<Vec<Range<usize>>, EvalError> {
1160 let offset_end = if let Some(value) = value {
1161 let list = value
1162 .as_list()
1163 .with_context(|_| DataTypeSnafu {
1164 msg: "Accum's first element should be a list",
1165 })?
1166 .context(InternalSnafu {
1167 reason: "Accum's first element should be a list",
1168 })?;
1169 let ret: Vec<usize> = list
1170 .items()
1171 .iter()
1172 .map(|v| {
1173 v.as_u64().map(|j| j as usize).context(InternalSnafu {
1174 reason: "End offset should be a list of u64",
1175 })
1176 })
1177 .try_collect()?;
1178 ensure!(
1179 ret.len() == expected_len,
1180 InternalSnafu {
1181 reason: "Offset List should have the same length as full_aggrs"
1182 }
1183 );
1184 Ok(ret)
1185 } else {
1186 Ok(vec![1usize; expected_len])
1187 }?;
1188 let accum_ranges = (0..expected_len)
1189 .map(|idx| {
1190 if idx == 0 {
1191 debug_assert!(
1193 offset_end[0] >= 1,
1194 "Offset should be at least 1: {:?}",
1195 &offset_end
1196 );
1197 1..offset_end[0]
1198 } else {
1199 offset_end[idx - 1]..offset_end[idx]
1200 }
1201 })
1202 .collect_vec();
1203 Ok(accum_ranges)
1204}
1205
1206#[cfg(test)]
1209mod test {
1210
1211 use std::time::Duration;
1212
1213 use common_time::Timestamp;
1214 use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
1215 use dfir_rs::scheduled::graph::Dfir;
1216
1217 use super::*;
1218 use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
1219 use crate::compute::state::DataflowState;
1220 use crate::expr::{
1221 self, AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc,
1222 };
1223 use crate::plan::Plan;
1224 use crate::repr::{ColumnType, RelationType};
1225
1226 #[test]
1230 fn test_tumble_group_by() {
1231 let mut df = Dfir::new();
1232 let mut state = DataflowState::default();
1233 let mut ctx = harness_test_ctx(&mut df, &mut state);
1234 const START: i64 = 1625097600000;
1235 let rows = vec![
1236 (1u32, START + 1000),
1237 (2u32, START + 1500),
1238 (3u32, START + 2000),
1239 (1u32, START + 2500),
1240 (2u32, START + 3000),
1241 (3u32, START + 3500),
1242 ];
1243 let rows = rows
1244 .into_iter()
1245 .map(|(number, ts)| {
1246 (
1247 Row::new(vec![number.into(), Timestamp::new_millisecond(ts).into()]),
1248 1,
1249 1,
1250 )
1251 })
1252 .collect_vec();
1253
1254 let collection = ctx.render_constant(rows.clone());
1255 ctx.insert_global(GlobalId::User(1), collection);
1256
1257 let aggr_expr = AggregateExpr {
1258 func: AggregateFunc::SumUInt32,
1259 expr: ScalarExpr::Column(0),
1260 distinct: false,
1261 };
1262 let expected = TypedPlan {
1263 schema: RelationType::new(vec![
1264 ColumnType::new(CDT::uint64_datatype(), true), ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ])
1268 .into_unnamed(),
1269 plan: Plan::Mfp {
1274 input: Box::new(
1275 Plan::Reduce {
1276 input: Box::new(
1277 Plan::Get {
1278 id: crate::expr::Id::Global(GlobalId::User(1)),
1279 }
1280 .with_types(
1281 RelationType::new(vec![
1282 ColumnType::new(ConcreteDataType::uint32_datatype(), false),
1283 ColumnType::new(
1284 ConcreteDataType::timestamp_millisecond_datatype(),
1285 false,
1286 ),
1287 ])
1288 .into_unnamed(),
1289 ),
1290 ),
1291 key_val_plan: KeyValPlan {
1292 key_plan: MapFilterProject::new(2)
1293 .map(vec![
1294 ScalarExpr::Column(1).call_unary(
1295 UnaryFunc::TumbleWindowFloor {
1296 window_size: Duration::from_nanos(1_000_000_000),
1297 start_time: Some(Timestamp::new_millisecond(
1298 1625097600000,
1299 )),
1300 },
1301 ),
1302 ScalarExpr::Column(1).call_unary(
1303 UnaryFunc::TumbleWindowCeiling {
1304 window_size: Duration::from_nanos(1_000_000_000),
1305 start_time: Some(Timestamp::new_millisecond(
1306 1625097600000,
1307 )),
1308 },
1309 ),
1310 ])
1311 .unwrap()
1312 .project(vec![2, 3])
1313 .unwrap()
1314 .into_safe(),
1315 val_plan: MapFilterProject::new(2)
1316 .project(vec![0, 1])
1317 .unwrap()
1318 .into_safe(),
1319 },
1320 reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
1321 full_aggrs: vec![aggr_expr.clone()],
1322 simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
1323 distinct_aggrs: vec![],
1324 }),
1325 }
1326 .with_types(
1327 RelationType::new(vec![
1328 ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ColumnType::new(CDT::uint64_datatype(), true), ])
1332 .with_key(vec![1])
1333 .with_time_index(Some(0))
1334 .into_unnamed(),
1335 ),
1336 ),
1337 mfp: MapFilterProject::new(3)
1338 .map(vec![
1339 ScalarExpr::Column(2),
1340 ScalarExpr::Column(3),
1341 ScalarExpr::Column(0),
1342 ScalarExpr::Column(1),
1343 ])
1344 .unwrap()
1345 .project(vec![4, 5, 6])
1346 .unwrap(),
1347 },
1348 };
1349
1350 let bundle = ctx.render_plan(expected).unwrap();
1351
1352 let output = get_output_handle(&mut ctx, bundle);
1353 drop(ctx);
1354 let expected = BTreeMap::from([(
1355 1,
1356 vec![
1357 (
1358 Row::new(vec![
1359 3u64.into(),
1360 Timestamp::new_millisecond(START + 1000).into(),
1361 Timestamp::new_millisecond(START + 2000).into(),
1362 ]),
1363 1,
1364 1,
1365 ),
1366 (
1367 Row::new(vec![
1368 4u64.into(),
1369 Timestamp::new_millisecond(START + 2000).into(),
1370 Timestamp::new_millisecond(START + 3000).into(),
1371 ]),
1372 1,
1373 1,
1374 ),
1375 (
1376 Row::new(vec![
1377 5u64.into(),
1378 Timestamp::new_millisecond(START + 3000).into(),
1379 Timestamp::new_millisecond(START + 4000).into(),
1380 ]),
1381 1,
1382 1,
1383 ),
1384 ],
1385 )]);
1386 run_and_check(&mut state, &mut df, 1..2, expected, output);
1387 }
1388
/// Checks that an average expressed as `SUM / COUNT` evaluates correctly through `render_plan`.
///
/// The rendered plan is `Get -> Reduce(SumUInt32, Count) -> Mfp` with an empty group key. The
/// six input rows (1, 2, 3, 1, 2, 3, all at timestamp 1) sum to 12 over a count of 6, so tick 1
/// is expected to produce the single row `2`.
#[test]
fn test_avg_eval() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // Every row arrives at timestamp 1 with a diff of +1.
    let rows: Vec<_> = vec![1u32, 2, 3, 1, 2, 3]
        .into_iter()
        .map(|number| (Row::new(vec![number.into()]), 1, 1))
        .collect();
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);

    let sum_aggr = AggregateExpr {
        func: AggregateFunc::SumUInt32,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };
    let count_aggr = AggregateExpr {
        func: AggregateFunc::Count,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };

    // avg = if count != 0 { sum / cast(count as u64) } else { NULL }
    // Column 0 of the reduce output is the sum, column 1 the count.
    let count_is_nonzero = ScalarExpr::Column(1).call_binary(
        ScalarExpr::Literal(Value::from(0u32), CDT::int64_datatype()),
        BinaryFunc::NotEq,
    );
    let sum_over_count = ScalarExpr::Column(0).call_binary(
        ScalarExpr::Column(1).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
        BinaryFunc::DivUInt64,
    );
    let avg_expr = ScalarExpr::If {
        cond: Box::new(count_is_nonzero),
        then: Box::new(sum_over_count),
        els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
    };

    // NOTE(review): the source is typed as non-null Int64 although the rows above hold u32
    // values -- confirm whether the declared schema is meant to match the data.
    let source = Plan::Get {
        id: crate::expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(
        RelationType::new(vec![ColumnType::new(
            ConcreteDataType::int64_datatype(),
            false,
        )])
        .into_unnamed(),
    );

    // No key columns are projected, so all rows share one (empty) group key.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1)
            .project(vec![])
            .unwrap()
            .into_safe(),
        val_plan: MapFilterProject::new(1)
            .project(vec![0])
            .unwrap()
            .into_safe(),
    };
    let reduce_plan = ReducePlan::Accumulable(AccumulablePlan {
        full_aggrs: vec![sum_aggr.clone(), count_aggr.clone()],
        simple_aggrs: vec![
            AggrWithIndex::new(sum_aggr, 0, 0),
            AggrWithIndex::new(count_aggr, 0, 1),
        ],
        distinct_aggrs: vec![],
    });
    let reduced = Plan::Reduce {
        input: Box::new(source),
        key_val_plan,
        reduce_plan,
    }
    .with_types(
        RelationType::new(vec![
            ColumnType::new(ConcreteDataType::uint32_datatype(), true),
            ColumnType::new(ConcreteDataType::int64_datatype(), true),
        ])
        .into_unnamed(),
    );

    // The two mapped expressions are appended after the two reduce outputs; the second one only
    // forwards column 2 (the average), and it is the sole projected column.
    let mfp = MapFilterProject::new(2)
        .map(vec![avg_expr, ScalarExpr::Column(2)])
        .unwrap()
        .project(vec![3])
        .unwrap();
    let plan = TypedPlan {
        schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
            .into_unnamed(),
        plan: Plan::Mfp {
            input: Box::new(reduced),
            mfp,
        },
    };

    let bundle = ctx.render_plan(plan).unwrap();
    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    // 12 / 6 == 2, reported once at tick 1.
    let expected = BTreeMap::from([(1, vec![(Row::new(vec![2u64.into()]), 1, 1)])]);
    run_and_check(&mut state, &mut df, 1..2, expected, output);
}
1494
/// Checks that `ReducePlan::Distinct` reports each distinct key exactly once.
///
/// The values 1, 2, 3 are fed twice, one row per timestamp from 1 through 6, with the value
/// itself used as the key. Only tick 6 is run; the expected output holds one row per distinct
/// value, each carrying the timestamp at which that value first appeared.
#[test]
fn test_basic_distinct() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // Pair each value with consecutive timestamps starting at 1; every diff is +1.
    let rows: Vec<_> = vec![1i64, 2, 3, 1, 2, 3]
        .into_iter()
        .zip(1i64..)
        .map(|(value, ts)| (Row::new(vec![value.into()]), ts, 1))
        .collect();
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);

    let input_type = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    let input = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(input_type.into_unnamed());

    // The only column is the key; nothing is kept on the value side.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
    };

    let bundle = ctx
        .render_reduce(
            Box::new(input),
            key_val_plan,
            ReducePlan::Distinct,
            RelationType::empty(),
        )
        .unwrap();

    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    // Value `v` first appears at timestamp `v`, so both fields of each expected row coincide.
    let first_occurrences: Vec<_> = (1i64..=3)
        .map(|v| (Row::new(vec![v.into()]), v, 1))
        .collect();
    let expected = BTreeMap::from([(6, first_occurrences)]);
    run_and_check(&mut state, &mut df, 6..7, expected, output);
}
1549
/// Exercises `render_reduce_batch` with a `SumInt64` aggregate over one nullable `Int64` column
/// and no grouping key.
///
/// Input rows are stamped -1 through 6 and the dataflow is driven for ticks 1 through 6. A sink
/// subgraph compares the first batch received on each tick against the running sum expected for
/// that tick. The two `NULL` inputs do not change the sum (tick 1 still expects 1).
#[test]
fn test_basic_batch_reduce_accum() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let now = state.current_time_ref();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // One value per timestamp, starting at -1; every diff is +1.
    let rows: Vec<_> = vec![
        Value::Null,
        Value::from(1i64),
        Value::Null,
        Value::from(2i64),
        Value::from(3i64),
        Value::from(1i64),
        Value::from(2i64),
        Value::from(3i64),
    ]
    .into_iter()
    .zip(-1i64..)
    .map(|(value, ts)| (Row::new(vec![value]), ts, 1))
    .collect();

    let input_type = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    let input = Plan::Constant { rows }.with_types(input_type.into_unnamed());

    // Empty key: all rows fold into a single group; column 0 is the aggregated value.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    let sum_aggr = AggregateExpr {
        func: AggregateFunc::SumInt64,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };
    let reduce_plan = ReducePlan::Accumulable(AccumulablePlan {
        full_aggrs: vec![sum_aggr.clone()],
        simple_aggrs: vec![AggrWithIndex::new(sum_aggr, 0, 0)],
        distinct_aggrs: vec![],
    });

    let bundle = ctx
        .render_reduce_batch(
            Box::new(input),
            &key_val_plan,
            &reduce_plan,
            &RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]),
        )
        .unwrap();

    // Running sum expected per tick. The entries for -1 and 0 are never looked up because the
    // driver loop below starts at tick 1.
    let expected_sums = BTreeMap::<i64, Vec<i64>>::from([
        (-1, vec![]),
        (0, vec![1i64]),
        (1, vec![1i64]),
        (2, vec![3i64]),
        (3, vec![6i64]),
        (4, vec![7i64]),
        (5, vec![9i64]),
        (6, vec![12i64]),
    ]);
    let clock = now.clone();
    ctx.df.add_subgraph_sink(
        "test_sink",
        bundle.collection.into_inner(),
        move |_ctx, recv| {
            let tick = *clock.borrow();
            // Drain the port on every tick, whether or not an expectation exists for it.
            let received: Vec<_> = recv.take_inner().into_iter().flatten().collect();

            if let Some(sums) = expected_sums.get(&tick) {
                let values: Vec<Value> = sums.iter().copied().map(Value::from).collect();
                let batch = Batch::try_from_rows_with_types(
                    vec![values.into()],
                    &[CDT::int64_datatype()],
                )
                .unwrap();
                assert_eq!(received.first(), Some(&batch));
            }
        },
    );
    drop(ctx);

    // Drive the dataflow one tick at a time and fail fast on any collected error.
    for tick in 1..7 {
        state.set_current_ts(tick);
        state.run_available_with_schedule(&mut df);
        assert!(
            state.get_err_collector().is_empty(),
            "Errors occur: {:?}",
            state.get_err_collector().get_all_blocking()
        );
    }
}
1656
/// Checks a plain `SumInt64` aggregate rendered through `render_reduce` with an empty group key.
///
/// One `Int64` row arrives per tick (1, 2, 3, 1, 2, 3 at timestamps 1 through 6); after each
/// tick the output is expected to hold a single row with the running total so far.
#[test]
fn test_basic_reduce_accum() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // Pair each value with consecutive timestamps starting at 1; every diff is +1.
    let rows: Vec<_> = vec![1i64, 2, 3, 1, 2, 3]
        .into_iter()
        .zip(1i64..)
        .map(|(value, ts)| (Row::new(vec![value.into()]), ts, 1))
        .collect();
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);

    let input_type = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    let input = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(input_type.into_unnamed());

    // Empty key: all rows fold into a single group; column 0 is the aggregated value.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    let sum_aggr = AggregateExpr {
        func: AggregateFunc::SumInt64,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };
    let reduce_plan = ReducePlan::Accumulable(AccumulablePlan {
        full_aggrs: vec![sum_aggr.clone()],
        simple_aggrs: vec![AggrWithIndex::new(sum_aggr, 0, 0)],
        distinct_aggrs: vec![],
    });

    let bundle = ctx
        .render_reduce(
            Box::new(input),
            key_val_plan,
            reduce_plan,
            RelationType::empty(),
        )
        .unwrap();

    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    // (tick, running sum): each tick yields exactly one row stamped with that tick.
    let expected: BTreeMap<_, _> = vec![(1i64, 1i64), (2, 3), (3, 6), (4, 7), (5, 9), (6, 12)]
        .into_iter()
        .map(|(ts, sum)| (ts, vec![(Row::new(vec![sum.into()]), ts, 1)]))
        .collect();
    run_and_check(&mut state, &mut df, 1..7, expected, output);
}
1731
/// Checks that retractions (negative diffs) are reflected in a distinct `SumInt64` aggregate.
///
/// The single value 1 is inserted and retracted across timestamps 1 through 4. The expected
/// distinct sum alternates between 0 and 1; ticks 5 and 6 are run as well and have no
/// expectation entry.
#[test]
fn test_delete_reduce_distinct_accum() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // (timestamp, diff) updates, all for the value 1:
    //   ts 1: insert and retract within the same tick
    //   ts 2: insert
    //   ts 3: retract
    //   ts 4: insert, retract and insert again within the same tick
    let rows: Vec<_> = vec![(1, 1), (1, -1), (2, 1), (3, -1), (4, 1), (4, -1), (4, 1)]
        .into_iter()
        .map(|(ts, diff)| (Row::new(vec![1i64.into()]), ts, diff))
        .collect();
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);

    let input_type = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    let input = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(input_type.into_unnamed());

    // Empty key: all rows fold into a single group; column 0 is the aggregated value.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    // NOTE(review): the entry placed in `distinct_aggrs` carries `distinct: false` while the
    // matching `full_aggrs` entry carries `distinct: true` -- confirm whether this is intended.
    let indexed_aggr = AggrWithIndex::new(
        AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: false,
        },
        0,
        0,
    );
    let reduce_plan = ReducePlan::Accumulable(AccumulablePlan {
        full_aggrs: vec![AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: true,
        }],
        simple_aggrs: vec![],
        distinct_aggrs: vec![indexed_aggr],
    });

    let bundle = ctx
        .render_reduce(
            Box::new(input),
            key_val_plan,
            reduce_plan,
            RelationType::empty(),
        )
        .unwrap();

    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    // (tick, distinct sum): each listed tick yields exactly one row stamped with that tick.
    let expected: BTreeMap<_, _> = vec![(1, 0i64), (2, 1), (3, 0), (4, 1)]
        .into_iter()
        .map(|(ts, sum)| (ts, vec![(Row::new(vec![sum.into()]), ts, 1)]))
        .collect();
    run_and_check(&mut state, &mut df, 1..7, expected, output);
}
1810
/// Checks a distinct `SumInt64` aggregate rendered through `render_reduce` with an empty group
/// key.
///
/// The value 1 is inserted and retracted at timestamp 1, then 2, 3, 1, 2, 3, 1 arrive at
/// timestamps 2 through 7. Once 1, 2 and 3 have all been seen (tick 4) the expected distinct
/// sum stays at 6, including for the repeated value at tick 7.
#[test]
fn test_basic_reduce_distinct_accum() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    let rows = vec![
        // Inserted and retracted within the same tick: contributes nothing at tick 1.
        (Row::new(vec![1i64.into()]), 1, 1),
        (Row::new(vec![1i64.into()]), 1, -1),
        (Row::new(vec![2i64.into()]), 2, 1),
        (Row::new(vec![3i64.into()]), 3, 1),
        (Row::new(vec![1i64.into()]), 4, 1),
        // From here on every value has been seen before.
        (Row::new(vec![2i64.into()]), 5, 1),
        (Row::new(vec![3i64.into()]), 6, 1),
        (Row::new(vec![1i64.into()]), 7, 1),
    ];
    let collection = ctx.render_constant(rows.clone());
    ctx.insert_global(GlobalId::User(1), collection);
    let input_plan = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    };
    let typ = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    // Empty key: all rows fold into a single group; column 0 is the aggregated value.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    // NOTE(review): the entry placed in `distinct_aggrs` carries `distinct: false` while the
    // matching `full_aggrs` entry carries `distinct: true` -- confirm whether this is intended.
    let distinct_aggrs = vec![AggrWithIndex::new(
        AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: false,
        },
        0,
        0,
    )];
    let accum_plan = AccumulablePlan {
        full_aggrs: vec![AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: true,
        }],
        simple_aggrs: vec![],
        distinct_aggrs,
    };

    let reduce_plan = ReducePlan::Accumulable(accum_plan);
    let bundle = ctx
        .render_reduce(
            Box::new(input_plan.with_types(typ.into_unnamed())),
            key_val_plan,
            reduce_plan,
            RelationType::empty(),
        )
        .unwrap();

    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);
    let expected = BTreeMap::from([
        (1, vec![(Row::new(vec![0i64.into()]), 1, 1)]),
        (2, vec![(Row::new(vec![2i64.into()]), 2, 1)]),
        (3, vec![(Row::new(vec![5i64.into()]), 3, 1)]),
        (4, vec![(Row::new(vec![6i64.into()]), 4, 1)]),
        (5, vec![(Row::new(vec![6i64.into()]), 5, 1)]),
        (6, vec![(Row::new(vec![6i64.into()]), 6, 1)]),
        (7, vec![(Row::new(vec![6i64.into()]), 7, 1)]),
    ]);
    // The range is half-open, so `1..8` is needed for tick 7 to be run; with `1..7` the
    // expectation for timestamp 7 above could never be compared (assuming `run_and_check` only
    // visits the ticks in the given range).
    run_and_check(&mut state, &mut df, 1..8, expected, output);
}
1890
/// Checks a reduce that combines a plain and a distinct `SumInt64` over the same column,
/// rendered through `render_reduce` with an empty group key.
///
/// One `Int64` row arrives per tick (1, 2, 3, 1, 2, 3, 1 at timestamps 1 through 7). Each
/// expected output row holds the running plain sum first and the distinct sum second; the
/// distinct sum stops growing at 6 once 1, 2 and 3 have all been seen.
#[test]
fn test_composite_reduce_distinct_accum() {
    let mut df = Dfir::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    let rows = vec![
        (Row::new(vec![1i64.into()]), 1, 1),
        (Row::new(vec![2i64.into()]), 2, 1),
        (Row::new(vec![3i64.into()]), 3, 1),
        (Row::new(vec![1i64.into()]), 4, 1),
        (Row::new(vec![2i64.into()]), 5, 1),
        (Row::new(vec![3i64.into()]), 6, 1),
        (Row::new(vec![1i64.into()]), 7, 1),
    ];
    let collection = ctx.render_constant(rows.clone());
    ctx.insert_global(GlobalId::User(1), collection);
    let input_plan = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    };
    let typ = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    // Empty key: all rows fold into a single group; column 0 is the aggregated value.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    let plain_sum = AggregateExpr {
        func: AggregateFunc::SumInt64,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };
    let distinct_sum = AggregateExpr {
        func: AggregateFunc::SumInt64,
        expr: ScalarExpr::Column(0),
        distinct: true,
    };
    // Both aggregates read input column 0; the plain sum is registered with index 0 and the
    // distinct sum with index 1, matching their order in `full_aggrs`.
    let accum_plan = AccumulablePlan {
        full_aggrs: vec![plain_sum.clone(), distinct_sum.clone()],
        simple_aggrs: vec![AggrWithIndex::new(plain_sum, 0, 0)],
        distinct_aggrs: vec![AggrWithIndex::new(distinct_sum, 0, 1)],
    };

    let reduce_plan = ReducePlan::Accumulable(accum_plan);
    let bundle = ctx
        .render_reduce(
            Box::new(input_plan.with_types(typ.into_unnamed())),
            key_val_plan,
            reduce_plan,
            RelationType::empty(),
        )
        .unwrap();

    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);
    let expected = BTreeMap::from([
        (1, vec![(Row::new(vec![1i64.into(), 1i64.into()]), 1, 1)]),
        (2, vec![(Row::new(vec![3i64.into(), 3i64.into()]), 2, 1)]),
        (3, vec![(Row::new(vec![6i64.into(), 6i64.into()]), 3, 1)]),
        (4, vec![(Row::new(vec![7i64.into(), 6i64.into()]), 4, 1)]),
        (5, vec![(Row::new(vec![9i64.into(), 6i64.into()]), 5, 1)]),
        (6, vec![(Row::new(vec![12i64.into(), 6i64.into()]), 6, 1)]),
        (7, vec![(Row::new(vec![13i64.into(), 6i64.into()]), 7, 1)]),
    ]);
    // The range is half-open, so `1..8` is needed for tick 7 to be run; with `1..7` the
    // expectation for timestamp 7 above could never be compared (assuming `run_and_check` only
    // visits the ticks in the given range).
    run_and_check(&mut state, &mut df, 1..8, expected, output);
}
1982}