flow/compute/render/reduce.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::Range;
17use std::sync::Arc;
18
19use arrow::array::new_null_array;
20use common_telemetry::trace;
21use datatypes::data_type::ConcreteDataType;
22use datatypes::prelude::DataType;
23use datatypes::value::{ListValue, Value};
24use datatypes::vectors::{BooleanVector, NullVector};
25use dfir_rs::scheduled::graph_ext::GraphExt;
26use itertools::Itertools;
27use snafu::{ensure, OptionExt, ResultExt};
28
29use crate::compute::render::{Context, SubgraphArg};
30use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
31use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
32use crate::expr::error::{ArrowSnafu, DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
33use crate::expr::{Accum, Accumulator, Batch, EvalError, ScalarExpr, VectorDiff};
34use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan};
35use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
36use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};
37
38impl Context<'_, '_> {
39    const REDUCE_BATCH: &'static str = "reduce_batch";
    /// Like `render_reduce`, but for batch mode. This is a bare-bones implementation
    /// with no support for distinct aggregation for now.
    // There is a false positive in using `Vec<ScalarExpr>` as a key because `Value` has a `bytes` variant
43    #[allow(clippy::mutable_key_type)]
44    pub fn render_reduce_batch(
45        &mut self,
46        input: Box<TypedPlan>,
47        key_val_plan: &KeyValPlan,
48        reduce_plan: &ReducePlan,
49        output_type: &RelationType,
50    ) -> Result<CollectionBundle<Batch>, Error> {
51        let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan {
52            if !accum_plan.distinct_aggrs.is_empty() {
53                NotImplementedSnafu {
54                    reason: "Distinct aggregation is not supported in batch mode",
55                }
56                .fail()?
57            }
58            accum_plan.clone()
59        } else {
60            NotImplementedSnafu {
61                reason: "Only accumulable reduce plan is supported in batch mode",
62            }
63            .fail()?
64        };
65
66        let input = self.render_plan_batch(*input)?;
67
        // First assemble key & val into separate key and val columns (since this is batch mode),
        // then stream the kvs through a reduce operator.

        // the output is the concatenation of key and val
72        let output_key_arity = key_val_plan.key_plan.output_arity();
73
74        // TODO(discord9): config global expire time from self
75        let arrange_handler = self.compute_state.new_arrange(None);
76
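        // If the output has a time index and a global expire time is configured,
        // attach a `KeyExpiryManager` keyed on that column so stale keys can be expired from the arrangement.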
77        if let (Some(time_index), Some(expire_after)) =
78            (output_type.time_index, self.compute_state.expire_after())
79        {
80            let expire_man =
81                KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
82            arrange_handler.write().set_expire_state(expire_man);
83        }
84
        // reduce needs a full arrangement to be able to query all keys
86        let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
87            reason: "No write is expected at this point",
88        })?;
89        let key_val_plan = key_val_plan.clone();
90
91        let output_type = output_type.clone();
92
93        let now = self.compute_state.current_time_ref();
94
95        let err_collector = self.err_collector.clone();
96
97        // TODO(discord9): better way to schedule future run
98        let scheduler = self.compute_state.get_scheduler();
99
100        let scheduler_inner = scheduler.clone();
101
102        let (out_send_port, out_recv_port) =
103            self.df.make_edge::<_, Toff<Batch>>(Self::REDUCE_BATCH);
104
105        let subgraph = self.df.add_subgraph_in_out(
106            Self::REDUCE_BATCH,
107            input.collection.into_inner(),
108            out_send_port,
109            move |_ctx, recv, send| {
110                let now = *(now.borrow());
111                let arrange = arrange_handler_inner.clone();
                // mfp only needs to passively receive updates from recvs
113                let src_data = recv
114                    .take_inner()
115                    .into_iter()
116                    .flat_map(|v| v.into_iter())
117                    .collect_vec();
118
119                reduce_batch_subgraph(
120                    &arrange,
121                    src_data,
122                    &key_val_plan,
123                    &accum_plan,
124                    &output_type,
125                    SubgraphArg {
126                        now,
127                        err_collector: &err_collector,
128                        scheduler: &scheduler_inner,
129                        send,
130                    },
131                )
132            },
133        );
134
135        scheduler.set_cur_subgraph(subgraph);
136
        // by default the output arrangement is keyed by the first `output_key_arity` columns
138        let arranged = BTreeMap::from([(
139            (0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
140            Arranged::new(arrange_handler),
141        )]);
142
143        let bundle = CollectionBundle {
144            collection: Collection::from_port(out_recv_port),
145            arranged,
146        };
147        Ok(bundle)
148    }
149
150    const REDUCE: &'static str = "reduce";
    /// Render `Plan::Reduce` into an executable dataflow
    // There is a false positive in using `Vec<ScalarExpr>` as a key because `Value` has a `bytes` variant
153    #[allow(clippy::mutable_key_type)]
154    pub fn render_reduce(
155        &mut self,
156        input: Box<TypedPlan>,
157        key_val_plan: KeyValPlan,
158        reduce_plan: ReducePlan,
159        output_type: RelationType,
160    ) -> Result<CollectionBundle, Error> {
161        let input = self.render_plan(*input)?;
        // First assemble key & val into ((Row, Row), tick, diff),
        // then stream the kvs through a reduce operator.

        // the output is the concatenation of key and val
166        let output_key_arity = key_val_plan.key_plan.output_arity();
167
168        // TODO(discord9): config global expire time from self
169        let arrange_handler = self.compute_state.new_arrange(None);
170
171        if let (Some(time_index), Some(expire_after)) =
172            (output_type.time_index, self.compute_state.expire_after())
173        {
174            let expire_man =
175                KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
176            arrange_handler.write().set_expire_state(expire_man);
177        }
178
        // reduce needs a full arrangement to be able to query all keys
180        let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
181            reason: "No write is expected at this point",
182        })?;
183
184        let distinct_input = self.add_accum_distinct_input_arrange(&reduce_plan);
185
186        let reduce_arrange = ReduceArrange {
187            output_arrange: arrange_handler_inner,
188            distinct_input,
189        };
190
191        let now = self.compute_state.current_time_ref();
192
193        let err_collector = self.err_collector.clone();
194
195        // TODO(discord9): better way to schedule future run
196        let scheduler = self.compute_state.get_scheduler();
197        let scheduler_inner = scheduler.clone();
198
199        let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>(Self::REDUCE);
200
201        let subgraph = self.df.add_subgraph_in_out(
202            Self::REDUCE,
203            input.collection.into_inner(),
204            out_send_port,
205            move |_ctx, recv, send| {
                // mfp only needs to passively receive updates from recvs
207                let data = recv
208                    .take_inner()
209                    .into_iter()
210                    .flat_map(|v| v.into_iter())
211                    .collect_vec();
212
213                reduce_subgraph(
214                    &reduce_arrange,
215                    data,
216                    &key_val_plan,
217                    &reduce_plan,
218                    SubgraphArg {
219                        now: *now.borrow(),
220                        err_collector: &err_collector,
221                        scheduler: &scheduler_inner,
222                        send,
223                    },
224                );
225            },
226        );
227
228        scheduler.set_cur_subgraph(subgraph);
229
        // by default the output arrangement is keyed by the first `output_key_arity` columns
231        let arranged = BTreeMap::from([(
232            (0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
233            Arranged::new(arrange_handler),
234        )]);
235
236        let bundle = CollectionBundle {
237            collection: Collection::from_port(out_recv_port),
238            arranged,
239        };
240        Ok(bundle)
241    }
242
    /// Despite its name, this adds the distinct input arrangements for an
    /// accumulable reduce plan that contains distinct aggregations,
    /// like `SELECT COUNT(DISTINCT col) FROM table`.
    ///
    /// The return value is optionally a list of arrangements created for the distinct inputs;
    /// it has the same length as the number of distinct aggregations in the accumulable reduce plan.
249    fn add_accum_distinct_input_arrange(
250        &mut self,
251        reduce_plan: &ReducePlan,
252    ) -> Option<Vec<ArrangeHandler>> {
253        match reduce_plan {
254            ReducePlan::Distinct => None,
255            ReducePlan::Accumulable(AccumulablePlan { distinct_aggrs, .. }) => {
256                (!distinct_aggrs.is_empty()).then(|| {
257                    std::iter::repeat_with(|| {
258                        let arr = self.compute_state.new_arrange(None);
259                        arr.set_full_arrangement(true);
260                        arr
261                    })
262                    .take(distinct_aggrs.len())
263                    .collect()
264                })
265            }
266        }
267    }
268}
269
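/// Split a flat, offset-prefixed accumulator list (as produced by `from_accums_to_offsetted_accum`)
/// back into one accumulator state (`Vec<Value>`) per aggregation; `len` is the number of aggregations.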
270fn from_accum_values_to_live_accums(
271    accums: Vec<Value>,
272    len: usize,
273) -> Result<Vec<Vec<Value>>, EvalError> {
274    let accum_ranges = from_val_to_slice_idx(accums.first().cloned(), len)?;
275    let mut accum_list = vec![];
276    for range in accum_ranges.iter() {
277        accum_list.push(accums.get(range.clone()).unwrap_or_default().to_vec());
278    }
279    Ok(accum_list)
280}
281
/// All arrangements (i.e. state) used in the reduce operator
pub struct ReduceArrange {
    /// The output arrangement of the reduce operator
    output_arrange: ArrangeHandler,
    /// The distinct input arrangements for an accumulable reduce plan;
    /// only used when the plan contains distinct aggregations
    distinct_input: Option<Vec<ArrangeHandler>>,
}
290
291fn batch_split_by_key_val(
292    batch: &Batch,
293    key_val_plan: &KeyValPlan,
294    err_collector: &ErrCollector,
295) -> (Batch, Batch) {
296    let row_count = batch.row_count();
297    let mut key_batch = Batch::empty();
298    let mut val_batch = Batch::empty();
299
300    err_collector.run(|| {
301        if key_val_plan.key_plan.output_arity() != 0 {
302            key_batch = key_val_plan.key_plan.eval_batch_into(&mut batch.clone())?;
303        }
304
305        if key_val_plan.val_plan.output_arity() != 0 {
306            val_batch = key_val_plan.val_plan.eval_batch_into(&mut batch.clone())?;
307        }
308        Ok(())
309    });
310
311    // deal with empty key or val
312    if key_batch.row_count() == 0 && key_batch.column_count() == 0 {
313        key_batch.set_row_count(row_count);
314    }
315
316    if val_batch.row_count() == 0 && val_batch.column_count() == 0 {
317        val_batch.set_row_count(row_count);
318    }
319
320    (key_batch, val_batch)
321}
322
/// Split a row into key and val by evaluating the key and val plans.
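/// For example, for `SELECT sum(v) FROM t GROUP BY k`, a row `(k, v)` would be split into
/// the key row `(k,)` and the val row `(v,)` (illustrative; the exact split is determined by `key_val_plan`).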
324fn split_rows_to_key_val(
325    rows: impl IntoIterator<Item = DiffRow>,
326    key_val_plan: KeyValPlan,
327    err_collector: ErrCollector,
328) -> impl IntoIterator<Item = KeyValDiffRow> {
329    let mut row_buf = Row::new(vec![]);
330    rows.into_iter().filter_map(
331        move |(mut row, sys_time, diff): DiffRow| -> Option<KeyValDiffRow> {
332            err_collector.run(|| {
333                let len = row.len();
334                if let Some(key) = key_val_plan
335                    .key_plan
336                    .evaluate_into(&mut row.inner, &mut row_buf)?
337                {
338                    // reuse the row as buffer
339                    row.inner.resize(len, Value::Null);
                    // val_plan is not allowed to carry any filter predicate
341                    let val = key_val_plan
342                        .val_plan
343                        .evaluate_into(&mut row.inner, &mut row_buf)?
344                        .context(InternalSnafu {
345                            reason: "val_plan should not contain any filter predicate",
346                        })?;
347                    Ok(Some(((key, val), sys_time, diff)))
348                } else {
349                    Ok(None)
350                }
351            })?
352        },
353    )
354}
355
356fn reduce_batch_subgraph(
357    arrange: &ArrangeHandler,
358    src_data: impl IntoIterator<Item = Batch>,
359    key_val_plan: &KeyValPlan,
360    accum_plan: &AccumulablePlan,
361    output_type: &RelationType,
362    SubgraphArg {
363        now,
364        err_collector,
365        scheduler: _,
366        send,
367    }: SubgraphArg<Toff<Batch>>,
368) {
369    let mut key_to_many_vals = BTreeMap::<Row, Vec<Batch>>::new();
370    let mut input_row_count = 0;
371    let mut input_batch_count = 0;
372
373    for batch in src_data {
374        input_batch_count += 1;
375        input_row_count += batch.row_count();
376        err_collector.run(|| {
377            let (key_batch, val_batch) =
378                batch_split_by_key_val(&batch, key_val_plan, err_collector);
379            ensure!(
380                key_batch.row_count() == val_batch.row_count(),
381                InternalSnafu {
382                    reason: format!(
383                        "Key and val batch should have the same row count, found {} and {}",
384                        key_batch.row_count(),
385                        val_batch.row_count()
386                    )
387                }
388            );
389
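            // Collect the distinct key rows in this batch; for each distinct key a boolean
            // equality mask is built below to filter the matching rows out of `val_batch`.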
390            let mut distinct_keys = BTreeSet::new();
391            for row_idx in 0..key_batch.row_count() {
392                let key_row = key_batch.get_row(row_idx)?;
393                let key_row = Row::new(key_row);
394
395                if distinct_keys.contains(&key_row) {
396                    continue;
397                } else {
398                    distinct_keys.insert(key_row.clone());
399                }
400            }
401
402            let key_data_types = output_type
403                .column_types
404                .iter()
405                .map(|t| t.scalar_type.clone())
406                .collect_vec();
407
            // TODO(discord9): reduce the number of `eq` calls to a minimum by repeatedly slicing the key/val batch
409            for key_row in distinct_keys {
410                let key_scalar_value = {
411                    let mut key_scalar_value = Vec::with_capacity(key_row.len());
412                    for (key_idx, key) in key_row.iter().enumerate() {
413                        let v =
414                            key.try_to_scalar_value(&key.data_type())
415                                .context(DataTypeSnafu {
416                                    msg: "can't convert key values to datafusion value",
417                                })?;
418
419                        let key_data_type = key_data_types.get(key_idx).context(InternalSnafu {
420                            reason: format!(
421                                "Key index out of bound, expected at most {} but got {}",
422                                output_type.column_types.len(),
423                                key_idx
424                            ),
425                        })?;
426
                        // if the incoming value's datatype is null, it needs to be handled specially, see below
428                        if key_data_type.as_arrow_type() != v.data_type()
429                            && !v.data_type().is_null()
430                        {
431                            crate::expr::error::InternalSnafu {
432                                reason: format!(
433                                    "Key data type mismatch, expected {:?} but got {:?}",
434                                    key_data_type.as_arrow_type(),
435                                    v.data_type()
436                                ),
437                            }
438                            .fail()?
439                        }
440
441                        // handle single null key
442                        let arrow_value = if v.data_type().is_null() {
443                            let ret = new_null_array(&arrow::datatypes::DataType::Null, 1);
444                            arrow::array::Scalar::new(ret)
445                        } else {
446                            v.to_scalar().context(crate::expr::error::DatafusionSnafu {
447                                context: "can't convert key values to arrow value",
448                            })?
449                        };
450                        key_scalar_value.push(arrow_value);
451                    }
452                    key_scalar_value
453                };
454
                // first compute equality for each key column separately
456                let eq_results = key_scalar_value
457                    .into_iter()
458                    .zip(key_batch.batch().iter())
459                    .map(|(key, col)| {
                        // TODO(discord9): this takes half of the CPU time and performs a redundant number of `eq` calls!

                        // note that if the lhs is null, we still need to get all rows that are null! But we can't use `eq` since
                        // it returns null if the input has null, so we need to use `is_null` instead
464                        if arrow::array::Datum::get(&key).0.data_type().is_null() {
465                            arrow::compute::kernels::boolean::is_null(
466                                col.to_arrow_array().as_ref() as _
467                            )
468                        } else {
469                            arrow::compute::kernels::cmp::eq(
470                                &key,
471                                &col.to_arrow_array().as_ref() as _,
472                            )
473                        }
474                    })
475                    .try_collect::<_, Vec<_>, _>()
476                    .context(ArrowSnafu {
477                        context: "Failed to compare key values",
478                    })?;
479
                // then combine all equality results to find the rows whose keys match
481                let opt_eq_mask = eq_results
482                    .into_iter()
483                    .fold(None, |acc, v| match acc {
484                        Some(Ok(acc)) => Some(arrow::compute::kernels::boolean::and(&acc, &v)),
485                        Some(Err(_)) => acc,
486                        None => Some(Ok(v)),
487                    })
488                    .transpose()
489                    .context(ArrowSnafu {
490                        context: "Failed to combine key comparison results",
491                    })?;
492
493                let key_eq_mask = if let Some(eq_mask) = opt_eq_mask {
494                    BooleanVector::from(eq_mask)
495                } else {
                    // if None, key_batch has zero columns, meaning the key is empty,
                    // so we just return an all-true mask, i.e. take all values
499                    BooleanVector::from(vec![true; key_batch.row_count()])
500                };
501                // TODO: both slice and mutate remaining batch
502
503                let cur_val_batch = val_batch.filter(&key_eq_mask)?;
504
505                key_to_many_vals
506                    .entry(key_row)
507                    .or_default()
508                    .push(cur_val_batch);
509            }
510
511            Ok(())
512        });
513    }
514
515    trace!(
516        "Reduce take {} batches, {} rows",
517        input_batch_count,
518        input_row_count
519    );
520
    // write-lock the arrangement for the rest of the function body
    // to prevent weird race conditions
523    let mut arrange = arrange.write();
524    let mut all_arrange_updates = Vec::with_capacity(key_to_many_vals.len());
525
526    let mut all_output_dict = BTreeMap::new();
527
528    for (key, val_batches) in key_to_many_vals {
529        err_collector.run(|| -> Result<(), _> {
530            let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
531            let accum_list =
532                from_accum_values_to_live_accums(accums.unpack(), accum_plan.simple_aggrs.len())?;
533
534            let mut accum_output = AccumOutput::new();
535            for AggrWithIndex {
536                expr,
537                input_idx,
538                output_idx,
539            } in accum_plan.simple_aggrs.iter()
540            {
541                let cur_accum_value = accum_list.get(*output_idx).cloned().unwrap_or_default();
542                let mut cur_accum = if cur_accum_value.is_empty() {
543                    Accum::new_accum(&expr.func.clone())?
544                } else {
545                    Accum::try_into_accum(&expr.func, cur_accum_value)?
546                };
547
548                for val_batch in val_batches.iter() {
                    // if the batch is empty, feed nulls instead
550                    let cur_input = val_batch
551                        .batch()
552                        .get(*input_idx)
553                        .cloned()
554                        .unwrap_or_else(|| Arc::new(NullVector::new(val_batch.row_count())));
555                    let len = cur_input.len();
556                    cur_accum.update_batch(&expr.func, VectorDiff::from(cur_input))?;
557
558                    trace!("Reduce accum after take {} rows: {:?}", len, cur_accum);
559                }
560                let final_output = cur_accum.eval(&expr.func)?;
561                trace!("Reduce accum final output: {:?}", final_output);
562                accum_output.insert_output(*output_idx, final_output);
563
564                let cur_accum_value = cur_accum.into_state();
565                accum_output.insert_accum(*output_idx, cur_accum_value);
566            }
567
568            let (new_accums, res_val_row) = accum_output.into_accum_output()?;
569
570            let arrange_update = ((key.clone(), Row::new(new_accums)), now, 1);
571            all_arrange_updates.push(arrange_update);
572
573            all_output_dict.insert(key, Row::from(res_val_row));
574
575            Ok(())
576        });
577    }
578
579    err_collector.run(|| {
580        arrange.apply_updates(now, all_arrange_updates)?;
581        arrange.compact_to(now)
582    });
583    // release the lock
584    drop(arrange);
585
    // this output part is not supposed to be resource intensive
    // (because each batch usually doesn't produce that many output rows),
    // so we can afford some costly operations here
589    let output_types = output_type
590        .column_types
591        .iter()
592        .map(|t| t.scalar_type.clone())
593        .collect_vec();
594
595    err_collector.run(|| {
596            let column_cnt = output_types.len();
597            let row_cnt = all_output_dict.len();
598
599            let mut output_builder = output_types
600                .into_iter()
601                .map(|t| t.create_mutable_vector(row_cnt))
602                .collect_vec();
603
604            for (key, val) in all_output_dict {
605                for (i, v) in key.into_iter().chain(val.into_iter()).enumerate() {
606                    output_builder
607                    .get_mut(i)
608                    .context(InternalSnafu{
609                        reason: format!(
610                            "Output builder should have the same length as the row, expected at most {} but got {}",
611                            column_cnt - 1,
612                            i
613                        )
614                    })?
615                    .try_push_value_ref(v.as_value_ref())
616                    .context(DataTypeSnafu {
617                        msg: "Failed to push value",
618                    })?;
619                }
620            }
621
622            let output_columns = output_builder
623                .into_iter()
624                .map(|mut b| b.to_vector())
625                .collect_vec();
626
627            let output_batch = Batch::try_new(output_columns, row_cnt)?;
628
629            trace!("Reduce output batch: {:?}", output_batch);
630
631            send.give(vec![output_batch]);
632
633            Ok(())
634        });
635}
636
/// Reduce subgraph: reduce the input data into a single row per key.
/// The output is the concatenation of key and val.
639fn reduce_subgraph(
640    ReduceArrange {
641        output_arrange: arrange,
642        distinct_input,
643    }: &ReduceArrange,
644    data: impl IntoIterator<Item = DiffRow>,
645    key_val_plan: &KeyValPlan,
646    reduce_plan: &ReducePlan,
647    SubgraphArg {
648        now,
649        err_collector,
650        scheduler,
651        send,
652    }: SubgraphArg,
653) {
654    let key_val = split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
    // From here on, distinct reduce and accum reduce are drastically different:
    // for distinct reduce the arrangement stores the output,
    // but for accum reduce the arrangement stores the accum state, and the output is
    // evaluated from the accum state whenever an update is needed.
659    match reduce_plan {
660        ReducePlan::Distinct => reduce_distinct_subgraph(
661            arrange,
662            key_val,
663            SubgraphArg {
664                now,
665                err_collector,
666                scheduler,
667                send,
668            },
669        ),
670        ReducePlan::Accumulable(accum_plan) => reduce_accum_subgraph(
671            arrange,
672            distinct_input,
673            key_val,
674            accum_plan,
675            SubgraphArg {
676                now,
677                err_collector,
678                scheduler,
679                send,
680            },
681        ),
682    };
683}
684
/// Return distinct rows (distinct by the row's key) from the input, but do not update the arrangement.
///
/// If the same key already exists, only the oldest value is preserved (which makes sense for input that is distinct over the key).
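/// For example, if one input batch contains two inserts with the same key, only the first (oldest) one is emitted.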
688fn eval_distinct_core(
689    arrange: ArrangeReader,
690    kv: impl IntoIterator<Item = KeyValDiffRow>,
691    now: repr::Timestamp,
692    err_collector: &ErrCollector,
693) -> Vec<KeyValDiffRow> {
694    let _ = err_collector;
695
    // note that we also need to keep track of the distinct rows inside the current input,
    // hence the `inner_map` for tracking them
698    let mut inner_map = BTreeMap::new();
699    kv.into_iter()
700        .filter_map(|((key, val), ts, diff)| {
            // first check inner_map, then the arrangement, to make sure we get the newest value
702            let old_val = inner_map
703                .get(&key)
704                .cloned()
705                .or_else(|| arrange.get(now, &key));
706
707            let new_key_val = match (old_val, diff) {
708                // a new distinct row
709                (None, 1) => Some(((key, val), ts, diff)),
                // if the diff differs from the newest stored value's diff, also update
711                (Some(old_val), diff) if old_val.0 == val && old_val.2 != diff => {
712                    Some(((key, val), ts, diff))
713                }
714                _ => None,
715            };
716
717            if let Some(((k, v), t, d)) = new_key_val.clone() {
718                // update the inner_map, so later updates can be checked against it
719                inner_map.insert(k, (v, t, d));
720            }
721            new_key_val
722        })
723        .collect_vec()
724}
725
/// Eval a distinct reduce plan, output the distinct rows, and update the arrangement.
///
/// This function is extracted because we also want to use it to update the distinct inputs of an accumulable reduce plan.
729fn update_reduce_distinct_arrange(
730    arrange: &ArrangeHandler,
731    kv: impl IntoIterator<Item = KeyValDiffRow>,
732    now: repr::Timestamp,
733    err_collector: &ErrCollector,
734) -> impl Iterator<Item = DiffRow> {
735    let result_updates = eval_distinct_core(arrange.read(), kv, now, err_collector);
736
737    err_collector.run(|| {
738        arrange.write().apply_updates(now, result_updates)?;
739        Ok(())
740    });
741
742    // Deal with output:
743
744    // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
745    let from = arrange.read().last_compaction_time();
746    let from = from.unwrap_or(repr::Timestamp::MIN);
747    let range = (
748        std::ops::Bound::Excluded(from),
749        std::ops::Bound::Included(now),
750    );
751    let output_kv = arrange.read().get_updates_in_range(range);
752
753    // 2. Truncate all updates stored in arrangement within that range.
754    let run_compaction = || {
755        arrange.write().compact_to(now)?;
756        Ok(())
757    };
758    err_collector.run(run_compaction);
759
760    // 3. Output the updates.
761    // output is concat from key and val
762    output_kv.into_iter().map(|((mut key, v), ts, diff)| {
763        key.extend(v.into_iter());
764        (key, ts, diff)
765    })
766}
767
/// Eval a distinct reduce plan, output the distinct rows, and update the arrangement.
///
/// Invariant: it is assumed that `kv`'s time is always <= now,
/// since it comes from a collection bundle, where future inserts are stored in the arrangement.
772fn reduce_distinct_subgraph(
773    arrange: &ArrangeHandler,
774    kv: impl IntoIterator<Item = KeyValDiffRow>,
775    SubgraphArg {
776        now,
777        err_collector,
778        scheduler: _,
779        send,
780    }: SubgraphArg,
781) {
782    let ret = update_reduce_distinct_arrange(arrange, kv, now, err_collector).collect_vec();
783
784    // no future updates should exist here
785    if arrange.read().get_next_update_time(&now).is_some() {
786        err_collector.push_err(
787            InternalSnafu {
788                reason: "No future updates should exist in the reduce distinct arrangement",
789            }
790            .build(),
791        );
792    }
793
794    send.give(ret);
795}
796
/// Eval an accumulable reduce plan by evaluating the aggregate functions and reducing the results.
///
/// TODO(discord9): eval distinct by adding a distinct input arrangement
///
/// Invariant: it is assumed that `kv`'s time is always <= now,
/// since it comes from a collection bundle, where future inserts are stored in the arrangement.
///
/// The data being sent is just the new rows that represent the new output after the given input is processed.
///
/// For example, if before the new updates the output of `SELECT sum(number), count(number) FROM table`
/// is (10, 2), and after the new updates the output is (15, 3), then the new row being sent is ((15, 3), now, 1).
///
/// It also updates each key -> accums value; for example, if it was empty before, it becomes something like
/// | offset | accum for sum | accum for count |
/// where offset is a single value holding the end offset of each accumulator,
/// and the rest are the actual accumulator values, which may be multiple values per accumulator.
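/// For example, if the sum accumulator stores two values `[s0, s1]` and the count accumulator stores one value `[c0]`,
/// the stored accum row is `[List[3, 4], s0, s1, c0]`: the leading list holds the end offsets, so the sum state
/// occupies indices `1..3` and the count state `3..4` (see `from_accums_to_offsetted_accum` and `from_val_to_slice_idx`).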
813fn reduce_accum_subgraph(
814    arrange: &ArrangeHandler,
815    distinct_input: &Option<Vec<ArrangeHandler>>,
816    kv: impl IntoIterator<Item = KeyValDiffRow>,
817    accum_plan: &AccumulablePlan,
818    SubgraphArg {
819        now,
820        err_collector,
821        scheduler,
822        send,
823    }: SubgraphArg,
824) {
825    let AccumulablePlan {
826        full_aggrs,
827        simple_aggrs,
828        distinct_aggrs,
829    } = accum_plan;
830    let mut key_to_vals = BTreeMap::<Row, Vec<(Row, repr::Diff)>>::new();
831
832    for ((key, val), _tick, diff) in kv {
        // it is assumed that values are in insertion order
834        let vals = key_to_vals.entry(key).or_default();
835        vals.push((val, diff));
836    }
837
838    let mut all_updates = Vec::with_capacity(key_to_vals.len());
839    let mut all_outputs = Vec::with_capacity(key_to_vals.len());
    // write-lock the arrangement for the rest of the function body
    // to prevent weird race conditions, since we are going to update the arrangement with a write after a read
    // TODO(discord9): consider a key-based lock
843    let mut arrange = arrange.write();
844    for (key, value_diffs) in key_to_vals {
845        if let Some(expire_man) = &arrange.get_expire_state() {
846            let mut is_expired = false;
847            err_collector.run(|| {
848                if let Some(expired) = expire_man.get_expire_duration(now, &key)? {
849                    is_expired = true;
850                    // expired data is ignored in computation, and a simple warning is logged
851                    common_telemetry::warn!(
852                        "Data already expired: {}",
853                        DataAlreadyExpiredSnafu {
854                            expired_by: expired,
855                        }
856                        .build()
857                    );
858                    Ok(())
859                } else {
860                    Ok(())
861                }
862            });
863            if is_expired {
                // the error is already collected, so we can just continue to the next key
865                continue;
866            }
867        }
868        let col_diffs = {
869            let row_len = value_diffs[0].0.len();
870            let res = err_collector.run(|| get_col_diffs(value_diffs, row_len));
871            match res {
872                Some(res) => res,
                // TODO(discord9): consider better error handling other than
                // just skipping the row and logging the error
875                None => continue,
876            }
877        };
878        let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
879
880        let accums = accums.inner;
881
        // deserialize accums from offsets
883        let accum_ranges = {
884            let res = err_collector
885                .run(|| from_val_to_slice_idx(accums.first().cloned(), full_aggrs.len()));
886            if let Some(res) = res {
887                res
888            } else {
                // the error is collected, so we can just continue and send the error back through `err_collector`
890                continue;
891            }
892        };
893
894        let mut accum_output = AccumOutput::new();
895        eval_simple_aggrs(
896            simple_aggrs,
897            &accums,
898            &accum_ranges,
899            &col_diffs,
900            &mut accum_output,
901            err_collector,
902        );
903
904        // for distinct input
905        eval_distinct_aggrs(
906            distinct_aggrs,
907            distinct_input,
908            &accums,
909            &accum_ranges,
910            &col_diffs,
911            &mut accum_output,
912            SubgraphArg {
913                now,
914                err_collector,
915                scheduler,
916                send,
917            },
918        );
919
920        // get and append results
921        err_collector.run(|| {
922            let (new_accums, res_val_row) = accum_output.into_accum_output()?;
923
            // construct the updates and save them
925            all_updates.push(((key.clone(), Row::new(new_accums)), now, 1));
926            let mut key_val = key;
927            key_val.extend(res_val_row);
928            all_outputs.push((key_val, now, 1));
929            Ok(())
930        });
931    }
932    err_collector.run(|| {
933        arrange.apply_updates(now, all_updates)?;
934        arrange.compact_to(now)
935    });
936
    // for all arrangements involved, schedule the next time this subgraph should run;
    // no future updates should exist here
939    let all_arrange_used = distinct_input
940        .iter()
941        .flatten()
942        .map(|d| d.write())
943        .chain(std::iter::once(arrange));
944    check_no_future_updates(all_arrange_used, err_collector, now);
945
946    send.give(all_outputs);
947}
948
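/// Transpose row-major `(Row, diff)` pairs into per-column lists of `(Value, diff)`,
/// e.g. `[(Row[a, b], +1), (Row[c, d], -1)]` becomes `[[(a, +1), (c, -1)], [(b, +1), (d, -1)]]`.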
949fn get_col_diffs(
950    value_diffs: Vec<(Row, repr::Diff)>,
951    row_len: usize,
952) -> Result<Vec<Vec<(Value, i64)>>, EvalError> {
953    ensure!(
954        value_diffs.iter().all(|(row, _)| row.len() == row_len),
955        InternalSnafu {
956            reason: "value_diffs should have rows with equal length"
957        }
958    );
959    let ret = (0..row_len)
960        .map(|i| {
961            value_diffs
962                .iter()
963                .map(|(row, diff)| (row.get(i).cloned().unwrap(), *diff))
964                .collect_vec()
965        })
966        .collect_vec();
967    Ok(ret)
968}
969
970/// Eval simple aggregate functions with no distinct input
971fn eval_simple_aggrs(
972    simple_aggrs: &Vec<AggrWithIndex>,
973    accums: &[Value],
974    accum_ranges: &[Range<usize>],
975    col_diffs: &[Vec<(Value, i64)>],
976    accum_output: &mut AccumOutput,
977    err_collector: &ErrCollector,
978) {
979    for AggrWithIndex {
980        expr,
981        input_idx,
982        output_idx,
983    } in simple_aggrs
984    {
985        let cur_accum_range = accum_ranges[*output_idx].clone(); // range of current accum
986        let cur_old_accum = accums
987            .get(cur_accum_range)
988            .unwrap_or_default()
989            .iter()
990            .cloned();
991        let cur_col_diff = col_diffs[*input_idx].iter().cloned();
992
        // actually eval the aggregation function
994        if let Some((res, new_accum)) =
995            err_collector.run(|| expr.func.eval_diff_accumulable(cur_old_accum, cur_col_diff))
996        {
997            accum_output.insert_accum(*output_idx, new_accum);
998            accum_output.insert_output(*output_idx, res);
999        } // else just collect error and continue
1000    }
1001}
1002
1003/// Accumulate the output of aggregation functions
1004///
1005/// The accum is a map from index to the accumulator of the aggregation function
1006///
1007/// The output is a map from index to the output of the aggregation function
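/// For example, for `SELECT sum(number), count(number)`, index 0 would hold the sum's accumulator/output
/// and index 1 the count's.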
1008#[derive(Debug)]
1009struct AccumOutput {
1010    accum: BTreeMap<usize, Vec<Value>>,
1011    output: BTreeMap<usize, Value>,
1012}
1013
1014impl AccumOutput {
1015    fn new() -> Self {
1016        Self {
1017            accum: BTreeMap::new(),
1018            output: BTreeMap::new(),
1019        }
1020    }
1021
1022    fn insert_accum(&mut self, idx: usize, v: Vec<Value>) {
1023        self.accum.insert(idx, v);
1024    }
1025
1026    fn insert_output(&mut self, idx: usize, v: Value) {
1027        self.output.insert(idx, v);
1028    }
1029
1030    /// return (accums, output)
1031    fn into_accum_output(self) -> Result<(Vec<Value>, Vec<Value>), EvalError> {
1032        if self.accum.is_empty() && self.output.is_empty() {
1033            return Ok((vec![], vec![]));
1034        }
1035        ensure!(
1036            !self.accum.is_empty() && self.accum.len() == self.output.len(),
1037            InternalSnafu {
1038                reason: format!("Accum and output should have the non-zero and same length, found accum.len() = {}, output.len() = {}", self.accum.len(), self.output.len())
1039            }
1040        );
1041        // make output vec from output map
1042        if let Some(kv) = self.accum.last_key_value() {
1043            ensure!(
1044                *kv.0 == self.accum.len() - 1,
1045                InternalSnafu {
1046                    reason: "Accum should be a continuous range"
1047                }
1048            );
1049        }
1050        if let Some(kv) = self.output.last_key_value() {
1051            ensure!(
1052                *kv.0 == self.output.len() - 1,
1053                InternalSnafu {
1054                    reason: "Output should be a continuous range"
1055                }
1056            );
1057        }
1058
1059        let accums = self.accum.into_values().collect_vec();
1060        let new_accums = from_accums_to_offsetted_accum(accums);
1061        let output = self.output.into_values().collect_vec();
1062        Ok((new_accums, output))
1063    }
1064}
1065
/// Eval distinct aggregate functions with the distinct input arrangements.
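/// Each aggregation's input column diffs are first deduplicated through its own distinct input arrangement
/// before being folded into the accumulator.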
1067fn eval_distinct_aggrs(
1068    distinct_aggrs: &Vec<AggrWithIndex>,
1069    distinct_input: &Option<Vec<ArrangeHandler>>,
1070    accums: &[Value],
1071    accum_ranges: &[Range<usize>],
1072    col_diffs: &[Vec<(Value, i64)>],
1073    accum_output: &mut AccumOutput,
1074    SubgraphArg {
1075        now,
1076        err_collector,
1077        scheduler: _,
1078        send: _,
1079    }: SubgraphArg,
1080) {
1081    for AggrWithIndex {
1082        expr,
1083        input_idx,
1084        output_idx,
1085    } in distinct_aggrs
1086    {
1087        let cur_accum_range = accum_ranges[*output_idx].clone(); // range of current accum
1088        let cur_old_accum = accums
1089            .get(cur_accum_range)
1090            .unwrap_or_default()
1091            .iter()
1092            .cloned();
1093        let cur_col_diff = col_diffs[*input_idx].iter().cloned();
        // first filter the input through the distinct arrangement
1095        let input_arrange = distinct_input
1096            .as_ref()
1097            .and_then(|v| v[*input_idx].clone_full_arrange())
1098            .expect("A full distinct input arrangement should exist");
1099        let kv = cur_col_diff.map(|(v, d)| ((Row::new(vec![v]), Row::empty()), now, d));
1100        let col_diff_distinct =
1101            update_reduce_distinct_arrange(&input_arrange, kv, now, err_collector).map(
1102                |(row, _ts, diff)| (row.get(0).expect("Row should not be empty").clone(), diff),
1103            );
1104        let col_diff_distinct = {
1105            let res = col_diff_distinct.collect_vec();
1106            res.into_iter()
1107        };
        // actually eval the aggregation function
1109        let (res, new_accum) = expr
1110            .func
1111            .eval_diff_accumulable(cur_old_accum, col_diff_distinct)
1112            .unwrap();
1113        accum_output.insert_accum(*output_idx, new_accum);
1114        accum_output.insert_output(*output_idx, res);
1115    }
1116}
1117
1118fn check_no_future_updates<'a>(
1119    all_arrange_used: impl IntoIterator<Item = ArrangeWriter<'a>>,
1120    err_collector: &ErrCollector,
1121    now: repr::Timestamp,
1122) {
1123    for arrange in all_arrange_used {
1124        if arrange.get_next_update_time(&now).is_some() {
1125            err_collector.push_err(
1126                InternalSnafu {
1127                    reason: "No future updates should exist in the reduce distinct arrangement",
1128                }
1129                .build(),
1130            );
1131        }
1132    }
1133}
1134
/// Convert a list of accumulators to a flat vector of values whose first value is a list of each accumulator's end offset.
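/// For example, `[[a, b], [c]]` becomes `[List[3, 4], a, b, c]`, where `3` and `4` are the end offsets of the
/// two accumulators (offsets start at 1 because index 0 is occupied by the offset list itself).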
1136fn from_accums_to_offsetted_accum(new_accums: Vec<Vec<Value>>) -> Vec<Value> {
1137    let offset = new_accums
1138        .iter()
1139        .map(|v| v.len() as u64)
1140        .scan(1, |state, x| {
1141            *state += x;
1142            Some(*state)
1143        })
1144        .map(Value::from)
1145        .collect::<Vec<_>>();
1146    let first = ListValue::new(offset, ConcreteDataType::uint64_datatype());
1147    let first = Value::List(first);
1148    // construct new_accums
1149
1150    std::iter::once(first)
1151        .chain(new_accums.into_iter().flatten())
1152        .collect::<Vec<_>>()
1153}
1154
/// Convert a value (the leading offset list) to a list of slice ranges.
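/// For example, `Some(List[3, 4])` with `expected_len = 2` yields `[1..3, 3..4]`,
/// while `None` yields `expected_len` empty ranges (`1..1`).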
1156fn from_val_to_slice_idx(
1157    value: Option<Value>,
1158    expected_len: usize,
1159) -> Result<Vec<Range<usize>>, EvalError> {
1160    let offset_end = if let Some(value) = value {
1161        let list = value
1162            .as_list()
1163            .with_context(|_| DataTypeSnafu {
1164                msg: "Accum's first element should be a list",
1165            })?
1166            .context(InternalSnafu {
1167                reason: "Accum's first element should be a list",
1168            })?;
1169        let ret: Vec<usize> = list
1170            .items()
1171            .iter()
1172            .map(|v| {
1173                v.as_u64().map(|j| j as usize).context(InternalSnafu {
1174                    reason: "End offset should be a list of u64",
1175                })
1176            })
1177            .try_collect()?;
1178        ensure!(
1179            ret.len() == expected_len,
1180            InternalSnafu {
1181                reason: "Offset List should have the same length as full_aggrs"
1182            }
1183        );
1184        Ok(ret)
1185    } else {
1186        Ok(vec![1usize; expected_len])
1187    }?;
1188    let accum_ranges = (0..expected_len)
1189        .map(|idx| {
1190            if idx == 0 {
1191                // note that the first element is the offset list
1192                debug_assert!(
1193                    offset_end[0] >= 1,
1194                    "Offset should be at least 1: {:?}",
1195                    &offset_end
1196                );
1197                1..offset_end[0]
1198            } else {
1199                offset_end[idx - 1]..offset_end[idx]
1200            }
1201        })
1202        .collect_vec();
1203    Ok(accum_ranges)
1204}
1205
// mainly for reduce's tests
// TODO(discord9): add tests for accum ser/de
1208#[cfg(test)]
1209mod test {
1210
1211    use std::time::Duration;
1212
1213    use common_time::Timestamp;
1214    use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
1215    use dfir_rs::scheduled::graph::Dfir;
1216
1217    use super::*;
1218    use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
1219    use crate::compute::state::DataflowState;
1220    use crate::expr::{
1221        self, AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc,
1222    };
1223    use crate::plan::Plan;
1224    use crate::repr::{ColumnType, RelationType};
1225
1226    /// SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00')
1227    /// input table columns: number, ts
1228    /// expected: sum(number), window_start, window_end
1229    #[test]
1230    fn test_tumble_group_by() {
1231        let mut df = Dfir::new();
1232        let mut state = DataflowState::default();
1233        let mut ctx = harness_test_ctx(&mut df, &mut state);
1234        const START: i64 = 1625097600000;
1235        let rows = vec![
1236            (1u32, START + 1000),
1237            (2u32, START + 1500),
1238            (3u32, START + 2000),
1239            (1u32, START + 2500),
1240            (2u32, START + 3000),
1241            (3u32, START + 3500),
1242        ];
1243        let rows = rows
1244            .into_iter()
1245            .map(|(number, ts)| {
1246                (
1247                    Row::new(vec![number.into(), Timestamp::new_millisecond(ts).into()]),
1248                    1,
1249                    1,
1250                )
1251            })
1252            .collect_vec();
1253
1254        let collection = ctx.render_constant(rows.clone());
1255        ctx.insert_global(GlobalId::User(1), collection);
1256
1257        let aggr_expr = AggregateExpr {
1258            func: AggregateFunc::SumUInt32,
1259            expr: ScalarExpr::Column(0),
1260            distinct: false,
1261        };
1262        let expected = TypedPlan {
1263            schema: RelationType::new(vec![
1264                ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
1265                ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window start
1266                ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window end
1267            ])
1268            .into_unnamed(),
            // TODO(discord9): mfp indirectly refers to key columns
1270            /*
1271            .with_key(vec![1])
1272            .with_time_index(Some(0)),*/
1273            plan: Plan::Mfp {
1274                input: Box::new(
1275                    Plan::Reduce {
1276                        input: Box::new(
1277                            Plan::Get {
1278                                id: crate::expr::Id::Global(GlobalId::User(1)),
1279                            }
1280                            .with_types(
1281                                RelationType::new(vec![
1282                                    ColumnType::new(ConcreteDataType::uint32_datatype(), false),
1283                                    ColumnType::new(
1284                                        ConcreteDataType::timestamp_millisecond_datatype(),
1285                                        false,
1286                                    ),
1287                                ])
1288                                .into_unnamed(),
1289                            ),
1290                        ),
1291                        key_val_plan: KeyValPlan {
1292                            key_plan: MapFilterProject::new(2)
1293                                .map(vec![
1294                                    ScalarExpr::Column(1).call_unary(
1295                                        UnaryFunc::TumbleWindowFloor {
1296                                            window_size: Duration::from_nanos(1_000_000_000),
1297                                            start_time: Some(Timestamp::new_millisecond(
1298                                                1625097600000,
1299                                            )),
1300                                        },
1301                                    ),
1302                                    ScalarExpr::Column(1).call_unary(
1303                                        UnaryFunc::TumbleWindowCeiling {
1304                                            window_size: Duration::from_nanos(1_000_000_000),
1305                                            start_time: Some(Timestamp::new_millisecond(
1306                                                1625097600000,
1307                                            )),
1308                                        },
1309                                    ),
1310                                ])
1311                                .unwrap()
1312                                .project(vec![2, 3])
1313                                .unwrap()
1314                                .into_safe(),
1315                            val_plan: MapFilterProject::new(2)
1316                                .project(vec![0, 1])
1317                                .unwrap()
1318                                .into_safe(),
1319                        },
1320                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
1321                            full_aggrs: vec![aggr_expr.clone()],
1322                            simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
1323                            distinct_aggrs: vec![],
1324                        }),
1325                    }
1326                    .with_types(
1327                        RelationType::new(vec![
1328                            ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window start
1329                            ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window end
1330                            ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
1331                        ])
1332                        .with_key(vec![1])
1333                        .with_time_index(Some(0))
1334                        .into_unnamed(),
1335                    ),
1336                ),
1337                mfp: MapFilterProject::new(3)
1338                    .map(vec![
1339                        ScalarExpr::Column(2),
1340                        ScalarExpr::Column(3),
1341                        ScalarExpr::Column(0),
1342                        ScalarExpr::Column(1),
1343                    ])
1344                    .unwrap()
1345                    .project(vec![4, 5, 6])
1346                    .unwrap(),
1347            },
1348        };
1349
1350        let bundle = ctx.render_plan(expected).unwrap();
1351
1352        let output = get_output_handle(&mut ctx, bundle);
1353        drop(ctx);
1354        let expected = BTreeMap::from([(
1355            1,
1356            vec![
1357                (
1358                    Row::new(vec![
1359                        3u64.into(),
1360                        Timestamp::new_millisecond(START + 1000).into(),
1361                        Timestamp::new_millisecond(START + 2000).into(),
1362                    ]),
1363                    1,
1364                    1,
1365                ),
1366                (
1367                    Row::new(vec![
1368                        4u64.into(),
1369                        Timestamp::new_millisecond(START + 2000).into(),
1370                        Timestamp::new_millisecond(START + 3000).into(),
1371                    ]),
1372                    1,
1373                    1,
1374                ),
1375                (
1376                    Row::new(vec![
1377                        5u64.into(),
1378                        Timestamp::new_millisecond(START + 3000).into(),
1379                        Timestamp::new_millisecond(START + 4000).into(),
1380                    ]),
1381                    1,
1382                    1,
1383                ),
1384            ],
1385        )]);
1386        run_and_check(&mut state, &mut df, 1..2, expected, output);
1387    }
1388
1389    /// select avg(number) from number;
1390    #[test]
1391    fn test_avg_eval() {
1392        let mut df = Dfir::new();
1393        let mut state = DataflowState::default();
1394        let mut ctx = harness_test_ctx(&mut df, &mut state);
1395
1396        let rows = vec![
1397            (Row::new(vec![1u32.into()]), 1, 1),
1398            (Row::new(vec![2u32.into()]), 1, 1),
1399            (Row::new(vec![3u32.into()]), 1, 1),
1400            (Row::new(vec![1u32.into()]), 1, 1),
1401            (Row::new(vec![2u32.into()]), 1, 1),
1402            (Row::new(vec![3u32.into()]), 1, 1),
1403        ];
1404        let collection = ctx.render_constant(rows.clone());
1405        ctx.insert_global(GlobalId::User(1), collection);
1406
1407        let aggr_exprs = vec![
1408            AggregateExpr {
1409                func: AggregateFunc::SumUInt32,
1410                expr: ScalarExpr::Column(0),
1411                distinct: false,
1412            },
1413            AggregateExpr {
1414                func: AggregateFunc::Count,
1415                expr: ScalarExpr::Column(0),
1416                distinct: false,
1417            },
1418        ];
        let avg_expr = ScalarExpr::If {
            cond: Box::new(ScalarExpr::Column(1).call_binary(
                ScalarExpr::Literal(Value::from(0u32), CDT::int64_datatype()),
                BinaryFunc::NotEq,
            )),
            then: Box::new(ScalarExpr::Column(0).call_binary(
                ScalarExpr::Column(1).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
                BinaryFunc::DivUInt64,
            )),
            els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
        };
        let expected = TypedPlan {
            schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
                .into_unnamed(),
            plan: Plan::Mfp {
                input: Box::new(
                    Plan::Reduce {
                        input: Box::new(
                            Plan::Get {
                                id: crate::expr::Id::Global(GlobalId::User(1)),
                            }
                            .with_types(
                                RelationType::new(vec![ColumnType::new(
                                    ConcreteDataType::int64_datatype(),
                                    false,
                                )])
                                .into_unnamed(),
                            ),
                        ),
                        key_val_plan: KeyValPlan {
                            key_plan: MapFilterProject::new(1)
                                .project(vec![])
                                .unwrap()
                                .into_safe(),
                            val_plan: MapFilterProject::new(1)
                                .project(vec![0])
                                .unwrap()
                                .into_safe(),
                        },
                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                            full_aggrs: aggr_exprs.clone(),
                            simple_aggrs: vec![
                                AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
                                AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1),
                            ],
                            distinct_aggrs: vec![],
                        }),
                    }
                    .with_types(
                        RelationType::new(vec![
                            ColumnType::new(ConcreteDataType::uint32_datatype(), true),
                            ColumnType::new(ConcreteDataType::int64_datatype(), true),
                        ])
                        .into_unnamed(),
                    ),
                ),
                mfp: MapFilterProject::new(2)
                    .map(vec![
                        avg_expr,
                        // TODO(discord9): optimize mfp to remove the indirect ref
                        ScalarExpr::Column(2),
                    ])
                    .unwrap()
                    .project(vec![3])
                    .unwrap(),
            },
        };

        let bundle = ctx.render_plan(expected).unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
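        // The six input values (1, 2, 3 each appearing twice) give SUM = 12 and COUNT = 6,
        // so AVG = 12 / 6 = 2.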
        let expected = BTreeMap::from([(1, vec![(Row::new(vec![2u64.into()]), 1, 1)])]);
        run_and_check(&mut state, &mut df, 1..2, expected, output);
    }

    /// SELECT DISTINCT col FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    #[test]
    fn test_basic_distinct() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![1i64.into()]), 1, 1),
            (Row::new(vec![2i64.into()]), 2, 1),
            (Row::new(vec![3i64.into()]), 3, 1),
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![2i64.into()]), 5, 1),
            (Row::new(vec![3i64.into()]), 6, 1),
        ];
        let collection = ctx.render_constant(rows.clone());
        ctx.insert_global(GlobalId::User(1), collection);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        };
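        // For DISTINCT the whole row becomes the key and no value is aggregated,
        // hence the empty val_plan above.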
        let reduce_plan = ReducePlan::Distinct;
        let bundle = ctx
            .render_reduce(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                key_val_plan,
                reduce_plan,
                RelationType::empty(),
            )
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
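        // Duplicates collapse: only the three distinct values survive, each keeping the
        // tick of its first occurrence.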
        let expected = BTreeMap::from([(
            6,
            vec![
                (Row::new(vec![1i64.into()]), 1, 1),
                (Row::new(vec![2i64.into()]), 2, 1),
                (Row::new(vec![3i64.into()]), 3, 1),
            ],
        )]);
        run_and_check(&mut state, &mut df, 6..7, expected, output);
    }

    /// Batch Mode Reduce Evaluation
    /// SELECT SUM(col) FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    #[test]
    fn test_basic_batch_reduce_accum() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let now = state.current_time_ref();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![Value::Null]), -1, 1),
            (Row::new(vec![1i64.into()]), 0, 1),
            (Row::new(vec![Value::Null]), 1, 1),
            (Row::new(vec![2i64.into()]), 2, 1),
            (Row::new(vec![3i64.into()]), 3, 1),
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![2i64.into()]), 5, 1),
            (Row::new(vec![3i64.into()]), 6, 1),
        ];
        let input_plan = Plan::Constant { rows: rows.clone() };

        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
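        // No GROUP BY: the key is empty and the single input column is the value to sum.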
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        };

        let simple_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )];
        let accum_plan = AccumulablePlan {
            full_aggrs: vec![AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            }],
            simple_aggrs,
            distinct_aggrs: vec![],
        };

        let reduce_plan = ReducePlan::Accumulable(accum_plan);
        let bundle = ctx
            .render_reduce_batch(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                &key_val_plan,
                &reduce_plan,
                &RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]),
            )
            .unwrap();

        {
            let now_inner = now.clone();
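            // Expected running SUM keyed by tick; the NULL rows (ticks -1 and 1) do not
            // contribute to the sum.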
            let expected = BTreeMap::<i64, Vec<i64>>::from([
                (-1, vec![]),
                (0, vec![1i64]),
                (1, vec![1i64]),
                (2, vec![3i64]),
                (3, vec![6i64]),
                (4, vec![7i64]),
                (5, vec![9i64]),
                (6, vec![12i64]),
            ]);
            let collection = bundle.collection;
            ctx.df
                .add_subgraph_sink("test_sink", collection.into_inner(), move |_ctx, recv| {
                    let now = *now_inner.borrow();
                    let data = recv.take_inner();
                    let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();

                    if let Some(expected) = expected.get(&now) {
                        let batch = expected.iter().map(|v| Value::from(*v)).collect_vec();
                        let batch = Batch::try_from_rows_with_types(
                            vec![batch.into()],
                            &[CDT::int64_datatype()],
                        )
                        .unwrap();
                        assert_eq!(res.first(), Some(&batch));
                    }
                });
            drop(ctx);

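            // Drive the dataflow one tick at a time; the sink above asserts the running
            // sum for each tick, and any collected error fails the test.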
            for now in 1..7 {
                state.set_current_ts(now);
                state.run_available_with_schedule(&mut df);
                if !state.get_err_collector().is_empty() {
                    panic!(
                        "Errors occur: {:?}",
                        state.get_err_collector().get_all_blocking()
                    )
                }
            }
        }
    }

    /// SELECT SUM(col) FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    #[test]
    fn test_basic_reduce_accum() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![1i64.into()]), 1, 1),
            (Row::new(vec![2i64.into()]), 2, 1),
            (Row::new(vec![3i64.into()]), 3, 1),
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![2i64.into()]), 5, 1),
            (Row::new(vec![3i64.into()]), 6, 1),
        ];
        let collection = ctx.render_constant(rows.clone());
        ctx.insert_global(GlobalId::User(1), collection);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        };

        let simple_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )];
        let accum_plan = AccumulablePlan {
            full_aggrs: vec![AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            }],
            simple_aggrs,
            distinct_aggrs: vec![],
        };

        let reduce_plan = ReducePlan::Accumulable(accum_plan);
        let bundle = ctx
            .render_reduce(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                key_val_plan,
                reduce_plan,
                RelationType::empty(),
            )
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
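        // Running SUM over 1, 2, 3, 1, 2, 3 arriving at ticks 1..=6: 1, 3, 6, 7, 9, 12.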
        let expected = BTreeMap::from([
            (1, vec![(Row::new(vec![1i64.into()]), 1, 1)]),
            (2, vec![(Row::new(vec![3i64.into()]), 2, 1)]),
            (3, vec![(Row::new(vec![6i64.into()]), 3, 1)]),
            (4, vec![(Row::new(vec![7i64.into()]), 4, 1)]),
            (5, vec![(Row::new(vec![9i64.into()]), 5, 1)]),
            (6, vec![(Row::new(vec![12i64.into()]), 6, 1)]),
        ]);
        run_and_check(&mut state, &mut df, 1..7, expected, output);
    }

    /// SELECT SUM(DISTINCT col) FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    ///
    /// This test includes even more insert/delete cases to cover all cases in eval_distinct_core.
    #[test]
    fn test_delete_reduce_distinct_accum() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            // same tick
            (Row::new(vec![1i64.into()]), 1, 1),
            (Row::new(vec![1i64.into()]), 1, -1),
            // next tick
            (Row::new(vec![1i64.into()]), 2, 1),
            (Row::new(vec![1i64.into()]), 3, -1),
            // repeat in same tick
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![1i64.into()]), 4, -1),
            (Row::new(vec![1i64.into()]), 4, 1),
        ];
        let collection = ctx.render_constant(rows.clone());
        ctx.insert_global(GlobalId::User(1), collection);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        };

        let distinct_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )];
        let accum_plan = AccumulablePlan {
            full_aggrs: vec![AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: true,
            }],
            simple_aggrs: vec![],
            distinct_aggrs,
        };

        let reduce_plan = ReducePlan::Accumulable(accum_plan);
        let bundle = ctx
            .render_reduce(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                key_val_plan,
                reduce_plan,
                RelationType::empty(),
            )
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
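        // Tick 1: the insert/delete pair cancels out => 0; tick 2: 1 enters the distinct
        // set => 1; tick 3: it is deleted again => 0; tick 4: +1/-1/+1 nets to an insert => 1.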
        let expected = BTreeMap::from([
            (1, vec![(Row::new(vec![0i64.into()]), 1, 1)]),
            (2, vec![(Row::new(vec![1i64.into()]), 2, 1)]),
            (3, vec![(Row::new(vec![0i64.into()]), 3, 1)]),
            (4, vec![(Row::new(vec![1i64.into()]), 4, 1)]),
        ]);
        run_and_check(&mut state, &mut df, 1..7, expected, output);
    }

    /// SELECT SUM(DISTINCT col) FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    ///
    /// This test includes inserts and deletes, which should cover all cases in eval_distinct_core.
    #[test]
    fn test_basic_reduce_distinct_accum() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![1i64.into()]), 1, 1),
            (Row::new(vec![1i64.into()]), 1, -1),
            (Row::new(vec![2i64.into()]), 2, 1),
            (Row::new(vec![3i64.into()]), 3, 1),
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![2i64.into()]), 5, 1),
            (Row::new(vec![3i64.into()]), 6, 1),
            (Row::new(vec![1i64.into()]), 7, 1),
        ];
        let collection = ctx.render_constant(rows.clone());
        ctx.insert_global(GlobalId::User(1), collection);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        };

        let distinct_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )];
        let accum_plan = AccumulablePlan {
            full_aggrs: vec![AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: true,
            }],
            simple_aggrs: vec![],
            distinct_aggrs,
        };

        let reduce_plan = ReducePlan::Accumulable(accum_plan);
        let bundle = ctx
            .render_reduce(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                key_val_plan,
                reduce_plan,
                RelationType::empty(),
            )
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
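        // Distinct SUM: tick 1 cancels to 0, then the set grows to {2}, {2, 3}, {1, 2, 3};
        // later duplicates do not change it, so the sum stays at 6.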
        let expected = BTreeMap::from([
            (1, vec![(Row::new(vec![0i64.into()]), 1, 1)]),
            (2, vec![(Row::new(vec![2i64.into()]), 2, 1)]),
            (3, vec![(Row::new(vec![5i64.into()]), 3, 1)]),
            (4, vec![(Row::new(vec![6i64.into()]), 4, 1)]),
            (5, vec![(Row::new(vec![6i64.into()]), 5, 1)]),
            (6, vec![(Row::new(vec![6i64.into()]), 6, 1)]),
            (7, vec![(Row::new(vec![6i64.into()]), 7, 1)]),
        ]);
        run_and_check(&mut state, &mut df, 1..7, expected, output);
    }

    /// SELECT SUM(col), SUM(DISTINCT col) FROM table
    ///
    /// table schema:
    /// | name | type  |
    /// |------|-------|
    /// | col  | Int64 |
    #[test]
    fn test_composite_reduce_distinct_accum() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![1i64.into()]), 1, 1),
            (Row::new(vec![2i64.into()]), 2, 1),
            (Row::new(vec![3i64.into()]), 3, 1),
            (Row::new(vec![1i64.into()]), 4, 1),
            (Row::new(vec![2i64.into()]), 5, 1),
            (Row::new(vec![3i64.into()]), 6, 1),
            (Row::new(vec![1i64.into()]), 7, 1),
        ];
        let collection = ctx.render_constant(rows.clone());
        ctx.insert_global(GlobalId::User(1), collection);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let key_val_plan = KeyValPlan {
            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
        };
        let simple_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            0,
            0,
        )];
        let distinct_aggrs = vec![AggrWithIndex::new(
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: true,
            },
            0,
            1,
        )];
        let accum_plan = AccumulablePlan {
            full_aggrs: vec![
                AggregateExpr {
                    func: AggregateFunc::SumInt64,
                    expr: ScalarExpr::Column(0),
                    distinct: false,
                },
                AggregateExpr {
                    func: AggregateFunc::SumInt64,
                    expr: ScalarExpr::Column(0),
                    distinct: true,
                },
            ],
            simple_aggrs,
            distinct_aggrs,
        };

        let reduce_plan = ReducePlan::Accumulable(accum_plan);
        let bundle = ctx
            .render_reduce(
                Box::new(input_plan.with_types(typ.into_unnamed())),
                key_val_plan,
                reduce_plan,
                RelationType::empty(),
            )
            .unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
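        // Column 0 is the plain running SUM (1, 3, 6, 7, 9, 12, 13); column 1 is
        // SUM(DISTINCT col), which settles at 1 + 2 + 3 = 6 from tick 3 onward.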
        let expected = BTreeMap::from([
            (1, vec![(Row::new(vec![1i64.into(), 1i64.into()]), 1, 1)]),
            (2, vec![(Row::new(vec![3i64.into(), 3i64.into()]), 2, 1)]),
            (3, vec![(Row::new(vec![6i64.into(), 6i64.into()]), 3, 1)]),
            (4, vec![(Row::new(vec![7i64.into(), 6i64.into()]), 4, 1)]),
            (5, vec![(Row::new(vec![9i64.into(), 6i64.into()]), 5, 1)]),
            (6, vec![(Row::new(vec![12i64.into(), 6i64.into()]), 6, 1)]),
            (7, vec![(Row::new(vec![13i64.into(), 6i64.into()]), 7, 1)]),
        ]);
        run_and_check(&mut state, &mut df, 1..7, expected, output);
    }
}