flow/compute/render/reduce.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::Range;
17use std::sync::Arc;
18
19use arrow::array::new_null_array;
20use common_telemetry::trace;
21use datatypes::data_type::ConcreteDataType;
22use datatypes::prelude::DataType;
23use datatypes::value::{ListValue, Value};
24use datatypes::vectors::{BooleanVector, NullVector};
25use dfir_rs::scheduled::graph_ext::GraphExt;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28
29use crate::compute::render::{Context, SubgraphArg};
30use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
31use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
32use crate::expr::error::{ArrowSnafu, DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
33use crate::expr::{Accum, Accumulator, Batch, EvalError, ScalarExpr, VectorDiff};
34use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan};
35use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
36use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};
37
38impl Context<'_, '_> {
39    const REDUCE_BATCH: &'static str = "reduce_batch";
40    /// Like `render_reduce`, but for batch mode. This is only a bare-bones implementation:
41    /// distinct aggregation is not supported yet.
42    // There is a false positive in using `Vec<ScalarExpr>` as a key because `Value` has a `bytes` variant
43    #[allow(clippy::mutable_key_type)]
44    pub fn render_reduce_batch(
45        &mut self,
46        input: Box<TypedPlan>,
47        key_val_plan: &KeyValPlan,
48        reduce_plan: &ReducePlan,
49        output_type: &RelationType,
50    ) -> Result<CollectionBundle<Batch>, Error> {
51        let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan {
52            if !accum_plan.distinct_aggrs.is_empty() {
53                NotImplementedSnafu {
54                    reason: "Distinct aggregation is not supported in batch mode",
55                }
56                .fail()?
57            }
58            accum_plan.clone()
59        } else {
60            NotImplementedSnafu {
61                reason: "Only accumulable reduce plan is supported in batch mode",
62            }
63            .fail()?
64        };
65
66        let input = self.render_plan_batch(*input)?;
67
68        // first assemble key & val into separate key and val columns (since this is batch mode),
69        // then stream the kvs through a reduce operator
70
71        // the output is the concatenation of key and val
72        let output_key_arity = key_val_plan.key_plan.output_arity();
73
74        // TODO(discord9): config global expire time from self
75        let arrange_handler = self.compute_state.new_arrange(None);
76
77        if let (Some(time_index), Some(expire_after)) =
78            (output_type.time_index, self.compute_state.expire_after())
79        {
80            let expire_man =
81                KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
82            arrange_handler.write().set_expire_state(expire_man);
83        }
84
85        // reduce needs a full arrangement to be able to query all keys
86        let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
87            reason: "No write is expected at this point",
88        })?;
89        let key_val_plan = key_val_plan.clone();
90
91        let output_type = output_type.clone();
92
93        let now = self.compute_state.current_time_ref();
94
95        let err_collector = self.err_collector.clone();
96
97        // TODO(discord9): better way to schedule future run
98        let scheduler = self.compute_state.get_scheduler();
99
100        let scheduler_inner = scheduler.clone();
101
102        let (out_send_port, out_recv_port) =
103            self.df.make_edge::<_, Toff<Batch>>(Self::REDUCE_BATCH);
104
105        let subgraph = self.df.add_subgraph_in_out(
106            Self::REDUCE_BATCH,
107            input.collection.into_inner(),
108            out_send_port,
109            move |_ctx, recv, send| {
110                let now = *(now.borrow());
111                let arrange = arrange_handler_inner.clone();
112                // mfp only needs to passively receive updates from recvs
113                let src_data = recv
114                    .take_inner()
115                    .into_iter()
116                    .flat_map(|v| v.into_iter())
117                    .collect_vec();
118
119                reduce_batch_subgraph(
120                    &arrange,
121                    src_data,
122                    &key_val_plan,
123                    &accum_plan,
124                    &output_type,
125                    SubgraphArg {
126                        now,
127                        err_collector: &err_collector,
128                        scheduler: &scheduler_inner,
129                        send,
130                    },
131                )
132            },
133        );
134
135        scheduler.set_cur_subgraph(subgraph);
136
137        // by default, the key of the output arrangement is the first `output_key_arity` columns
138        let arranged = BTreeMap::from([(
139            (0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
140            Arranged::new(arrange_handler),
141        )]);
142
143        let bundle = CollectionBundle {
144            collection: Collection::from_port(out_recv_port),
145            arranged,
146        };
147        Ok(bundle)
148    }
149
150    const REDUCE: &'static str = "reduce";
151    /// Render `Plan::Reduce` into an executable dataflow
152    // There is a false positive in using `Vec<ScalarExpr>` as a key because `Value` has a `bytes` variant
153    #[allow(clippy::mutable_key_type)]
154    pub fn render_reduce(
155        &mut self,
156        input: Box<TypedPlan>,
157        key_val_plan: KeyValPlan,
158        reduce_plan: ReducePlan,
159        output_type: RelationType,
160    ) -> Result<CollectionBundle, Error> {
161        let input = self.render_plan(*input)?;
162        // first assemble key & val into ((Row, Row), tick, diff),
163        // then stream the kvs through a reduce operator
164
165        // the output is the concatenation of key and val
166        let output_key_arity = key_val_plan.key_plan.output_arity();
167
168        // TODO(discord9): config global expire time from self
169        let arrange_handler = self.compute_state.new_arrange(None);
170
171        if let (Some(time_index), Some(expire_after)) =
172            (output_type.time_index, self.compute_state.expire_after())
173        {
174            let expire_man =
175                KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
176            arrange_handler.write().set_expire_state(expire_man);
177        }
178
179        // reduce needs a full arrangement to be able to query all keys
180        let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
181            reason: "No write is expected at this point",
182        })?;
183
184        let distinct_input = self.add_accum_distinct_input_arrange(&reduce_plan);
185
186        let reduce_arrange = ReduceArrange {
187            output_arrange: arrange_handler_inner,
188            distinct_input,
189        };
190
191        let now = self.compute_state.current_time_ref();
192
193        let err_collector = self.err_collector.clone();
194
195        // TODO(discord9): better way to schedule future run
196        let scheduler = self.compute_state.get_scheduler();
197        let scheduler_inner = scheduler.clone();
198
199        let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>(Self::REDUCE);
200
201        let subgraph = self.df.add_subgraph_in_out(
202            Self::REDUCE,
203            input.collection.into_inner(),
204            out_send_port,
205            move |_ctx, recv, send| {
206                // mfp only needs to passively receive updates from recvs
207                let data = recv
208                    .take_inner()
209                    .into_iter()
210                    .flat_map(|v| v.into_iter())
211                    .collect_vec();
212
213                reduce_subgraph(
214                    &reduce_arrange,
215                    data,
216                    &key_val_plan,
217                    &reduce_plan,
218                    SubgraphArg {
219                        now: *now.borrow(),
220                        err_collector: &err_collector,
221                        scheduler: &scheduler_inner,
222                        send,
223                    },
224                );
225            },
226        );
227
228        scheduler.set_cur_subgraph(subgraph);
229
230        // by default, the key of the output arrangement is the first `output_key_arity` columns
231        let arranged = BTreeMap::from([(
232            (0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
233            Arranged::new(arrange_handler),
234        )]);
235
236        let bundle = CollectionBundle {
237            collection: Collection::from_port(out_recv_port),
238            arranged,
239        };
240        Ok(bundle)
241    }
242
243    /// Contrary to its name, this adds the distinct input arrangements for an
244    /// accumulable reduce plan that contains distinct aggregations,
245    /// like `select COUNT(DISTINCT col) from table`
246    ///
247    /// The return value is an optional list of arrangements, created for the distinct inputs; it has the
248    /// same length as the number of distinct aggregations in the accumulable reduce plan
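    ///
    /// For example, `select COUNT(DISTINCT a), COUNT(DISTINCT b) from t` would get two
    /// arrangements back, one per distinct aggregation.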
249    fn add_accum_distinct_input_arrange(
250        &mut self,
251        reduce_plan: &ReducePlan,
252    ) -> Option<Vec<ArrangeHandler>> {
253        match reduce_plan {
254            ReducePlan::Distinct => None,
255            ReducePlan::Accumulable(AccumulablePlan { distinct_aggrs, .. }) => {
256                (!distinct_aggrs.is_empty()).then(|| {
257                    std::iter::repeat_with(|| {
258                        let arr = self.compute_state.new_arrange(None);
259                        arr.set_full_arrangement(true);
260                        arr
261                    })
262                    .take(distinct_aggrs.len())
263                    .collect()
264                })
265            }
266        }
267    }
268}
269
270fn from_accum_values_to_live_accums(
271    accums: Vec<Value>,
272    len: usize,
273) -> Result<Vec<Vec<Value>>, EvalError> {
274    let accum_ranges = from_val_to_slice_idx(accums.first().cloned(), len)?;
275    let mut accum_list = vec![];
276    for range in accum_ranges.iter() {
277        accum_list.push(accums.get(range.clone()).unwrap_or_default().to_vec());
278    }
279    Ok(accum_list)
280}
281
282/// All arrangements (i.e. state) used in the reduce operator
283pub struct ReduceArrange {
284    /// The output arrangement of the reduce operator
285    output_arrange: ArrangeHandler,
286    /// The distinct input arrangements for an accumulable reduce plan;
287    /// only used when the accumulable reduce plan has distinct aggregations
288    distinct_input: Option<Vec<ArrangeHandler>>,
289}
290
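/// Split `batch` into a key batch and a val batch by evaluating the key plan and the val plan.
/// A plan with zero output arity yields an empty batch whose row count is set to match the
/// input batch.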
291fn batch_split_by_key_val(
292    batch: &Batch,
293    key_val_plan: &KeyValPlan,
294    err_collector: &ErrCollector,
295) -> (Batch, Batch) {
296    let row_count = batch.row_count();
297    let mut key_batch = Batch::empty();
298    let mut val_batch = Batch::empty();
299
300    err_collector.run(|| {
301        if key_val_plan.key_plan.output_arity() != 0 {
302            key_batch = key_val_plan.key_plan.eval_batch_into(&mut batch.clone())?;
303        }
304
305        if key_val_plan.val_plan.output_arity() != 0 {
306            val_batch = key_val_plan.val_plan.eval_batch_into(&mut batch.clone())?;
307        }
308        Ok(())
309    });
310
311    // deal with empty key or val
312    if key_batch.row_count() == 0 && key_batch.column_count() == 0 {
313        key_batch.set_row_count(row_count);
314    }
315
316    if val_batch.row_count() == 0 && val_batch.column_count() == 0 {
317        val_batch.set_row_count(row_count);
318    }
319
320    (key_batch, val_batch)
321}
322
323/// split a row into key and val by evaluating the key and val plans
324fn split_rows_to_key_val(
325    rows: impl IntoIterator<Item = DiffRow>,
326    key_val_plan: KeyValPlan,
327    err_collector: ErrCollector,
328) -> impl IntoIterator<Item = KeyValDiffRow> {
329    let mut row_buf = Row::new(vec![]);
330    rows.into_iter().filter_map(
331        move |(mut row, sys_time, diff): DiffRow| -> Option<KeyValDiffRow> {
332            err_collector.run(|| {
333                let len = row.len();
334                if let Some(key) = key_val_plan
335                    .key_plan
336                    .evaluate_into(&mut row.inner, &mut row_buf)?
337                {
338                    // reuse the row as buffer
339                    row.inner.resize(len, Value::Null);
340                    // val_plan is not allowed to carry any filter predicate,
341                    let val = key_val_plan
342                        .val_plan
343                        .evaluate_into(&mut row.inner, &mut row_buf)?
344                        .context(InternalSnafu {
345                            reason: "val_plan should not contain any filter predicate",
346                        })?;
347                    Ok(Some(((key, val), sys_time, diff)))
348                } else {
349                    Ok(None)
350                }
351            })?
352        },
353    )
354}
355
356fn reduce_batch_subgraph(
357    arrange: &ArrangeHandler,
358    src_data: impl IntoIterator<Item = Batch>,
359    key_val_plan: &KeyValPlan,
360    accum_plan: &AccumulablePlan,
361    output_type: &RelationType,
362    SubgraphArg {
363        now,
364        err_collector,
365        scheduler: _,
366        send,
367    }: SubgraphArg<Toff<Batch>>,
368) {
369    let mut key_to_many_vals = BTreeMap::<Row, Vec<Batch>>::new();
370    let mut input_row_count = 0;
371    let mut input_batch_count = 0;
372
373    for batch in src_data {
374        input_batch_count += 1;
375        input_row_count += batch.row_count();
376        err_collector.run(|| {
377            let (key_batch, val_batch) =
378                batch_split_by_key_val(&batch, key_val_plan, err_collector);
379            ensure!(
380                key_batch.row_count() == val_batch.row_count(),
381                InternalSnafu {
382                    reason: format!(
383                        "Key and val batch should have the same row count, found {} and {}",
384                        key_batch.row_count(),
385                        val_batch.row_count()
386                    )
387                }
388            );
389
390            let mut distinct_keys = BTreeSet::new();
391            for row_idx in 0..key_batch.row_count() {
392                let key_row = key_batch.get_row(row_idx)?;
393                let key_row = Row::new(key_row);
394
395                if distinct_keys.contains(&key_row) {
396                    continue;
397                } else {
398                    distinct_keys.insert(key_row.clone());
399                }
400            }
401
402            let key_data_types = output_type
403                .column_types
404                .iter()
405                .map(|t| t.scalar_type.clone())
406                .collect_vec();
407
408            // TODO(discord9): reduce the number of `eq` calls to a minimum by repeatedly slicing the key/val batch
409            for key_row in distinct_keys {
410                let key_scalar_value = {
411                    let mut key_scalar_value = Vec::with_capacity(key_row.len());
412                    for (key_idx, key) in key_row.iter().enumerate() {
413                        let v =
414                            key.try_to_scalar_value(&key.data_type())
415                                .context(DataTypeSnafu {
416                                    msg: "can't convert key values to datafusion value",
417                                })?;
418
419                        let key_data_type = key_data_types.get(key_idx).context(InternalSnafu {
420                            reason: format!(
421                                "Key index out of bound, expected at most {} but got {}",
422                                output_type.column_types.len(),
423                                key_idx
424                            ),
425                        })?;
426
427                        // if the incoming value's datatype is null, it needs to be handled specially, see below
428                        if key_data_type.as_arrow_type() != v.data_type()
429                            && !v.data_type().is_null()
430                        {
431                            crate::expr::error::InternalSnafu {
432                                reason: format!(
433                                    "Key data type mismatch, expected {:?} but got {:?}",
434                                    key_data_type.as_arrow_type(),
435                                    v.data_type()
436                                ),
437                            }
438                            .fail()?
439                        }
440
441                        // handle single null key
442                        let arrow_value = if v.data_type().is_null() {
443                            let ret = new_null_array(&arrow::datatypes::DataType::Null, 1);
444                            arrow::array::Scalar::new(ret)
445                        } else {
446                            v.to_scalar().context(crate::expr::error::DatafusionSnafu {
447                                context: "can't convert key values to arrow value",
448                            })?
449                        };
450                        key_scalar_value.push(arrow_value);
451                    }
452                    key_scalar_value
453                };
454
455                // first compute equality for each key column separately
456                let eq_results = key_scalar_value
457                    .into_iter()
458                    .zip(key_batch.batch().iter())
459                    .map(|(key, col)| {
460                        // TODO(discord9): this takes half of the cpu! And it does a redundant amount of `eq`!
461
462                        // note that if the lhs is null, we still need to get all rows that are null! But we can't use `eq` since
463                        // it returns null when the input contains nulls, so we use `is_null` instead
464                        if arrow::array::Datum::get(&key).0.data_type().is_null() {
465                            arrow::compute::kernels::boolean::is_null(
466                                col.to_arrow_array().as_ref() as _
467                            )
468                        } else {
469                            arrow::compute::kernels::cmp::eq(
470                                &key,
471                                &col.to_arrow_array().as_ref() as _,
472                            )
473                        }
474                    })
475                    .try_collect::<_, Vec<_>, _>()
476                    .context(ArrowSnafu {
477                        context: "Failed to compare key values",
478                    })?;
479
480                // then combine all per-column equality results to find the matching key rows
481                let opt_eq_mask = eq_results
482                    .into_iter()
483                    .fold(None, |acc, v| match acc {
484                        Some(Ok(acc)) => Some(arrow::compute::kernels::boolean::and(&acc, &v)),
485                        Some(Err(_)) => acc,
486                        None => Some(Ok(v)),
487                    })
488                    .transpose()
489                    .context(ArrowSnafu {
490                        context: "Failed to combine key comparison results",
491                    })?;
492
493                let key_eq_mask = if let Some(eq_mask) = opt_eq_mask {
494                    BooleanVector::from(eq_mask)
495                } else {
496                    // if None, key_batch has zero columns, which means
497                    // the key is empty, so we just return an all-true mask,
498                    // i.e. take all values
499                    BooleanVector::from(vec![true; key_batch.row_count()])
500                };
501                // TODO: both slice and mutate remaining batch
502
503                let cur_val_batch = val_batch.filter(&key_eq_mask)?;
504
505                key_to_many_vals
506                    .entry(key_row)
507                    .or_default()
508                    .push(cur_val_batch);
509            }
510
511            Ok(())
512        });
513    }
514
515    trace!(
516        "Reduce take {} batches, {} rows",
517        input_batch_count, input_row_count
518    );
519
520    // write-lock the arrange for the rest of the function body
521    // to prevent weird race conditions
522    let mut arrange = arrange.write();
523    let mut all_arrange_updates = Vec::with_capacity(key_to_many_vals.len());
524
525    let mut all_output_dict = BTreeMap::new();
526
527    for (key, val_batches) in key_to_many_vals {
528        err_collector.run(|| -> Result<(), _> {
529            let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
530            let accum_list =
531                from_accum_values_to_live_accums(accums.unpack(), accum_plan.simple_aggrs.len())?;
532
533            let mut accum_output = AccumOutput::new();
534            for AggrWithIndex {
535                expr,
536                input_idx,
537                output_idx,
538            } in accum_plan.simple_aggrs.iter()
539            {
540                let cur_accum_value = accum_list.get(*output_idx).cloned().unwrap_or_default();
541                let mut cur_accum = if cur_accum_value.is_empty() {
542                    Accum::new_accum(&expr.func.clone())?
543                } else {
544                    Accum::try_into_accum(&expr.func, cur_accum_value)?
545                };
546
547                for val_batch in val_batches.iter() {
548                    // if the batch has no such column (e.g. it is empty), feed nulls instead
549                    let cur_input = val_batch
550                        .batch()
551                        .get(*input_idx)
552                        .cloned()
553                        .unwrap_or_else(|| Arc::new(NullVector::new(val_batch.row_count())));
554                    let len = cur_input.len();
555                    cur_accum.update_batch(&expr.func, VectorDiff::from(cur_input))?;
556
557                    trace!("Reduce accum after take {} rows: {:?}", len, cur_accum);
558                }
559                let final_output = cur_accum.eval(&expr.func)?;
560                trace!("Reduce accum final output: {:?}", final_output);
561                accum_output.insert_output(*output_idx, final_output);
562
563                let cur_accum_value = cur_accum.into_state();
564                accum_output.insert_accum(*output_idx, cur_accum_value);
565            }
566
567            let (new_accums, res_val_row) = accum_output.into_accum_output()?;
568
569            let arrange_update = ((key.clone(), Row::new(new_accums)), now, 1);
570            all_arrange_updates.push(arrange_update);
571
572            all_output_dict.insert(key, Row::from(res_val_row));
573
574            Ok(())
575        });
576    }
577
578    err_collector.run(|| {
579        arrange.apply_updates(now, all_arrange_updates)?;
580        arrange.compact_to(now)
581    });
582    // release the lock
583    drop(arrange);
584
585    // this output part is not supposed to be resource intensive
586    // (because for every batch there usually won't be that many output rows?),
587    // so we can afford some costlier operations here
588    let output_types = output_type
589        .column_types
590        .iter()
591        .map(|t| t.scalar_type.clone())
592        .collect_vec();
593
594    err_collector.run(|| {
595        let column_cnt = output_types.len();
596        let row_cnt = all_output_dict.len();
597
598        let mut output_builder = output_types
599            .into_iter()
600            .map(|t| t.create_mutable_vector(row_cnt))
601            .collect_vec();
602
603        for (key, val) in all_output_dict {
604            for (i, v) in key.into_iter().chain(val.into_iter()).enumerate() {
605                output_builder
606                    .get_mut(i)
607                    .context(InternalSnafu {
608                        reason: format!(
609                            "Output builder should have the same length as the row, expected at most {} but got {}",
610                            column_cnt - 1,
611                            i
612                        )
613                    })?
614                    .try_push_value_ref(v.as_value_ref())
615                    .context(DataTypeSnafu {
616                        msg: "Failed to push value",
617                    })?;
618            }
619        }
620
621        let output_columns = output_builder
622            .into_iter()
623            .map(|mut b| b.to_vector())
624            .collect_vec();
625
626        let output_batch = Batch::try_new(output_columns, row_cnt)?;
627
628        trace!("Reduce output batch: {:?}", output_batch);
629
630        send.give(vec![output_batch]);
631
632        Ok(())
633    });
634}
635
636/// reduce subgraph: reduce the input data into a single row per key
637/// the output is the concatenation of key and val
638fn reduce_subgraph(
639    ReduceArrange {
640        output_arrange: arrange,
641        distinct_input,
642    }: &ReduceArrange,
643    data: impl IntoIterator<Item = DiffRow>,
644    key_val_plan: &KeyValPlan,
645    reduce_plan: &ReducePlan,
646    SubgraphArg {
647        now,
648        err_collector,
649        scheduler,
650        send,
651    }: SubgraphArg,
652) {
653    let key_val = split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
654    // from here, things are drastically different for distinct reduce vs. accum reduce:
655    // for distinct reduce the arrangement stores the output,
656    // but for accum reduce the arrangement stores the accum state, and the output is
657    // evaluated from the accum state when an update is needed
658    match reduce_plan {
659        ReducePlan::Distinct => reduce_distinct_subgraph(
660            arrange,
661            key_val,
662            SubgraphArg {
663                now,
664                err_collector,
665                scheduler,
666                send,
667            },
668        ),
669        ReducePlan::Accumulable(accum_plan) => reduce_accum_subgraph(
670            arrange,
671            distinct_input,
672            key_val,
673            accum_plan,
674            SubgraphArg {
675                now,
676                err_collector,
677                scheduler,
678                send,
679            },
680        ),
681    };
682}
683
684/// return distinct rows (distinct by the row's key) from the input, but do not update the arrangement
685///
686/// if the same key already exists, only the oldest value is preserved (which makes sense for distinct input over a key)
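///
/// For illustration (derived from the match below): with an empty arrangement and input
/// `[((k, v1), t1, +1), ((k, v2), t2, +1)]`, only `((k, v1), t1, +1)` is emitted, since the
/// later value for the same key is dropped.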
687fn eval_distinct_core(
688    arrange: ArrangeReader,
689    kv: impl IntoIterator<Item = KeyValDiffRow>,
690    now: repr::Timestamp,
691    err_collector: &ErrCollector,
692) -> Vec<KeyValDiffRow> {
693    let _ = err_collector;
694
695    // note that we also need to keep track of the distinct rows inside the current input,
696    // hence the `inner_map` for keeping track of the distinct rows
697    let mut inner_map = BTreeMap::new();
698    kv.into_iter()
699        .filter_map(|((key, val), ts, diff)| {
700            // first check inner_map, then check the arrangement, to make sure we get the newest value
701            let old_val = inner_map
702                .get(&key)
703                .cloned()
704                .or_else(|| arrange.get(now, &key));
705
706            let new_key_val = match (old_val, diff) {
707                // a new distinct row
708                (None, 1) => Some(((key, val), ts, diff)),
709                // if the diff differs from the newest value's diff, also emit the update
710                (Some(old_val), diff) if old_val.0 == val && old_val.2 != diff => {
711                    Some(((key, val), ts, diff))
712                }
713                _ => None,
714            };
715
716            if let Some(((k, v), t, d)) = new_key_val.clone() {
717                // update the inner_map, so later updates can be checked against it
718                inner_map.insert(k, (v, t, d));
719            }
720            new_key_val
721        })
722        .collect_vec()
723}
724
725/// eval the distinct reduce plan, output the distinct rows, and update the arrangement
726///
727/// This function is extracted because we also want to use it to update the distinct input of an accumulable reduce plan
728fn update_reduce_distinct_arrange(
729    arrange: &ArrangeHandler,
730    kv: impl IntoIterator<Item = KeyValDiffRow>,
731    now: repr::Timestamp,
732    err_collector: &ErrCollector,
733) -> impl Iterator<Item = DiffRow> {
734    let result_updates = eval_distinct_core(arrange.read(), kv, now, err_collector);
735
736    err_collector.run(|| {
737        arrange.write().apply_updates(now, result_updates)?;
738        Ok(())
739    });
740
741    // Deal with output:
742
743    // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
744    let from = arrange.read().last_compaction_time();
745    let from = from.unwrap_or(repr::Timestamp::MIN);
746    let range = (
747        std::ops::Bound::Excluded(from),
748        std::ops::Bound::Included(now),
749    );
750    let output_kv = arrange.read().get_updates_in_range(range);
751
752    // 2. Truncate all updates stored in arrangement within that range.
753    let run_compaction = || {
754        arrange.write().compact_to(now)?;
755        Ok(())
756    };
757    err_collector.run(run_compaction);
758
759    // 3. Output the updates.
760    // the output is the concatenation of key and val
761    output_kv.into_iter().map(|((mut key, v), ts, diff)| {
762        key.extend(v.into_iter());
763        (key, ts, diff)
764    })
765}
766
767/// eval the distinct reduce plan, output the distinct rows, and update the arrangement
768///
769/// invariant: it is assumed that `kv`'s time is always <= now,
770/// since it comes from a collection bundle, where future inserts are stored in the arrangement
771fn reduce_distinct_subgraph(
772    arrange: &ArrangeHandler,
773    kv: impl IntoIterator<Item = KeyValDiffRow>,
774    SubgraphArg {
775        now,
776        err_collector,
777        scheduler: _,
778        send,
779    }: SubgraphArg,
780) {
781    let ret = update_reduce_distinct_arrange(arrange, kv, now, err_collector).collect_vec();
782
783    // no future updates should exist here
784    if arrange.read().get_next_update_time(&now).is_some() {
785        err_collector.push_err(
786            InternalSnafu {
787                reason: "No future updates should exist in the reduce distinct arrangement",
788            }
789            .build(),
790        );
791    }
792
793    send.give(ret);
794}
795
796/// eval an accumulable reduce plan by evaluating the aggregate functions and reducing the result
797///
798/// TODO(discord9): eval distinct by adding distinct input arrangement
799///
800/// invariant: it is assumed that `kv`'s time is always <= now,
801/// since it comes from a collection bundle, where future inserts are stored in the arrangement
802///
803/// the data being sent is just the new rows that represent the new output after the given input is processed
804///
805/// i.e.: for example, before new updates come in, the output of the query `SELECT sum(number), count(number) FROM table`
806/// is (10, 2), and after new updates come in, the output is (15, 3); then the new row being sent is ((15, 3), now, 1)
807///
808/// it will also update each key -> accums value; for example, if it was empty before, it becomes something like
809/// | offset | accum for sum | accum for count |
810/// where offset is a single value holding the end offset of each accumulator
811/// and the rest are the actual accumulator values, which may span multiple values
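///
/// An illustrative sketch of that encoding (the number of values each accumulator actually
/// stores depends on the aggregate function, so the sizes here are assumptions): if sum's
/// state took 2 values and count's took 1, the stored accum row would be
/// ```text
/// [ List[3, 4], s0, s1, c0 ]
/// ```
/// and `from_val_to_slice_idx` would recover the ranges `1..3` (sum) and `3..4` (count).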
812fn reduce_accum_subgraph(
813    arrange: &ArrangeHandler,
814    distinct_input: &Option<Vec<ArrangeHandler>>,
815    kv: impl IntoIterator<Item = KeyValDiffRow>,
816    accum_plan: &AccumulablePlan,
817    SubgraphArg {
818        now,
819        err_collector,
820        scheduler,
821        send,
822    }: SubgraphArg,
823) {
824    let AccumulablePlan {
825        full_aggrs,
826        simple_aggrs,
827        distinct_aggrs,
828    } = accum_plan;
829    let mut key_to_vals = BTreeMap::<Row, Vec<(Row, repr::Diff)>>::new();
830
831    for ((key, val), _tick, diff) in kv {
832        // it is assumed that value is in order of insertion
833        let vals = key_to_vals.entry(key).or_default();
834        vals.push((val, diff));
835    }
836
837    let mut all_updates = Vec::with_capacity(key_to_vals.len());
838    let mut all_outputs = Vec::with_capacity(key_to_vals.len());
839    // lock the arrange for write for the rest of the function body
840    // to prevent weird race conditions, since we are going to update the arrangement with a write after read
841    // TODO(discord9): consider key-based lock
842    let mut arrange = arrange.write();
843    for (key, value_diffs) in key_to_vals {
844        if let Some(expire_man) = &arrange.get_expire_state() {
845            let mut is_expired = false;
846            err_collector.run(|| {
847                if let Some(expired) = expire_man.get_expire_duration(now, &key)? {
848                    is_expired = true;
849                    // expired data is ignored in computation, and a simple warning is logged
850                    common_telemetry::warn!(
851                        "Data already expired: {}",
852                        DataAlreadyExpiredSnafu {
853                            expired_by: expired,
854                        }
855                        .build()
856                    );
857                    Ok(())
858                } else {
859                    Ok(())
860                }
861            });
862            if is_expired {
863                // errors are already collected; we can just continue to the next key
864                continue;
865            }
866        }
867        let col_diffs = {
868            let row_len = value_diffs[0].0.len();
869            let res = err_collector.run(|| get_col_diffs(value_diffs, row_len));
870            match res {
871                Some(res) => res,
872                // TODO(discord9): consider better error handling other than
873                // just skipping the row and logging the error
874                None => continue,
875            }
876        };
877        let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
878
879        let accums = accums.inner;
880
881        // deser accums from offsets
882        let accum_ranges = {
883            let res = err_collector
884                .run(|| from_val_to_slice_idx(accums.first().cloned(), full_aggrs.len()));
885            if let Some(res) = res {
886                res
887            } else {
888                // the error is collected; we can just continue and send the error back through `err_collector`
889                continue;
890            }
891        };
892
893        let mut accum_output = AccumOutput::new();
894        eval_simple_aggrs(
895            simple_aggrs,
896            &accums,
897            &accum_ranges,
898            &col_diffs,
899            &mut accum_output,
900            err_collector,
901        );
902
903        // for distinct input
904        eval_distinct_aggrs(
905            distinct_aggrs,
906            distinct_input,
907            &accums,
908            &accum_ranges,
909            &col_diffs,
910            &mut accum_output,
911            SubgraphArg {
912                now,
913                err_collector,
914                scheduler,
915                send,
916            },
917        );
918
919        // get and append results
920        err_collector.run(|| {
921            let (new_accums, res_val_row) = accum_output.into_accum_output()?;
922
923            // construct the updates and save it
924            all_updates.push(((key.clone(), Row::new(new_accums)), now, 1));
925            let mut key_val = key;
926            key_val.extend(res_val_row);
927            all_outputs.push((key_val, now, 1));
928            Ok(())
929        });
930    }
931    err_collector.run(|| {
932        arrange.apply_updates(now, all_updates)?;
933        arrange.compact_to(now)
934    });
935
936    // for all arranges involved, schedule next time this subgraph should run
937    // no future updates should exist here
938    let all_arrange_used = distinct_input
939        .iter()
940        .flatten()
941        .map(|d| d.write())
942        .chain(std::iter::once(arrange));
943    check_no_future_updates(all_arrange_used, err_collector, now);
944
945    send.give(all_outputs);
946}
947
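/// Transpose a list of `(Row, diff)` pairs, where every row has length `row_len`, into one
/// `Vec<(Value, diff)>` per column.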
948fn get_col_diffs(
949    value_diffs: Vec<(Row, repr::Diff)>,
950    row_len: usize,
951) -> Result<Vec<Vec<(Value, i64)>>, EvalError> {
952    ensure!(
953        value_diffs.iter().all(|(row, _)| row.len() == row_len),
954        InternalSnafu {
955            reason: "value_diffs should have rows with equal length"
956        }
957    );
958    let ret = (0..row_len)
959        .map(|i| {
960            value_diffs
961                .iter()
962                .map(|(row, diff)| (row.get(i).cloned().unwrap(), *diff))
963                .collect_vec()
964        })
965        .collect_vec();
966    Ok(ret)
967}
968
969/// Eval simple aggregate functions with no distinct input
970fn eval_simple_aggrs(
971    simple_aggrs: &Vec<AggrWithIndex>,
972    accums: &[Value],
973    accum_ranges: &[Range<usize>],
974    col_diffs: &[Vec<(Value, i64)>],
975    accum_output: &mut AccumOutput,
976    err_collector: &ErrCollector,
977) {
978    for AggrWithIndex {
979        expr,
980        input_idx,
981        output_idx,
982    } in simple_aggrs
983    {
984        let cur_accum_range = accum_ranges[*output_idx].clone(); // range of current accum
985        let cur_old_accum = accums
986            .get(cur_accum_range)
987            .unwrap_or_default()
988            .iter()
989            .cloned();
990        let cur_col_diff = col_diffs[*input_idx].iter().cloned();
991
992        // actual eval aggregation function
993        if let Some((res, new_accum)) =
994            err_collector.run(|| expr.func.eval_diff_accumulable(cur_old_accum, cur_col_diff))
995        {
996            accum_output.insert_accum(*output_idx, new_accum);
997            accum_output.insert_output(*output_idx, res);
998        } // else just collect error and continue
999    }
1000}
1001
1002/// Accumulate the output of aggregation functions
1003///
1004/// The accum is a map from index to the accumulator of the aggregation function
1005///
1006/// The output is a map from index to the output of the aggregation function
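///
/// For illustration (derived from `into_accum_output`): with `accum = {0: [a, b], 1: [c]}`
/// and `output = {0: x, 1: y}`, `into_accum_output` returns `([List[3, 4], a, b, c], [x, y])`,
/// where the leading list holds the end offset of each accumulator's slice.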
1007#[derive(Debug)]
1008struct AccumOutput {
1009    accum: BTreeMap<usize, Vec<Value>>,
1010    output: BTreeMap<usize, Value>,
1011}
1012
1013impl AccumOutput {
1014    fn new() -> Self {
1015        Self {
1016            accum: BTreeMap::new(),
1017            output: BTreeMap::new(),
1018        }
1019    }
1020
1021    fn insert_accum(&mut self, idx: usize, v: Vec<Value>) {
1022        self.accum.insert(idx, v);
1023    }
1024
1025    fn insert_output(&mut self, idx: usize, v: Value) {
1026        self.output.insert(idx, v);
1027    }
1028
1029    /// return (accums, output)
1030    fn into_accum_output(self) -> Result<(Vec<Value>, Vec<Value>), EvalError> {
1031        if self.accum.is_empty() && self.output.is_empty() {
1032            return Ok((vec![], vec![]));
1033        }
1034        ensure!(
1035            !self.accum.is_empty() && self.accum.len() == self.output.len(),
1036            InternalSnafu {
1037                reason: format!(
1038                    "Accum and output should have the same non-zero length, found accum.len() = {}, output.len() = {}",
1039                    self.accum.len(),
1040                    self.output.len()
1041                )
1042            }
1043        );
1044        // make output vec from output map
1045        if let Some(kv) = self.accum.last_key_value() {
1046            ensure!(
1047                *kv.0 == self.accum.len() - 1,
1048                InternalSnafu {
1049                    reason: "Accum should be a continuous range"
1050                }
1051            );
1052        }
1053        if let Some(kv) = self.output.last_key_value() {
1054            ensure!(
1055                *kv.0 == self.output.len() - 1,
1056                InternalSnafu {
1057                    reason: "Output should be a continuous range"
1058                }
1059            );
1060        }
1061
1062        let accums = self.accum.into_values().collect_vec();
1063        let new_accums = from_accums_to_offsetted_accum(accums);
1064        let output = self.output.into_values().collect_vec();
1065        Ok((new_accums, output))
1066    }
1067}
1068
1069/// Eval distinct aggregate functions with their distinct input arrangements
1070fn eval_distinct_aggrs(
1071    distinct_aggrs: &Vec<AggrWithIndex>,
1072    distinct_input: &Option<Vec<ArrangeHandler>>,
1073    accums: &[Value],
1074    accum_ranges: &[Range<usize>],
1075    col_diffs: &[Vec<(Value, i64)>],
1076    accum_output: &mut AccumOutput,
1077    SubgraphArg {
1078        now,
1079        err_collector,
1080        scheduler: _,
1081        send: _,
1082    }: SubgraphArg,
1083) {
1084    for AggrWithIndex {
1085        expr,
1086        input_idx,
1087        output_idx,
1088    } in distinct_aggrs
1089    {
1090        let cur_accum_range = accum_ranges[*output_idx].clone(); // range of current accum
1091        let cur_old_accum = accums
1092            .get(cur_accum_range)
1093            .unwrap_or_default()
1094            .iter()
1095            .cloned();
1096        let cur_col_diff = col_diffs[*input_idx].iter().cloned();
1097        // first filter input with distinct
1098        let input_arrange = distinct_input
1099            .as_ref()
1100            .and_then(|v| v[*input_idx].clone_full_arrange())
1101            .expect("A full distinct input arrangement should exist");
1102        let kv = cur_col_diff.map(|(v, d)| ((Row::new(vec![v]), Row::empty()), now, d));
1103        let col_diff_distinct =
1104            update_reduce_distinct_arrange(&input_arrange, kv, now, err_collector).map(
1105                |(row, _ts, diff)| (row.get(0).expect("Row should not be empty").clone(), diff),
1106            );
1107        let col_diff_distinct = {
1108            let res = col_diff_distinct.collect_vec();
1109            res.into_iter()
1110        };
1111        // actual eval aggregation function
1112        let (res, new_accum) = expr
1113            .func
1114            .eval_diff_accumulable(cur_old_accum, col_diff_distinct)
1115            .unwrap();
1116        accum_output.insert_accum(*output_idx, new_accum);
1117        accum_output.insert_output(*output_idx, res);
1118    }
1119}
1120
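/// Check that none of the given arrangements still contain updates scheduled after `now`;
/// if any does, push an internal error to `err_collector`.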
1121fn check_no_future_updates<'a>(
1122    all_arrange_used: impl IntoIterator<Item = ArrangeWriter<'a>>,
1123    err_collector: &ErrCollector,
1124    now: repr::Timestamp,
1125) {
1126    for arrange in all_arrange_used {
1127        if arrange.get_next_update_time(&now).is_some() {
1128            err_collector.push_err(
1129                InternalSnafu {
1130                    reason: "No future updates should exist in the reduce distinct arrangement",
1131                }
1132                .build(),
1133            );
1134        }
1135    }
1136}
1137
1138/// convert a list of accumulators to a vector of values whose first value is a list of end offsets, one per accumulator
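///
/// For illustration (derived from the code below): `[[a, b], [c]]` becomes
/// `[List[3, 4], a, b, c]`; offsets start at 1 to account for the leading offset list itself.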
1139fn from_accums_to_offsetted_accum(new_accums: Vec<Vec<Value>>) -> Vec<Value> {
1140    let offset = new_accums
1141        .iter()
1142        .map(|v| v.len() as u64)
1143        .scan(1, |state, x| {
1144            *state += x;
1145            Some(*state)
1146        })
1147        .map(Value::from)
1148        .collect::<Vec<_>>();
1149    let first = ListValue::new(offset, ConcreteDataType::uint64_datatype());
1150    let first = Value::List(first);
1151    // construct new_accums
1152
1153    std::iter::once(first)
1154        .chain(new_accums.into_iter().flatten())
1155        .collect::<Vec<_>>()
1156}
1157
1158/// Convert a value (the leading offset list of an accum row) into a list of slice ranges
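///
/// For illustration (derived from the code below): `Some(List[3, 4])` with `expected_len = 2`
/// yields `[1..3, 3..4]`, while `None` yields the empty ranges `[1..1, 1..1]`.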
1159fn from_val_to_slice_idx(
1160    value: Option<Value>,
1161    expected_len: usize,
1162) -> Result<Vec<Range<usize>>, EvalError> {
1163    let offset_end = if let Some(value) = value {
1164        let list = value
1165            .as_list()
1166            .with_context(|_| DataTypeSnafu {
1167                msg: "Accum's first element should be a list",
1168            })?
1169            .context(InternalSnafu {
1170                reason: "Accum's first element should be a list",
1171            })?;
1172        let ret: Vec<usize> = list
1173            .items()
1174            .iter()
1175            .map(|v| {
1176                v.as_u64().map(|j| j as usize).context(InternalSnafu {
1177                    reason: "End offset should be a list of u64",
1178                })
1179            })
1180            .try_collect()?;
1181        ensure!(
1182            ret.len() == expected_len,
1183            InternalSnafu {
1184                reason: "Offset List should have the same length as full_aggrs"
1185            }
1186        );
1187        Ok(ret)
1188    } else {
1189        Ok(vec![1usize; expected_len])
1190    }?;
1191    let accum_ranges = (0..expected_len)
1192        .map(|idx| {
1193            if idx == 0 {
1194                // note that the first element is the offset list
1195                debug_assert!(
1196                    offset_end[0] >= 1,
1197                    "Offset should be at least 1: {:?}",
1198                    &offset_end
1199                );
1200                1..offset_end[0]
1201            } else {
1202                offset_end[idx - 1]..offset_end[idx]
1203            }
1204        })
1205        .collect_vec();
1206    Ok(accum_ranges)
1207}
1208
1209// mainly for reduce's test
1210// TODO(discord9): add tests for accum ser/de
1211#[cfg(test)]
1212mod test {
1213
1214    use std::time::Duration;
1215
1216    use common_time::Timestamp;
1217    use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
1218    use dfir_rs::scheduled::graph::Dfir;
1219
1220    use super::*;
1221    use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
1222    use crate::compute::state::DataflowState;
1223    use crate::expr::{
1224        self, AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc,
1225    };
1226    use crate::plan::Plan;
1227    use crate::repr::{ColumnType, RelationType};
1228
1229    /// SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00')
1230    /// input table columns: number, ts
1231    /// expected: sum(number), window_start, window_end
1232    #[test]
1233    fn test_tumble_group_by() {
1234        let mut df = Dfir::new();
1235        let mut state = DataflowState::default();
1236        let mut ctx = harness_test_ctx(&mut df, &mut state);
1237        const START: i64 = 1625097600000;
1238        let rows = vec![
1239            (1u32, START + 1000),
1240            (2u32, START + 1500),
1241            (3u32, START + 2000),
1242            (1u32, START + 2500),
1243            (2u32, START + 3000),
1244            (3u32, START + 3500),
1245        ];
1246        let rows = rows
1247            .into_iter()
1248            .map(|(number, ts)| {
1249                (
1250                    Row::new(vec![number.into(), Timestamp::new_millisecond(ts).into()]),
1251                    1,
1252                    1,
1253                )
1254            })
1255            .collect_vec();
1256
1257        let collection = ctx.render_constant(rows.clone());
1258        ctx.insert_global(GlobalId::User(1), collection);
1259
1260        let aggr_expr = AggregateExpr {
1261            func: AggregateFunc::SumUInt32,
1262            expr: ScalarExpr::Column(0),
1263            distinct: false,
1264        };
1265        let expected = TypedPlan {
1266            schema: RelationType::new(vec![
1267                ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
1268                ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window start
1269                ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window end
1270            ])
1271            .into_unnamed(),
1272            // TODO(discord9): mfp indirectly ref to key columns
1273            /*
1274            .with_key(vec![1])
1275            .with_time_index(Some(0)),*/
1276            plan: Plan::Mfp {
1277                input: Box::new(
1278                    Plan::Reduce {
1279                        input: Box::new(
1280                            Plan::Get {
1281                                id: crate::expr::Id::Global(GlobalId::User(1)),
1282                            }
1283                            .with_types(
1284                                RelationType::new(vec![
1285                                    ColumnType::new(ConcreteDataType::uint32_datatype(), false),
1286                                    ColumnType::new(
1287                                        ConcreteDataType::timestamp_millisecond_datatype(),
1288                                        false,
1289                                    ),
1290                                ])
1291                                .into_unnamed(),
1292                            ),
1293                        ),
1294                        key_val_plan: KeyValPlan {
1295                            key_plan: MapFilterProject::new(2)
1296                                .map(vec![
1297                                    ScalarExpr::Column(1).call_unary(
1298                                        UnaryFunc::TumbleWindowFloor {
1299                                            window_size: Duration::from_nanos(1_000_000_000),
1300                                            start_time: Some(Timestamp::new_millisecond(
1301                                                1625097600000,
1302                                            )),
1303                                        },
1304                                    ),
1305                                    ScalarExpr::Column(1).call_unary(
1306                                        UnaryFunc::TumbleWindowCeiling {
1307                                            window_size: Duration::from_nanos(1_000_000_000),
1308                                            start_time: Some(Timestamp::new_millisecond(
1309                                                1625097600000,
1310                                            )),
1311                                        },
1312                                    ),
1313                                ])
1314                                .unwrap()
1315                                .project(vec![2, 3])
1316                                .unwrap()
1317                                .into_safe(),
1318                            val_plan: MapFilterProject::new(2)
1319                                .project(vec![0, 1])
1320                                .unwrap()
1321                                .into_safe(),
1322                        },
1323                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
1324                            full_aggrs: vec![aggr_expr.clone()],
1325                            simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
1326                            distinct_aggrs: vec![],
1327                        }),
1328                    }
1329                    .with_types(
1330                        RelationType::new(vec![
1331                            ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window start
1332                            ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window end
1333                            ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
1334                        ])
1335                        .with_key(vec![1])
1336                        .with_time_index(Some(0))
1337                        .into_unnamed(),
1338                    ),
1339                ),
1340                mfp: MapFilterProject::new(3)
1341                    .map(vec![
1342                        ScalarExpr::Column(2),
1343                        ScalarExpr::Column(3),
1344                        ScalarExpr::Column(0),
1345                        ScalarExpr::Column(1),
1346                    ])
1347                    .unwrap()
1348                    .project(vec![4, 5, 6])
1349                    .unwrap(),
1350            },
1351        };
1352
1353        let bundle = ctx.render_plan(expected).unwrap();
1354
1355        let output = get_output_handle(&mut ctx, bundle);
1356        drop(ctx);
1357        let expected = BTreeMap::from([(
1358            1,
1359            vec![
1360                (
1361                    Row::new(vec![
1362                        3u64.into(),
1363                        Timestamp::new_millisecond(START + 1000).into(),
1364                        Timestamp::new_millisecond(START + 2000).into(),
1365                    ]),
1366                    1,
1367                    1,
1368                ),
1369                (
1370                    Row::new(vec![
1371                        4u64.into(),
1372                        Timestamp::new_millisecond(START + 2000).into(),
1373                        Timestamp::new_millisecond(START + 3000).into(),
1374                    ]),
1375                    1,
1376                    1,
1377                ),
1378                (
1379                    Row::new(vec![
1380                        5u64.into(),
1381                        Timestamp::new_millisecond(START + 3000).into(),
1382                        Timestamp::new_millisecond(START + 4000).into(),
1383                    ]),
1384                    1,
1385                    1,
1386                ),
1387            ],
1388        )]);
1389        run_and_check(&mut state, &mut df, 1..2, expected, output);
1390    }
1391
1392    /// select avg(number) from number;
1393    #[test]
1394    fn test_avg_eval() {
1395        let mut df = Dfir::new();
1396        let mut state = DataflowState::default();
1397        let mut ctx = harness_test_ctx(&mut df, &mut state);
1398
1399        let rows = vec![
1400            (Row::new(vec![1u32.into()]), 1, 1),
1401            (Row::new(vec![2u32.into()]), 1, 1),
1402            (Row::new(vec![3u32.into()]), 1, 1),
1403            (Row::new(vec![1u32.into()]), 1, 1),
1404            (Row::new(vec![2u32.into()]), 1, 1),
1405            (Row::new(vec![3u32.into()]), 1, 1),
1406        ];
1407        let collection = ctx.render_constant(rows.clone());
1408        ctx.insert_global(GlobalId::User(1), collection);
1409
1410        let aggr_exprs = vec![
1411            AggregateExpr {
1412                func: AggregateFunc::SumUInt32,
1413                expr: ScalarExpr::Column(0),
1414                distinct: false,
1415            },
1416            AggregateExpr {
1417                func: AggregateFunc::Count,
1418                expr: ScalarExpr::Column(0),
1419                distinct: false,
1420            },
1421        ];
        let avg_expr = ScalarExpr::If {
            cond: Box::new(ScalarExpr::Column(1).call_binary(
                ScalarExpr::Literal(Value::from(0u32), CDT::int64_datatype()),
                BinaryFunc::NotEq,
            )),
            then: Box::new(ScalarExpr::Column(0).call_binary(
                ScalarExpr::Column(1).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
                BinaryFunc::DivUInt64,
            )),
            els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
        };
        let expected = TypedPlan {
            schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
                .into_unnamed(),
            plan: Plan::Mfp {
                input: Box::new(
                    Plan::Reduce {
                        input: Box::new(
                            Plan::Get {
                                id: crate::expr::Id::Global(GlobalId::User(1)),
                            }
                            .with_types(
                                RelationType::new(vec![ColumnType::new(
                                    ConcreteDataType::int64_datatype(),
                                    false,
                                )])
                                .into_unnamed(),
                            ),
                        ),
                        key_val_plan: KeyValPlan {
                            key_plan: MapFilterProject::new(1)
                                .project(vec![])
                                .unwrap()
                                .into_safe(),
                            val_plan: MapFilterProject::new(1)
                                .project(vec![0])
                                .unwrap()
                                .into_safe(),
                        },
                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                            full_aggrs: aggr_exprs.clone(),
                            simple_aggrs: vec![
                                AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
                                AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1),
                            ],
                            distinct_aggrs: vec![],
                        }),
                    }
                    .with_types(
                        RelationType::new(vec![
                            ColumnType::new(ConcreteDataType::uint32_datatype(), true),
                            ColumnType::new(ConcreteDataType::int64_datatype(), true),
                        ])
                        .into_unnamed(),
                    ),
                ),
                mfp: MapFilterProject::new(2)
                    .map(vec![
                        avg_expr,
                        // TODO(discord9): optimize mfp so to remove indirect ref
                        ScalarExpr::Column(2),
                    ])
                    .unwrap()
                    .project(vec![3])
                    .unwrap(),
            },
        };

        let bundle = ctx.render_plan(expected).unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
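        // input sums to 1 + 2 + 3 + 1 + 2 + 3 = 12 over 6 rows, so AVG = 12 / 6 = 2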
        let expected = BTreeMap::from([(1, vec![(Row::new(vec![2u64.into()]), 1, 1)])]);
        run_and_check(&mut state, &mut df, 1..2, expected, output);
    }

    /// SELECT DISTINCT col FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    #[test]
    fn test_basic_distinct() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![1i64.into()]), 1, 1),
            (Row::new(vec![2i64.into()]), 2, 1),
            (Row::new(vec![3i64.into()]), 3, 1),
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![2i64.into()]), 5, 1),
            (Row::new(vec![3i64.into()]), 6, 1),
        ];
        let collection = ctx.render_constant(rows.clone());
        ctx.insert_global(GlobalId::User(1), collection);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
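        // DISTINCT: the column itself is the group key and the value part is empty,
        // so the Distinct reduce plan emits each key at most once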
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        };
        let reduce_plan = ReducePlan::Distinct;
        let bundle = ctx
            .render_reduce(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                key_val_plan,
                reduce_plan,
                RelationType::empty(),
            )
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
        let expected = BTreeMap::from([(
            6,
            vec![
                (Row::new(vec![1i64.into()]), 1, 1),
                (Row::new(vec![2i64.into()]), 2, 1),
                (Row::new(vec![3i64.into()]), 3, 1),
            ],
        )]);
        run_and_check(&mut state, &mut df, 6..7, expected, output);
    }

    /// Batch Mode Reduce Evaluation
    /// SELECT SUM(col) FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    #[test]
    fn test_basic_batch_reduce_accum() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let now = state.current_time_ref();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![Value::Null]), -1, 1),
            (Row::new(vec![1i64.into()]), 0, 1),
            (Row::new(vec![Value::Null]), 1, 1),
            (Row::new(vec![2i64.into()]), 2, 1),
            (Row::new(vec![3i64.into()]), 3, 1),
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![2i64.into()]), 5, 1),
            (Row::new(vec![3i64.into()]), 6, 1),
        ];
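        // ticks -1 and 1 only carry NULL, which SUM ignores, so they do not change the running total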
        let input_plan = Plan::Constant { rows: rows.clone() };

        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        };

        let simple_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )];
        let accum_plan = AccumulablePlan {
            full_aggrs: vec![AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            }],
            simple_aggrs,
            distinct_aggrs: vec![],
        };

        let reduce_plan = ReducePlan::Accumulable(accum_plan);
        let bundle = ctx
            .render_reduce_batch(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                &key_val_plan,
                &reduce_plan,
                &RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]),
            )
            .unwrap();

        {
            let now_inner = now.clone();
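            // expected running SUM at each tick; no group key is used, so there is a
            // single global accumulator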
            let expected = BTreeMap::<i64, Vec<i64>>::from([
                (-1, vec![]),
                (0, vec![1i64]),
                (1, vec![1i64]),
                (2, vec![3i64]),
                (3, vec![6i64]),
                (4, vec![7i64]),
                (5, vec![9i64]),
                (6, vec![12i64]),
            ]);
            let collection = bundle.collection;
            ctx.df
                .add_subgraph_sink("test_sink", collection.into_inner(), move |_ctx, recv| {
                    let now = *now_inner.borrow();
                    let data = recv.take_inner();
                    let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();

                    if let Some(expected) = expected.get(&now) {
                        let batch = expected.iter().map(|v| Value::from(*v)).collect_vec();
                        let batch = Batch::try_from_rows_with_types(
                            vec![batch.into()],
                            &[CDT::int64_datatype()],
                        )
                        .unwrap();
                        assert_eq!(res.first(), Some(&batch));
                    }
                });
            drop(ctx);

            for now in 1..7 {
                state.set_current_ts(now);
                state.run_available_with_schedule(&mut df);
                if !state.get_err_collector().is_empty() {
                    panic!(
                        "Errors occur: {:?}",
                        state.get_err_collector().get_all_blocking()
                    )
                }
            }
        }
    }

    /// SELECT SUM(col) FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    #[test]
    fn test_basic_reduce_accum() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![1i64.into()]), 1, 1),
            (Row::new(vec![2i64.into()]), 2, 1),
            (Row::new(vec![3i64.into()]), 3, 1),
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![2i64.into()]), 5, 1),
            (Row::new(vec![3i64.into()]), 6, 1),
        ];
        let collection = ctx.render_constant(rows.clone());
        ctx.insert_global(GlobalId::User(1), collection);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        };

        let simple_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )];
        let accum_plan = AccumulablePlan {
            full_aggrs: vec![AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            }],
            simple_aggrs,
            distinct_aggrs: vec![],
        };

        let reduce_plan = ReducePlan::Accumulable(accum_plan);
        let bundle = ctx
            .render_reduce(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                key_val_plan,
                reduce_plan,
                RelationType::empty(),
            )
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
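        // running sum per tick: 1, 1 + 2 = 3, + 3 = 6, + 1 = 7, + 2 = 9, + 3 = 12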
        let expected = BTreeMap::from([
            (1, vec![(Row::new(vec![1i64.into()]), 1, 1)]),
            (2, vec![(Row::new(vec![3i64.into()]), 2, 1)]),
            (3, vec![(Row::new(vec![6i64.into()]), 3, 1)]),
            (4, vec![(Row::new(vec![7i64.into()]), 4, 1)]),
            (5, vec![(Row::new(vec![9i64.into()]), 5, 1)]),
            (6, vec![(Row::new(vec![12i64.into()]), 6, 1)]),
        ]);
        run_and_check(&mut state, &mut df, 1..7, expected, output);
    }

    /// SELECT SUM(DISTINCT col) FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    ///
    /// this test includes even more insert/delete cases to cover all cases for eval_distinct_core
    #[test]
    fn test_delete_reduce_distinct_accum() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            // same tick
            (Row::new(vec![1i64.into()]), 1, 1),
            (Row::new(vec![1i64.into()]), 1, -1),
            // next tick
            (Row::new(vec![1i64.into()]), 2, 1),
            (Row::new(vec![1i64.into()]), 3, -1),
            // repeat in same tick
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![1i64.into()]), 4, -1),
            (Row::new(vec![1i64.into()]), 4, 1),
        ];
        let collection = ctx.render_constant(rows.clone());
        ctx.insert_global(GlobalId::User(1), collection);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        };

        let distinct_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )];
        let accum_plan = AccumulablePlan {
            full_aggrs: vec![AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: true,
            }],
            simple_aggrs: vec![],
            distinct_aggrs,
        };

        let reduce_plan = ReducePlan::Accumulable(accum_plan);
        let bundle = ctx
            .render_reduce(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                key_val_plan,
                reduce_plan,
                RelationType::empty(),
            )
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
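        // distinct sum of the single value 1: it is present only at ticks 2 and 4,
        // since an insert/delete within one tick cancels out and the delete at tick 3 removes it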
        let expected = BTreeMap::from([
            (1, vec![(Row::new(vec![0i64.into()]), 1, 1)]),
            (2, vec![(Row::new(vec![1i64.into()]), 2, 1)]),
            (3, vec![(Row::new(vec![0i64.into()]), 3, 1)]),
            (4, vec![(Row::new(vec![1i64.into()]), 4, 1)]),
        ]);
        run_and_check(&mut state, &mut df, 1..7, expected, output);
    }

    /// SELECT SUM(DISTINCT col) FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    ///
    /// this test includes inserts and deletes, which should cover all cases for eval_distinct_core
    #[test]
    fn test_basic_reduce_distinct_accum() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![1i64.into()]), 1, 1),
            (Row::new(vec![1i64.into()]), 1, -1),
            (Row::new(vec![2i64.into()]), 2, 1),
            (Row::new(vec![3i64.into()]), 3, 1),
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![2i64.into()]), 5, 1),
            (Row::new(vec![3i64.into()]), 6, 1),
            (Row::new(vec![1i64.into()]), 7, 1),
        ];
        let collection = ctx.render_constant(rows.clone());
        ctx.insert_global(GlobalId::User(1), collection);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        };

        let distinct_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )];
        let accum_plan = AccumulablePlan {
            full_aggrs: vec![AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: true,
            }],
            simple_aggrs: vec![],
            distinct_aggrs,
        };

        let reduce_plan = ReducePlan::Accumulable(accum_plan);
        let bundle = ctx
            .render_reduce(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                key_val_plan,
                reduce_plan,
                RelationType::empty(),
            )
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
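        // distinct sum over {1, 2, 3}: the insert/delete of 1 at tick 1 cancels out,
        // then the sum grows to 2, 5, 6 and stays at 6 once every value has been seen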
        let expected = BTreeMap::from([
            (1, vec![(Row::new(vec![0i64.into()]), 1, 1)]),
            (2, vec![(Row::new(vec![2i64.into()]), 2, 1)]),
            (3, vec![(Row::new(vec![5i64.into()]), 3, 1)]),
            (4, vec![(Row::new(vec![6i64.into()]), 4, 1)]),
            (5, vec![(Row::new(vec![6i64.into()]), 5, 1)]),
            (6, vec![(Row::new(vec![6i64.into()]), 6, 1)]),
            (7, vec![(Row::new(vec![6i64.into()]), 7, 1)]),
        ]);
        run_and_check(&mut state, &mut df, 1..7, expected, output);
    }

    /// SELECT SUM(col), SUM(DISTINCT col) FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    #[test]
    fn test_composite_reduce_distinct_accum() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![1i64.into()]), 1, 1),
            (Row::new(vec![2i64.into()]), 2, 1),
            (Row::new(vec![3i64.into()]), 3, 1),
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![2i64.into()]), 5, 1),
            (Row::new(vec![3i64.into()]), 6, 1),
            (Row::new(vec![1i64.into()]), 7, 1),
        ];
        let collection = ctx.render_constant(rows.clone());
        ctx.insert_global(GlobalId::User(1), collection);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        };
        let simple_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )];
        let distinct_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: true,
            },
            0,
            1,
        )];
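        // both aggregates read input column 0; the plain SUM is wired to output column 0
        // and the DISTINCT SUM to output column 1 via the index arguments above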
        let accum_plan = AccumulablePlan {
            full_aggrs: vec![
                AggregateExpr {
                    func: AggregateFunc::SumInt64,
                    expr: ScalarExpr::Column(0),
                    distinct: false,
                },
                AggregateExpr {
                    func: AggregateFunc::SumInt64,
                    expr: ScalarExpr::Column(0),
                    distinct: true,
                },
            ],
            simple_aggrs,
            distinct_aggrs,
        };

        let reduce_plan = ReducePlan::Accumulable(accum_plan);
        let bundle = ctx
            .render_reduce(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                key_val_plan,
                reduce_plan,
                RelationType::empty(),
            )
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
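        // first column: plain running sum (1, 3, 6, 7, 9, 12, 13);
        // second column: distinct sum, which stops growing at 1 + 2 + 3 = 6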
        let expected = BTreeMap::from([
            (1, vec![(Row::new(vec![1i64.into(), 1i64.into()]), 1, 1)]),
            (2, vec![(Row::new(vec![3i64.into(), 3i64.into()]), 2, 1)]),
            (3, vec![(Row::new(vec![6i64.into(), 6i64.into()]), 3, 1)]),
            (4, vec![(Row::new(vec![7i64.into(), 6i64.into()]), 4, 1)]),
            (5, vec![(Row::new(vec![9i64.into(), 6i64.into()]), 5, 1)]),
            (6, vec![(Row::new(vec![12i64.into(), 6i64.into()]), 6, 1)]),
            (7, vec![(Row::new(vec![13i64.into(), 6i64.into()]), 7, 1)]),
        ]);
        run_and_check(&mut state, &mut df, 1..7, expected, output);
    }
}