use std::collections::{BTreeMap, BTreeSet};
use std::ops::Range;
use std::sync::Arc;
use arrow::array::new_null_array;
use common_telemetry::trace;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::{BooleanVector, NullVector};
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use crate::compute::render::{Context, SubgraphArg};
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
use crate::expr::error::{ArrowSnafu, DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
use crate::expr::{Accum, Accumulator, Batch, EvalError, ScalarExpr, VectorDiff};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};
impl Context<'_, '_> {
const REDUCE_BATCH: &'static str = "reduce_batch";
#[allow(clippy::mutable_key_type)]
pub fn render_reduce_batch(
&mut self,
input: Box<TypedPlan>,
key_val_plan: &KeyValPlan,
reduce_plan: &ReducePlan,
output_type: &RelationType,
) -> Result<CollectionBundle<Batch>, Error> {
let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan {
if !accum_plan.distinct_aggrs.is_empty() {
NotImplementedSnafu {
reason: "Distinct aggregation is not supported in batch mode",
}
.fail()?
}
accum_plan.clone()
} else {
NotImplementedSnafu {
reason: "Only accumulable reduce plan is supported in batch mode",
}
.fail()?
};
let input = self.render_plan_batch(*input)?;
let output_key_arity = key_val_plan.key_plan.output_arity();
let arrange_handler = self.compute_state.new_arrange(None);
if let (Some(time_index), Some(expire_after)) =
(output_type.time_index, self.compute_state.expire_after())
{
let expire_man =
KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
arrange_handler.write().set_expire_state(expire_man);
}
let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
reason: "No write is expected at this point",
})?;
let key_val_plan = key_val_plan.clone();
let output_type = output_type.clone();
let now = self.compute_state.current_time_ref();
let err_collector = self.err_collector.clone();
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let (out_send_port, out_recv_port) =
self.df.make_edge::<_, Toff<Batch>>(Self::REDUCE_BATCH);
let subgraph = self.df.add_subgraph_in_out(
Self::REDUCE_BATCH,
input.collection.into_inner(),
out_send_port,
move |_ctx, recv, send| {
let now = *(now.borrow());
let arrange = arrange_handler_inner.clone();
let src_data = recv
.take_inner()
.into_iter()
.flat_map(|v| v.into_iter())
.collect_vec();
reduce_batch_subgraph(
&arrange,
src_data,
&key_val_plan,
&accum_plan,
&output_type,
SubgraphArg {
now,
err_collector: &err_collector,
scheduler: &scheduler_inner,
send,
},
)
},
);
scheduler.set_cur_subgraph(subgraph);
let arranged = BTreeMap::from([(
(0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
Arranged::new(arrange_handler),
)]);
let bundle = CollectionBundle {
collection: Collection::from_port(out_recv_port),
arranged,
};
Ok(bundle)
}
const REDUCE: &'static str = "reduce";
#[allow(clippy::mutable_key_type)]
pub fn render_reduce(
&mut self,
input: Box<TypedPlan>,
key_val_plan: KeyValPlan,
reduce_plan: ReducePlan,
output_type: RelationType,
) -> Result<CollectionBundle, Error> {
let input = self.render_plan(*input)?;
let output_key_arity = key_val_plan.key_plan.output_arity();
let arrange_handler = self.compute_state.new_arrange(None);
if let (Some(time_index), Some(expire_after)) =
(output_type.time_index, self.compute_state.expire_after())
{
let expire_man =
KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
arrange_handler.write().set_expire_state(expire_man);
}
let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
reason: "No write is expected at this point",
})?;
let distinct_input = self.add_accum_distinct_input_arrange(&reduce_plan);
let reduce_arrange = ReduceArrange {
output_arrange: arrange_handler_inner,
distinct_input,
};
let now = self.compute_state.current_time_ref();
let err_collector = self.err_collector.clone();
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>(Self::REDUCE);
let subgraph = self.df.add_subgraph_in_out(
Self::REDUCE,
input.collection.into_inner(),
out_send_port,
move |_ctx, recv, send| {
let data = recv
.take_inner()
.into_iter()
.flat_map(|v| v.into_iter())
.collect_vec();
reduce_subgraph(
&reduce_arrange,
data,
&key_val_plan,
&reduce_plan,
SubgraphArg {
now: *now.borrow(),
err_collector: &err_collector,
scheduler: &scheduler_inner,
send,
},
);
},
);
scheduler.set_cur_subgraph(subgraph);
let arranged = BTreeMap::from([(
(0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
Arranged::new(arrange_handler),
)]);
let bundle = CollectionBundle {
collection: Collection::from_port(out_recv_port),
arranged,
};
Ok(bundle)
}
fn add_accum_distinct_input_arrange(
&mut self,
reduce_plan: &ReducePlan,
) -> Option<Vec<ArrangeHandler>> {
match reduce_plan {
ReducePlan::Distinct => None,
ReducePlan::Accumulable(AccumulablePlan { distinct_aggrs, .. }) => {
(!distinct_aggrs.is_empty()).then(|| {
std::iter::repeat_with(|| {
let arr = self.compute_state.new_arrange(None);
arr.set_full_arrangement(true);
arr
})
.take(distinct_aggrs.len())
.collect()
})
}
}
}
}
/// Split a stored accumulator row into one state vector per aggregation.
///
/// The row's first element is the offset header decoded by `from_val_to_slice_idx`. `len` is
/// the number of aggregations. A range that falls outside `accums` yields an empty state.
fn from_accum_values_to_live_accums(
    accums: Vec<Value>,
    len: usize,
) -> Result<Vec<Vec<Value>>, EvalError> {
    let header = accums.first().cloned();
    let ranges = from_val_to_slice_idx(header, len)?;
    let live = ranges
        .into_iter()
        .map(|range| accums.get(range).unwrap_or_default().to_vec())
        .collect();
    Ok(live)
}
/// Arrangements used by a row-based reduce subgraph.
pub struct ReduceArrange {
/// Output arrangement holding the per-key state written by the reduce (accumulator rows for
/// accumulable plans, key/value rows for `ReducePlan::Distinct`).
output_arrange: ArrangeHandler,
/// One full arrangement per distinct aggregation of an accumulable plan; `None` when there are
/// no distinct aggregations or the plan is `ReducePlan::Distinct`.
distinct_input: Option<Vec<ArrangeHandler>>,
}
fn batch_split_by_key_val(
batch: &Batch,
key_val_plan: &KeyValPlan,
err_collector: &ErrCollector,
) -> (Batch, Batch) {
let row_count = batch.row_count();
let mut key_batch = Batch::empty();
let mut val_batch = Batch::empty();
err_collector.run(|| {
if key_val_plan.key_plan.output_arity() != 0 {
key_batch = key_val_plan.key_plan.eval_batch_into(&mut batch.clone())?;
}
if key_val_plan.val_plan.output_arity() != 0 {
val_batch = key_val_plan.val_plan.eval_batch_into(&mut batch.clone())?;
}
Ok(())
});
if key_batch.row_count() == 0 && key_batch.column_count() == 0 {
key_batch.set_row_count(row_count);
}
if val_batch.row_count() == 0 && val_batch.column_count() == 0 {
val_batch.set_row_count(row_count);
}
(key_batch, val_batch)
}
/// Lazily split each input row into a `(key, val)` pair using `key_val_plan`.
///
/// Rows for which the key plan yields `None` are dropped. Rows whose evaluation fails are also
/// dropped, with the error pushed to `err_collector`. Each row's `sys_time` and `diff` are
/// carried through unchanged.
fn split_rows_to_key_val(
    rows: impl IntoIterator<Item = DiffRow>,
    key_val_plan: KeyValPlan,
    err_collector: ErrCollector,
) -> impl IntoIterator<Item = KeyValDiffRow> {
    // Scratch row reused by every `evaluate_into` call.
    let mut scratch = Row::new(vec![]);
    rows.into_iter().filter_map(
        move |(mut row, sys_time, diff): DiffRow| -> Option<KeyValDiffRow> {
            let split = err_collector.run(|| {
                let arity = row.len();
                let key = match key_val_plan
                    .key_plan
                    .evaluate_into(&mut row.inner, &mut scratch)?
                {
                    Some(key) => key,
                    None => return Ok(None),
                };
                // Restore the row to its original length before running the value plan
                // (presumably the key plan's evaluation may have grown it).
                row.inner.resize(arity, Value::Null);
                let val = key_val_plan
                    .val_plan
                    .evaluate_into(&mut row.inner, &mut scratch)?
                    .context(InternalSnafu {
                        reason: "val_plan should not contain any filter predicate",
                    })?;
                Ok(Some(((key, val), sys_time, diff)))
            });
            split.flatten()
        },
    )
}
/// Batch-mode reduce: group the input batches by key, fold each group's values into the
/// stored accumulator state, and emit one output batch.
///
/// Steps, as implemented below:
/// 1. Split every input batch into key/value batches with `key_val_plan`. For each distinct
///    key row, build an equality mask (null keys are matched with `is_null`) and use it to
///    filter that key's value rows.
/// 2. For each key, load the previous accumulators from `arrange` at `now`, or start fresh.
///    Feed every filtered value column into the matching `simple_aggrs` accumulator, then
///    record the new state and the final output.
/// 3. Apply all accumulator updates to `arrange` and compact it to `now`.
/// 4. Build a single output `Batch` whose rows are `key ++ aggregate outputs`, with column
///    types taken from `output_type`, and send it.
///
/// Errors are pushed to `err_collector`. When a batch or a key fails, the rest of the work for
/// that batch (or key) is abandoned, but the other batches and keys are still processed.
/// Only `accum_plan.simple_aggrs` are evaluated here.
fn reduce_batch_subgraph(
arrange: &ArrangeHandler,
src_data: impl IntoIterator<Item = Batch>,
key_val_plan: &KeyValPlan,
accum_plan: &AccumulablePlan,
output_type: &RelationType,
SubgraphArg {
now,
err_collector,
scheduler: _,
send,
}: SubgraphArg<Toff<Batch>>,
) {
// For every key row: the value rows belonging to it, one filtered batch per input batch.
let mut key_to_many_vals = BTreeMap::<Row, Vec<Batch>>::new();
let mut input_row_count = 0;
let mut input_batch_count = 0;
for batch in src_data {
input_batch_count += 1;
input_row_count += batch.row_count();
err_collector.run(|| {
let (key_batch, val_batch) =
batch_split_by_key_val(&batch, key_val_plan, err_collector);
ensure!(
key_batch.row_count() == val_batch.row_count(),
InternalSnafu {
reason: format!(
"Key and val batch should have the same row count, found {} and {}",
key_batch.row_count(),
val_batch.row_count()
)
}
);
// Collect the distinct key rows present in this batch.
let mut distinct_keys = BTreeSet::new();
for row_idx in 0..key_batch.row_count() {
let key_row = key_batch.get_row(row_idx)?;
let key_row = Row::new(key_row);
if distinct_keys.contains(&key_row) {
continue;
} else {
distinct_keys.insert(key_row.clone());
}
}
// Key columns come first in `output_type`; their declared types are used to
// validate the scalar key values below.
let key_data_types = output_type
.column_types
.iter()
.map(|t| t.scalar_type.clone())
.collect_vec();
for key_row in distinct_keys {
// Convert the key row into arrow scalars; a null key becomes a
// one-element Null array so it can be matched with `is_null` below.
let key_scalar_value = {
let mut key_scalar_value = Vec::with_capacity(key_row.len());
for (key_idx, key) in key_row.iter().enumerate() {
let v =
key.try_to_scalar_value(&key.data_type())
.context(DataTypeSnafu {
msg: "can't convert key values to datafusion value",
})?;
let key_data_type = key_data_types.get(key_idx).context(InternalSnafu {
reason: format!(
"Key index out of bound, expected at most {} but got {}",
output_type.column_types.len(),
key_idx
),
})?;
if key_data_type.as_arrow_type() != v.data_type()
&& !v.data_type().is_null()
{
crate::expr::error::InternalSnafu {
reason: format!(
"Key data type mismatch, expected {:?} but got {:?}",
key_data_type.as_arrow_type(),
v.data_type()
),
}
.fail()?
}
let arrow_value = if v.data_type().is_null() {
let ret = new_null_array(&arrow::datatypes::DataType::Null, 1);
arrow::array::Scalar::new(ret)
} else {
v.to_scalar().context(crate::expr::error::DatafusionSnafu {
context: "can't convert key values to arrow value",
})?
};
key_scalar_value.push(arrow_value);
}
key_scalar_value
};
// Per key column: a boolean mask of the rows equal to this key's value
// (`is_null` for a null key).
let eq_results = key_scalar_value
.into_iter()
.zip(key_batch.batch().iter())
.map(|(key, col)| {
if arrow::array::Datum::get(&key).0.data_type().is_null() {
arrow::compute::kernels::boolean::is_null(
col.to_arrow_array().as_ref() as _
)
} else {
arrow::compute::kernels::cmp::eq(
&key,
&col.to_arrow_array().as_ref() as _,
)
}
})
.try_collect::<_, Vec<_>, _>()
.context(ArrowSnafu {
context: "Failed to compare key values",
})?;
// AND the per-column masks together; the first failure short-circuits the fold.
let opt_eq_mask = eq_results
.into_iter()
.fold(None, |acc, v| match acc {
Some(Ok(acc)) => Some(arrow::compute::kernels::boolean::and(&acc, &v)),
Some(Err(_)) => acc,
None => Some(Ok(v)),
})
.transpose()
.context(ArrowSnafu {
context: "Failed to combine key comparison results",
})?;
// With no key columns there is no mask: every row belongs to the single empty key.
let key_eq_mask = if let Some(eq_mask) = opt_eq_mask {
BooleanVector::from(eq_mask)
} else {
BooleanVector::from(vec![true; key_batch.row_count()])
};
let cur_val_batch = val_batch.filter(&key_eq_mask)?;
key_to_many_vals
.entry(key_row)
.or_default()
.push(cur_val_batch);
}
Ok(())
});
}
trace!(
"Reduce take {} batches, {} rows",
input_batch_count,
input_row_count
);
// Holds the arrangement's write guard until the accumulator updates are applied below.
let mut arrange = arrange.write();
let mut all_arrange_updates = Vec::with_capacity(key_to_many_vals.len());
let mut all_output_dict = BTreeMap::new();
for (key, val_batches) in key_to_many_vals {
err_collector.run(|| -> Result<(), _> {
// Previous accumulator row for this key, or a default (empty) row if none is stored.
let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
let accum_list =
from_accum_values_to_live_accums(accums.unpack(), accum_plan.simple_aggrs.len())?;
let mut accum_output = AccumOutput::new();
for AggrWithIndex {
expr,
input_idx,
output_idx,
} in accum_plan.simple_aggrs.iter()
{
// An empty stored state means this aggregation starts from a fresh accumulator.
let cur_accum_value = accum_list.get(*output_idx).cloned().unwrap_or_default();
let mut cur_accum = if cur_accum_value.is_empty() {
Accum::new_accum(&expr.func.clone())?
} else {
Accum::try_into_accum(&expr.func, cur_accum_value)?
};
for val_batch in val_batches.iter() {
// A missing input column is treated as an all-null column of the batch's length.
let cur_input = val_batch
.batch()
.get(*input_idx)
.cloned()
.unwrap_or_else(|| Arc::new(NullVector::new(val_batch.row_count())));
let len = cur_input.len();
cur_accum.update_batch(&expr.func, VectorDiff::from(cur_input))?;
trace!("Reduce accum after take {} rows: {:?}", len, cur_accum);
}
let final_output = cur_accum.eval(&expr.func)?;
trace!("Reduce accum final output: {:?}", final_output);
accum_output.insert_output(*output_idx, final_output);
let cur_accum_value = cur_accum.into_state();
accum_output.insert_accum(*output_idx, cur_accum_value);
}
let (new_accums, res_val_row) = accum_output.into_accum_output()?;
let arrange_update = ((key.clone(), Row::new(new_accums)), now, 1);
all_arrange_updates.push(arrange_update);
all_output_dict.insert(key, Row::from(res_val_row));
Ok(())
});
}
// Persist the new accumulator rows, then compact the arrangement up to `now`.
err_collector.run(|| {
arrange.apply_updates(now, all_arrange_updates)?;
arrange.compact_to(now)
});
drop(arrange);
let output_types = output_type
.column_types
.iter()
.map(|t| t.scalar_type.clone())
.collect_vec();
// Assemble the output column by column: key values first, then aggregate outputs.
err_collector.run(|| {
let column_cnt = output_types.len();
let row_cnt = all_output_dict.len();
let mut output_builder = output_types
.into_iter()
.map(|t| t.create_mutable_vector(row_cnt))
.collect_vec();
for (key, val) in all_output_dict {
for (i, v) in key.into_iter().chain(val.into_iter()).enumerate() {
output_builder
.get_mut(i)
.context(InternalSnafu{
reason: format!(
"Output builder should have the same length as the row, expected at most {} but got {}",
column_cnt - 1,
i
)
})?
.try_push_value_ref(v.as_value_ref())
.context(DataTypeSnafu {
msg: "Failed to push value",
})?;
}
}
let output_columns = output_builder
.into_iter()
.map(|mut b| b.to_vector())
.collect_vec();
let output_batch = Batch::try_new(output_columns, row_cnt)?;
trace!("Reduce output batch: {:?}", output_batch);
send.give(vec![output_batch]);
Ok(())
});
}
/// Dispatch a row-based reduce step to the distinct or accumulable implementation.
///
/// Input rows are first split into `(key, val)` pairs with `key_val_plan`. The pairs are then
/// handed to `reduce_distinct_subgraph` or `reduce_accum_subgraph`, depending on `reduce_plan`.
fn reduce_subgraph(
    ReduceArrange {
        output_arrange: arrange,
        distinct_input,
    }: &ReduceArrange,
    data: impl IntoIterator<Item = DiffRow>,
    key_val_plan: &KeyValPlan,
    reduce_plan: &ReducePlan,
    SubgraphArg {
        now,
        err_collector,
        scheduler,
        send,
    }: SubgraphArg,
) {
    let key_val = split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
    // Re-pack the destructured argument once; exactly one branch below consumes it.
    let arg = SubgraphArg {
        now,
        err_collector,
        scheduler,
        send,
    };
    match reduce_plan {
        ReducePlan::Distinct => reduce_distinct_subgraph(arrange, key_val, arg),
        ReducePlan::Accumulable(accum_plan) => {
            reduce_accum_subgraph(arrange, distinct_input, key_val, accum_plan, arg)
        }
    }
}
/// Keep only the updates that change the distinct set.
///
/// An update is accepted when either:
/// - no earlier state exists for its key and its diff is `1`, or
/// - its value equals the key's latest known value but its diff differs.
///
/// The latest known state is the last accepted update in this batch, falling back to
/// `arrange` at `now`. `err_collector` is currently unused.
fn eval_distinct_core(
    arrange: ArrangeReader,
    kv: impl IntoIterator<Item = KeyValDiffRow>,
    now: repr::Timestamp,
    err_collector: &ErrCollector,
) -> Vec<KeyValDiffRow> {
    let _ = err_collector;
    // Latest accepted (val, ts, diff) per key within this batch; shadows the arrangement.
    let mut seen_in_batch = BTreeMap::new();
    let mut accepted = Vec::new();
    for ((key, val), ts, diff) in kv {
        let previous = seen_in_batch
            .get(&key)
            .cloned()
            .or_else(|| arrange.get(now, &key));
        let keep = match previous {
            None => diff == 1,
            Some(prev) => prev.0 == val && prev.2 != diff,
        };
        if keep {
            seen_in_batch.insert(key.clone(), (val.clone(), ts, diff));
            accepted.push(((key, val), ts, diff));
        }
    }
    accepted
}
/// Apply distinct-filtered updates to `arrange` and return what it accumulated since its last
/// compaction, with each value row appended to its key row.
///
/// The candidate updates in `kv` are filtered by `eval_distinct_core` and written back. Then
/// every update in `(last_compaction_time, now]` is read out, after which the arrangement is
/// compacted to `now`. Errors are pushed to `err_collector`.
fn update_reduce_distinct_arrange(
    arrange: &ArrangeHandler,
    kv: impl IntoIterator<Item = KeyValDiffRow>,
    now: repr::Timestamp,
    err_collector: &ErrCollector,
) -> impl Iterator<Item = DiffRow> {
    let accepted = eval_distinct_core(arrange.read(), kv, now, err_collector);
    err_collector.run(|| {
        arrange.write().apply_updates(now, accepted)?;
        Ok(())
    });

    let lower = arrange
        .read()
        .last_compaction_time()
        .unwrap_or(repr::Timestamp::MIN);
    let window = (
        std::ops::Bound::Excluded(lower),
        std::ops::Bound::Included(now),
    );
    let emitted = arrange.read().get_updates_in_range(window);

    err_collector.run(|| {
        arrange.write().compact_to(now)?;
        Ok(())
    });

    emitted.into_iter().map(|((mut row, val), ts, diff)| {
        row.extend(val.into_iter());
        (row, ts, diff)
    })
}
fn reduce_distinct_subgraph(
arrange: &ArrangeHandler,
kv: impl IntoIterator<Item = KeyValDiffRow>,
SubgraphArg {
now,
err_collector,
scheduler: _,
send,
}: SubgraphArg,
) {
let ret = update_reduce_distinct_arrange(arrange, kv, now, err_collector).collect_vec();
if arrange.read().get_next_update_time(&now).is_some() {
err_collector.push_err(
InternalSnafu {
reason: "No future updates should exist in the reduce distinct arrangement",
}
.build(),
);
}
send.give(ret);
}
/// Incrementally update the accumulators of every key in `kv` and emit the new aggregate rows.
///
/// For each key:
/// - the key is skipped, with a warning, if it is expired according to the arrangement's expire
///   state;
/// - otherwise its stored accumulator row is read from `arrange`, and both simple and distinct
///   aggregations are applied to the column-wise diffs;
/// - the new accumulator row and the output row `key ++ outputs` are recorded.
///
/// All accumulator updates are then applied and `arrange` is compacted to `now`. Errors are
/// pushed to `err_collector`, and a key that fails is skipped.
fn reduce_accum_subgraph(
    arrange: &ArrangeHandler,
    distinct_input: &Option<Vec<ArrangeHandler>>,
    kv: impl IntoIterator<Item = KeyValDiffRow>,
    accum_plan: &AccumulablePlan,
    SubgraphArg {
        now,
        err_collector,
        scheduler,
        send,
    }: SubgraphArg,
) {
    let AccumulablePlan {
        full_aggrs,
        simple_aggrs,
        distinct_aggrs,
    } = accum_plan;

    // Group the incoming (val, diff) pairs by key; the tick is not needed here.
    let mut grouped: BTreeMap<Row, Vec<(Row, repr::Diff)>> = BTreeMap::new();
    for ((key, val), _tick, diff) in kv {
        grouped.entry(key).or_default().push((val, diff));
    }

    let mut arrange_updates = Vec::with_capacity(grouped.len());
    let mut outputs = Vec::with_capacity(grouped.len());
    let mut arrange = arrange.write();

    for (key, value_diffs) in grouped {
        if let Some(expire_man) = &arrange.get_expire_state() {
            // An error while checking expiry is collected and the key is still processed.
            let expired_by = err_collector
                .run(|| Ok(expire_man.get_expire_duration(now, &key)?))
                .flatten();
            if let Some(expired) = expired_by {
                common_telemetry::warn!(
                    "Data already expired: {}",
                    DataAlreadyExpiredSnafu {
                        expired_by: expired,
                    }
                    .build()
                );
                continue;
            }
        }

        // Every group holds at least one entry, because it was created by a push.
        let row_len = value_diffs[0].0.len();
        let col_diffs = match err_collector.run(|| get_col_diffs(value_diffs, row_len)) {
            Some(col_diffs) => col_diffs,
            None => continue,
        };

        let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
        let accums = accums.inner;
        let accum_ranges = match err_collector
            .run(|| from_val_to_slice_idx(accums.first().cloned(), full_aggrs.len()))
        {
            Some(ranges) => ranges,
            None => continue,
        };

        let mut accum_output = AccumOutput::new();
        eval_simple_aggrs(
            simple_aggrs,
            &accums,
            &accum_ranges,
            &col_diffs,
            &mut accum_output,
            err_collector,
        );
        eval_distinct_aggrs(
            distinct_aggrs,
            distinct_input,
            &accums,
            &accum_ranges,
            &col_diffs,
            &mut accum_output,
            SubgraphArg {
                now,
                err_collector,
                scheduler,
                send,
            },
        );

        err_collector.run(|| {
            let (new_accums, res_val_row) = accum_output.into_accum_output()?;
            arrange_updates.push(((key.clone(), Row::new(new_accums)), now, 1));
            let mut key_val = key;
            key_val.extend(res_val_row);
            outputs.push((key_val, now, 1));
            Ok(())
        });
    }

    err_collector.run(|| {
        arrange.apply_updates(now, arrange_updates)?;
        arrange.compact_to(now)
    });
    let all_arrange_used = distinct_input
        .iter()
        .flatten()
        .map(|d| d.write())
        .chain(std::iter::once(arrange));
    check_no_future_updates(all_arrange_used, err_collector, now);
    send.give(outputs);
}
/// Transpose `(row, diff)` pairs into per-column lists of `(value, diff)`.
///
/// The result has `row_len` entries. Entry `i` pairs column `i` of every row with that row's
/// diff, in input order.
///
/// # Errors
/// Returns an internal error if any row's length differs from `row_len`.
fn get_col_diffs(
    value_diffs: Vec<(Row, repr::Diff)>,
    row_len: usize,
) -> Result<Vec<Vec<(Value, i64)>>, EvalError> {
    let uniform = value_diffs.iter().all(|(row, _)| row.len() == row_len);
    ensure!(
        uniform,
        InternalSnafu {
            reason: "value_diffs should have rows with equal length"
        }
    );
    let mut columns = Vec::with_capacity(row_len);
    for col_idx in 0..row_len {
        let column: Vec<(Value, i64)> = value_diffs
            .iter()
            .map(|(row, diff)| (row.get(col_idx).cloned().unwrap(), *diff))
            .collect();
        columns.push(column);
    }
    Ok(columns)
}
/// Fold column diffs into the previous state of each simple (non-distinct) aggregation.
///
/// For every aggregation, the old state is the slice of `accums` selected by
/// `accum_ranges[output_idx]`, and the input is `col_diffs[input_idx]`. On success, the new
/// state and output are stored in `accum_output` under `output_idx`. On failure, the error goes
/// to `err_collector` and that aggregation is left out.
fn eval_simple_aggrs(
    simple_aggrs: &Vec<AggrWithIndex>,
    accums: &[Value],
    accum_ranges: &[Range<usize>],
    col_diffs: &[Vec<(Value, i64)>],
    accum_output: &mut AccumOutput,
    err_collector: &ErrCollector,
) {
    for aggr in simple_aggrs {
        let AggrWithIndex {
            expr,
            input_idx,
            output_idx,
        } = aggr;
        let old_state = accums
            .get(accum_ranges[*output_idx].clone())
            .unwrap_or_default()
            .iter()
            .cloned();
        let column = col_diffs[*input_idx].iter().cloned();
        let evaluated =
            err_collector.run(|| expr.func.eval_diff_accumulable(old_state, column));
        if let Some((output, new_state)) = evaluated {
            accum_output.insert_accum(*output_idx, new_state);
            accum_output.insert_output(*output_idx, output);
        }
    }
}
/// Per-aggregation results collected for one key, indexed by output position.
#[derive(Debug)]
struct AccumOutput {
    /// Accumulator state for each output index.
    accum: BTreeMap<usize, Vec<Value>>,
    /// Final aggregate value for each output index.
    output: BTreeMap<usize, Value>,
}

impl AccumOutput {
    /// An empty collection.
    fn new() -> Self {
        AccumOutput {
            accum: BTreeMap::default(),
            output: BTreeMap::default(),
        }
    }

    /// Record (or replace) the accumulator state of aggregation `idx`.
    fn insert_accum(&mut self, idx: usize, v: Vec<Value>) {
        self.accum.insert(idx, v);
    }

    /// Record (or replace) the output value of aggregation `idx`.
    fn insert_output(&mut self, idx: usize, v: Value) {
        self.output.insert(idx, v);
    }

    /// Flatten into `(offset-encoded accumulator row, output row)`.
    ///
    /// Both maps must be empty, or both non-empty with equal length and keys forming exactly
    /// `0..len`. Otherwise an internal error is returned.
    fn into_accum_output(self) -> Result<(Vec<Value>, Vec<Value>), EvalError> {
        let Self { accum, output } = self;
        if accum.is_empty() && output.is_empty() {
            return Ok((Vec::new(), Vec::new()));
        }
        ensure!(
            !accum.is_empty() && accum.len() == output.len(),
            InternalSnafu {
                reason: format!(
                    "Accum and output should have the non-zero and same length, found accum.len() = {}, output.len() = {}",
                    accum.len(),
                    output.len()
                )
            }
        );
        // Keys are unique and sorted, so `last == len - 1` means they are exactly `0..len`.
        if let Some((&last, _)) = accum.last_key_value() {
            ensure!(
                last == accum.len() - 1,
                InternalSnafu {
                    reason: "Accum should be a continuous range"
                }
            );
        }
        if let Some((&last, _)) = output.last_key_value() {
            ensure!(
                last == output.len() - 1,
                InternalSnafu {
                    reason: "Output should be a continuous range"
                }
            );
        }
        let new_accums = from_accums_to_offsetted_accum(accum.into_values().collect());
        Ok((new_accums, output.into_values().collect()))
    }
}
fn eval_distinct_aggrs(
distinct_aggrs: &Vec<AggrWithIndex>,
distinct_input: &Option<Vec<ArrangeHandler>>,
accums: &[Value],
accum_ranges: &[Range<usize>],
col_diffs: &[Vec<(Value, i64)>],
accum_output: &mut AccumOutput,
SubgraphArg {
now,
err_collector,
scheduler: _,
send: _,
}: SubgraphArg,
) {
for AggrWithIndex {
expr,
input_idx,
output_idx,
} in distinct_aggrs
{
let cur_accum_range = accum_ranges[*output_idx].clone(); let cur_old_accum = accums
.get(cur_accum_range)
.unwrap_or_default()
.iter()
.cloned();
let cur_col_diff = col_diffs[*input_idx].iter().cloned();
let input_arrange = distinct_input
.as_ref()
.and_then(|v| v[*input_idx].clone_full_arrange())
.expect("A full distinct input arrangement should exist");
let kv = cur_col_diff.map(|(v, d)| ((Row::new(vec![v]), Row::empty()), now, d));
let col_diff_distinct =
update_reduce_distinct_arrange(&input_arrange, kv, now, err_collector).map(
|(row, _ts, diff)| (row.get(0).expect("Row should not be empty").clone(), diff),
);
let col_diff_distinct = {
let res = col_diff_distinct.collect_vec();
res.into_iter()
};
let (res, new_accum) = expr
.func
.eval_diff_accumulable(cur_old_accum, col_diff_distinct)
.unwrap();
accum_output.insert_accum(*output_idx, new_accum);
accum_output.insert_output(*output_idx, res);
}
}
/// Push an internal error for every arrangement that still holds updates later than `now`.
///
/// The writers in `all_arrange_used` are checked one at a time, and each one with a pending
/// update after `now` produces one error.
// NOTE(review): the message names the "reduce distinct arrangement", but callers also pass the
// output arrangement of accumulable reduces — consider a more general wording.
fn check_no_future_updates<'a>(
    all_arrange_used: impl IntoIterator<Item = ArrangeWriter<'a>>,
    err_collector: &ErrCollector,
    now: repr::Timestamp,
) {
    let with_future_updates = all_arrange_used
        .into_iter()
        .filter(|writer| writer.get_next_update_time(&now).is_some());
    for _writer in with_future_updates {
        err_collector.push_err(
            InternalSnafu {
                reason: "No future updates should exist in the reduce distinct arrangement",
            }
            .build(),
        );
    }
}
/// Pack per-aggregation accumulator states into one row.
///
/// Element 0 is a `List` of `u64` cumulative end offsets, one per state. The first state starts
/// at index 1, right after the header. The header is followed by all states concatenated in
/// order. `from_val_to_slice_idx` decodes this layout.
fn from_accums_to_offsetted_accum(new_accums: Vec<Vec<Value>>) -> Vec<Value> {
    let mut end = 1u64;
    let mut ends = Vec::with_capacity(new_accums.len());
    for accum in &new_accums {
        end += accum.len() as u64;
        ends.push(Value::from(end));
    }
    let header = Value::List(ListValue::new(ends, ConcreteDataType::uint64_datatype()));
    let mut packed = Vec::with_capacity(end as usize);
    packed.push(header);
    packed.extend(new_accums.into_iter().flatten());
    packed
}
/// Decode an accumulator row's offset header into one index range per aggregation.
///
/// `value` is the first element of a stored accumulator row: a list of cumulative *end*
/// offsets (as `u64`), as written by `from_accums_to_offsetted_accum`. Slot 0 holds the header
/// itself, so the first aggregation's state starts at index 1. When `value` is `None` (no state
/// stored yet), every range is the empty range `1..1`.
///
/// # Errors
/// Returns an error if:
/// - the header is not a list of `u64`;
/// - its length differs from `expected_len`;
/// - the offsets are not non-decreasing starting from at least 1.
fn from_val_to_slice_idx(
    value: Option<Value>,
    expected_len: usize,
) -> Result<Vec<Range<usize>>, EvalError> {
    let offset_end: Vec<usize> = match value {
        Some(value) => {
            let list = value
                .as_list()
                .with_context(|_| DataTypeSnafu {
                    msg: "Accum's first element should be a list",
                })?
                .context(InternalSnafu {
                    reason: "Accum's first element should be a list",
                })?;
            let ret: Vec<usize> = list
                .items()
                .iter()
                .map(|v| {
                    v.as_u64().map(|j| j as usize).context(InternalSnafu {
                        reason: "End offset should be a list of u64",
                    })
                })
                .try_collect()?;
            ensure!(
                ret.len() == expected_len,
                InternalSnafu {
                    reason: "Offset List should have the same length as full_aggrs"
                }
            );
            ret
        }
        None => vec![1usize; expected_len],
    };

    // Validate in every build: a decreasing offset would otherwise form an inverted range, which
    // `slice::get` treats as out of bounds, silently discarding that aggregation's state.
    let mut accum_ranges = Vec::with_capacity(offset_end.len());
    let mut start = 1usize;
    for &end in &offset_end {
        ensure!(
            end >= start,
            InternalSnafu {
                reason: format!(
                    "Offset list should be non-decreasing and start from at least 1, found {:?}",
                    offset_end
                )
            }
        );
        accum_ranges.push(start..end);
        start = end;
    }
    Ok(accum_ranges)
}
#[cfg(test)]
mod test {
use std::time::Duration;
use common_time::Timestamp;
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
use hydroflow::scheduled::graph::Hydroflow;
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
use crate::compute::state::DataflowState;
use crate::expr::{
self, AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc,
};
use crate::plan::Plan;
use crate::repr::{ColumnType, RelationType};
#[test]
fn test_tumble_group_by() {
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
const START: i64 = 1625097600000;
let rows = vec![
(1u32, START + 1000),
(2u32, START + 1500),
(3u32, START + 2000),
(1u32, START + 2500),
(2u32, START + 3000),
(3u32, START + 3500),
];
let rows = rows
.into_iter()
.map(|(number, ts)| {
(
Row::new(vec![number.into(), Timestamp::new_millisecond(ts).into()]),
1,
1,
)
})
.collect_vec();
let collection = ctx.render_constant(rows.clone());
ctx.insert_global(GlobalId::User(1), collection);
let aggr_expr = AggregateExpr {
func: AggregateFunc::SumUInt32,
expr: ScalarExpr::Column(0),
distinct: false,
};
let expected = TypedPlan {
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ])
.into_unnamed(),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_unnamed(),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
])
.unwrap()
.project(vec![2, 3])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(2)
.project(vec![0, 1])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![aggr_expr.clone()],
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ColumnType::new(CDT::timestamp_millisecond_datatype(), false), ColumnType::new(CDT::uint64_datatype(), true), ])
.with_key(vec![1])
.with_time_index(Some(0))
.into_unnamed(),
),
),
mfp: MapFilterProject::new(3)
.map(vec![
ScalarExpr::Column(2),
ScalarExpr::Column(3),
ScalarExpr::Column(0),
ScalarExpr::Column(1),
])
.unwrap()
.project(vec![4, 5, 6])
.unwrap(),
},
};
let bundle = ctx.render_plan(expected).unwrap();
let output = get_output_handle(&mut ctx, bundle);
drop(ctx);
let expected = BTreeMap::from([(
1,
vec![
(
Row::new(vec![
3u64.into(),
Timestamp::new_millisecond(START + 1000).into(),
Timestamp::new_millisecond(START + 2000).into(),
]),
1,
1,
),
(
Row::new(vec![
4u64.into(),
Timestamp::new_millisecond(START + 2000).into(),
Timestamp::new_millisecond(START + 3000).into(),
]),
1,
1,
),
(
Row::new(vec![
5u64.into(),
Timestamp::new_millisecond(START + 3000).into(),
Timestamp::new_millisecond(START + 4000).into(),
]),
1,
1,
),
],
)]);
run_and_check(&mut state, &mut df, 1..2, expected, output);
}
#[test]
fn test_avg_eval() {
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
let rows = vec![
(Row::new(vec![1u32.into()]), 1, 1),
(Row::new(vec![2u32.into()]), 1, 1),
(Row::new(vec![3u32.into()]), 1, 1),
(Row::new(vec![1u32.into()]), 1, 1),
(Row::new(vec![2u32.into()]), 1, 1),
(Row::new(vec![3u32.into()]), 1, 1),
];
let collection = ctx.render_constant(rows.clone());
ctx.insert_global(GlobalId::User(1), collection);
let aggr_exprs = vec![
AggregateExpr {
func: AggregateFunc::SumUInt32,
expr: ScalarExpr::Column(0),
distinct: false,
},
AggregateExpr {
func: AggregateFunc::Count,
expr: ScalarExpr::Column(0),
distinct: false,
},
];
let avg_expr = ScalarExpr::If {
cond: Box::new(ScalarExpr::Column(1).call_binary(
ScalarExpr::Literal(Value::from(0u32), CDT::int64_datatype()),
BinaryFunc::NotEq,
)),
then: Box::new(ScalarExpr::Column(0).call_binary(
ScalarExpr::Column(1).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
BinaryFunc::DivUInt64,
)),
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
};
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
.into_unnamed(),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::int64_datatype(),
false,
)])
.into_unnamed(),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(1)
.project(vec![])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(1)
.project(vec![0])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: aggr_exprs.clone(),
simple_aggrs: vec![
AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1),
],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), true),
ColumnType::new(ConcreteDataType::int64_datatype(), true),
])
.into_unnamed(),
),
),
mfp: MapFilterProject::new(2)
.map(vec![
avg_expr,
ScalarExpr::Column(2),
])
.unwrap()
.project(vec![3])
.unwrap(),
},
};
let bundle = ctx.render_plan(expected).unwrap();
let output = get_output_handle(&mut ctx, bundle);
drop(ctx);
let expected = BTreeMap::from([(1, vec![(Row::new(vec![2u64.into()]), 1, 1)])]);
run_and_check(&mut state, &mut df, 1..2, expected, output);
}
#[test]
fn test_basic_distinct() {
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
let rows = vec![
(Row::new(vec![1i64.into()]), 1, 1),
(Row::new(vec![2i64.into()]), 2, 1),
(Row::new(vec![3i64.into()]), 3, 1),
(Row::new(vec![1i64.into()]), 4, 1),
(Row::new(vec![2i64.into()]), 5, 1),
(Row::new(vec![3i64.into()]), 6, 1),
];
let collection = ctx.render_constant(rows.clone());
ctx.insert_global(GlobalId::User(1), collection);
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
let typ = RelationType::new(vec![ColumnType::new_nullable(
ConcreteDataType::int64_datatype(),
)]);
let key_val_plan = KeyValPlan {
key_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
val_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
};
let reduce_plan = ReducePlan::Distinct;
let bundle = ctx
.render_reduce(
Box::new(input_plan.with_types(typ.into_unnamed())),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();
let output = get_output_handle(&mut ctx, bundle);
drop(ctx);
let expected = BTreeMap::from([(
6,
vec![
(Row::new(vec![1i64.into()]), 1, 1),
(Row::new(vec![2i64.into()]), 2, 1),
(Row::new(vec![3i64.into()]), 3, 1),
],
)]);
run_and_check(&mut state, &mut df, 6..7, expected, output);
}
/// Batch-mode global sum: `render_reduce_batch` with an `AccumulablePlan`
/// holding a single `SumInt64` over column 0 and an empty key, fed by a
/// `Plan::Constant`.
///
/// The input contains `Null` rows (at ts -1 and ts 1). The expected running
/// sums below do not count them. The dataflow is stepped at ts 1..=6, so rows
/// at ts -1 and 0 are observable only through the first checked sum (ts 1).
///
/// The sink records each timestamp at which it compared a batch against
/// `expected`. After the run, the test asserts that every expected timestamp
/// was actually checked, so it cannot pass vacuously if the sink never
/// receives output.
#[test]
fn test_basic_batch_reduce_accum() {
    let mut df = Hydroflow::new();
    let mut state = DataflowState::default();
    let now = state.current_time_ref();
    let mut ctx = harness_test_ctx(&mut df, &mut state);
    let rows = vec![
        (Row::new(vec![Value::Null]), -1, 1),
        (Row::new(vec![1i64.into()]), 0, 1),
        (Row::new(vec![Value::Null]), 1, 1),
        (Row::new(vec![2i64.into()]), 2, 1),
        (Row::new(vec![3i64.into()]), 3, 1),
        (Row::new(vec![1i64.into()]), 4, 1),
        (Row::new(vec![2i64.into()]), 5, 1),
        (Row::new(vec![3i64.into()]), 6, 1),
    ];
    let input_plan = Plan::Constant { rows };
    let typ = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    // Empty key (one global group); the value is column 0.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };
    let simple_aggrs = vec![AggrWithIndex::new(
        AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: false,
        },
        0,
        0,
    )];
    let accum_plan = AccumulablePlan {
        full_aggrs: vec![AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: false,
        }],
        simple_aggrs,
        distinct_aggrs: vec![],
    };
    let reduce_plan = ReducePlan::Accumulable(accum_plan);
    let bundle = ctx
        .render_reduce_batch(
            Box::new(input_plan.with_types(typ.into_unnamed())),
            &key_val_plan,
            &reduce_plan,
            &RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]),
        )
        .unwrap();
    {
        let now_inner = now.clone();
        // Expected running sum at each stepped timestamp. The loop below only
        // steps ts 1..=6, so entries for ts -1 / 0 would never be looked up and
        // are intentionally absent.
        let expected = BTreeMap::<i64, Vec<i64>>::from([
            (1, vec![1i64]),
            (2, vec![3i64]),
            (3, vec![6i64]),
            (4, vec![7i64]),
            (5, vec![9i64]),
            (6, vec![12i64]),
        ]);
        let expected_ts: std::collections::BTreeSet<i64> = expected.keys().copied().collect();
        // Timestamps at which the sink actually compared its input with `expected`.
        let checked_ts = std::rc::Rc::new(std::cell::RefCell::new(
            std::collections::BTreeSet::<i64>::new(),
        ));
        let checked_inner = checked_ts.clone();
        let collection = bundle.collection;
        ctx.df
            .add_subgraph_sink("test_sink", collection.into_inner(), move |_ctx, recv| {
                let now = *now_inner.borrow();
                let data = recv.take_inner();
                let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();
                if let Some(expected) = expected.get(&now) {
                    let batch = expected.iter().map(|v| Value::from(*v)).collect_vec();
                    let batch = Batch::try_from_rows_with_types(
                        vec![batch.into()],
                        &[CDT::int64_datatype()],
                    )
                    .unwrap();
                    assert_eq!(res.first(), Some(&batch), "at ts={}", now);
                    checked_inner.borrow_mut().insert(now);
                }
            });
        drop(ctx);
        for now in 1..7 {
            state.set_current_ts(now);
            state.run_available_with_schedule(&mut df);
            if !state.get_err_collector().is_empty() {
                panic!(
                    "Errors occur: {:?}",
                    state.get_err_collector().get_all_blocking()
                )
            }
        }
        // Guard against a vacuous pass: every expected tick must have been verified.
        assert_eq!(
            *checked_ts.borrow(),
            expected_ts,
            "sink did not verify output at every expected timestamp"
        );
    }
}
/// Global (key-less) `SumInt64` over column 0 through the row-based
/// `render_reduce` path. The `AccumulablePlan` has one simple aggregation and
/// no distinct ones.
///
/// Input values 1, 2, 3, 1, 2, 3 arrive one per tick at ts 1..=6. At each tick
/// the expected output is the single running-sum row stamped with that tick.
#[test]
fn test_basic_reduce_accum() {
    let mut df = Hydroflow::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // Single-column int64 row holding `v`.
    let row_of = |v: i64| Row::new(vec![v.into()]);

    // Value `v` arrives at tick `ts` (1-based) with diff +1.
    let source_rows: Vec<_> = [1i64, 2, 3, 1, 2, 3]
        .into_iter()
        .zip(1i64..)
        .map(|(v, ts)| (row_of(v), ts, 1))
        .collect();
    let source = ctx.render_constant(source_rows);
    ctx.insert_global(GlobalId::User(1), source);

    let input_type = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    let input_plan = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    }
    .with_types(input_type.into_unnamed());

    // Empty key (one global group); the value is column 0.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    // Builds a fresh non-distinct `SUM(col0)` expression on each call.
    let sum_col0 = || AggregateExpr {
        func: AggregateFunc::SumInt64,
        expr: ScalarExpr::Column(0),
        distinct: false,
    };
    let reduce_plan = ReducePlan::Accumulable(AccumulablePlan {
        full_aggrs: vec![sum_col0()],
        simple_aggrs: vec![AggrWithIndex::new(sum_col0(), 0, 0)],
        distinct_aggrs: vec![],
    });

    let bundle = ctx
        .render_reduce(
            Box::new(input_plan),
            key_val_plan,
            reduce_plan,
            RelationType::empty(),
        )
        .unwrap();
    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);

    // (tick, running sum): each tick is expected to emit exactly one row.
    let expected: BTreeMap<_, _> = [(1i64, 1i64), (2, 3), (3, 6), (4, 7), (5, 9), (6, 12)]
        .into_iter()
        .map(|(ts, sum)| (ts, vec![(row_of(sum), ts, 1)]))
        .collect();
    run_and_check(&mut state, &mut df, 1..7, expected, output);
}
/// Distinct `SumInt64` over column 0 (empty key) where value 1 is repeatedly
/// inserted and retracted.
///
/// The cumulative count of value 1 after each tick is 0 (ts 1: +1, -1), 1
/// (ts 2), 0 (ts 3) and 1 (ts 4: +1, -1, +1), so the expected distinct sum
/// alternates 0, 1, 0, 1. There is no input and no expected entry for ts 5
/// and 6.
#[test]
fn test_delete_reduce_distinct_accum() {
    let mut df = Hydroflow::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);
    let rows = vec![
        (Row::new(vec![1i64.into()]), 1, 1),
        (Row::new(vec![1i64.into()]), 1, -1),
        (Row::new(vec![1i64.into()]), 2, 1),
        (Row::new(vec![1i64.into()]), 3, -1),
        (Row::new(vec![1i64.into()]), 4, 1),
        (Row::new(vec![1i64.into()]), 4, -1),
        (Row::new(vec![1i64.into()]), 4, 1),
    ];
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);
    let input_plan = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    };
    let typ = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    // Empty key (one global group); the value is column 0.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };
    // The aggregation listed under `distinct_aggrs` carries `distinct: true`,
    // matching its `full_aggrs` entry and `test_composite_reduce_distinct_accum`.
    let distinct_aggrs = vec![AggrWithIndex::new(
        AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: true,
        },
        0,
        0,
    )];
    let accum_plan = AccumulablePlan {
        full_aggrs: vec![AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: true,
        }],
        simple_aggrs: vec![],
        distinct_aggrs,
    };
    let reduce_plan = ReducePlan::Accumulable(accum_plan);
    let bundle = ctx
        .render_reduce(
            Box::new(input_plan.with_types(typ.into_unnamed())),
            key_val_plan,
            reduce_plan,
            RelationType::empty(),
        )
        .unwrap();
    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);
    let expected = BTreeMap::from([
        (1, vec![(Row::new(vec![0i64.into()]), 1, 1)]),
        (2, vec![(Row::new(vec![1i64.into()]), 2, 1)]),
        (3, vec![(Row::new(vec![0i64.into()]), 3, 1)]),
        (4, vec![(Row::new(vec![1i64.into()]), 4, 1)]),
    ]);
    run_and_check(&mut state, &mut df, 1..7, expected, output);
}
/// Distinct `SumInt64` over column 0 (empty key). Value 1 is inserted and
/// retracted at ts 1, then values 2, 3, 1, 2, 3, 1 arrive at ts 2..=7.
///
/// The expected distinct sum goes 0, 2, 5, 6 and then stays at 6, with a row
/// still expected at every tick that has input. The dataflow is stepped at
/// ts 1..=7 so that the final input row (ts 7) and its expected output are
/// actually checked.
#[test]
fn test_basic_reduce_distinct_accum() {
    let mut df = Hydroflow::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);
    let rows = vec![
        (Row::new(vec![1i64.into()]), 1, 1),
        (Row::new(vec![1i64.into()]), 1, -1),
        (Row::new(vec![2i64.into()]), 2, 1),
        (Row::new(vec![3i64.into()]), 3, 1),
        (Row::new(vec![1i64.into()]), 4, 1),
        (Row::new(vec![2i64.into()]), 5, 1),
        (Row::new(vec![3i64.into()]), 6, 1),
        (Row::new(vec![1i64.into()]), 7, 1),
    ];
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);
    let input_plan = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    };
    let typ = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    // Empty key (one global group); the value is column 0.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };
    // The aggregation listed under `distinct_aggrs` carries `distinct: true`,
    // matching its `full_aggrs` entry and `test_composite_reduce_distinct_accum`.
    let distinct_aggrs = vec![AggrWithIndex::new(
        AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: true,
        },
        0,
        0,
    )];
    let accum_plan = AccumulablePlan {
        full_aggrs: vec![AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: true,
        }],
        simple_aggrs: vec![],
        distinct_aggrs,
    };
    let reduce_plan = ReducePlan::Accumulable(accum_plan);
    let bundle = ctx
        .render_reduce(
            Box::new(input_plan.with_types(typ.into_unnamed())),
            key_val_plan,
            reduce_plan,
            RelationType::empty(),
        )
        .unwrap();
    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);
    let expected = BTreeMap::from([
        (1, vec![(Row::new(vec![0i64.into()]), 1, 1)]),
        (2, vec![(Row::new(vec![2i64.into()]), 2, 1)]),
        (3, vec![(Row::new(vec![5i64.into()]), 3, 1)]),
        (4, vec![(Row::new(vec![6i64.into()]), 4, 1)]),
        (5, vec![(Row::new(vec![6i64.into()]), 5, 1)]),
        (6, vec![(Row::new(vec![6i64.into()]), 6, 1)]),
        (7, vec![(Row::new(vec![6i64.into()]), 7, 1)]),
    ]);
    // The range is half-open: `1..8` steps ts 1..=7, so the ts 7 expectation is verified.
    run_and_check(&mut state, &mut df, 1..8, expected, output);
}
/// Mixes a plain `SumInt64` (output column 0) and a distinct `SumInt64`
/// (output column 1) over the same input column, with an empty key.
///
/// Values 1, 2, 3, 1, 2, 3, 1 arrive one per tick at ts 1..=7. The expected
/// plain sum grows every tick (up to 13). The expected distinct sum stops at 6
/// once 1, 2 and 3 have all been seen. The dataflow is stepped at ts 1..=7 so
/// that the final input row (ts 7) and its expected `[13, 6]` output are
/// actually checked.
#[test]
fn test_composite_reduce_distinct_accum() {
    let mut df = Hydroflow::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);
    let rows = vec![
        (Row::new(vec![1i64.into()]), 1, 1),
        (Row::new(vec![2i64.into()]), 2, 1),
        (Row::new(vec![3i64.into()]), 3, 1),
        (Row::new(vec![1i64.into()]), 4, 1),
        (Row::new(vec![2i64.into()]), 5, 1),
        (Row::new(vec![3i64.into()]), 6, 1),
        (Row::new(vec![1i64.into()]), 7, 1),
    ];
    let collection = ctx.render_constant(rows);
    ctx.insert_global(GlobalId::User(1), collection);
    let input_plan = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    };
    let typ = RelationType::new(vec![ColumnType::new_nullable(
        ConcreteDataType::int64_datatype(),
    )]);
    // Empty key (one global group); the value is column 0.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };
    // Plain sum: reads value column 0 and writes output column 0.
    let simple_aggrs = vec![AggrWithIndex::new(
        AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: false,
        },
        0,
        0,
    )];
    // Distinct sum: reads value column 0 and writes output column 1.
    let distinct_aggrs = vec![AggrWithIndex::new(
        AggregateExpr {
            func: AggregateFunc::SumInt64,
            expr: ScalarExpr::Column(0),
            distinct: true,
        },
        0,
        1,
    )];
    let accum_plan = AccumulablePlan {
        full_aggrs: vec![
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            AggregateExpr {
                func: AggregateFunc::SumInt64,
                expr: ScalarExpr::Column(0),
                distinct: true,
            },
        ],
        simple_aggrs,
        distinct_aggrs,
    };
    let reduce_plan = ReducePlan::Accumulable(accum_plan);
    let bundle = ctx
        .render_reduce(
            Box::new(input_plan.with_types(typ.into_unnamed())),
            key_val_plan,
            reduce_plan,
            RelationType::empty(),
        )
        .unwrap();
    let output = get_output_handle(&mut ctx, bundle);
    drop(ctx);
    let expected = BTreeMap::from([
        (1, vec![(Row::new(vec![1i64.into(), 1i64.into()]), 1, 1)]),
        (2, vec![(Row::new(vec![3i64.into(), 3i64.into()]), 2, 1)]),
        (3, vec![(Row::new(vec![6i64.into(), 6i64.into()]), 3, 1)]),
        (4, vec![(Row::new(vec![7i64.into(), 6i64.into()]), 4, 1)]),
        (5, vec![(Row::new(vec![9i64.into(), 6i64.into()]), 5, 1)]),
        (6, vec![(Row::new(vec![12i64.into(), 6i64.into()]), 6, 1)]),
        (7, vec![(Row::new(vec![13i64.into(), 6i64.into()]), 7, 1)]),
    ]);
    // The range is half-open: `1..8` steps ts 1..=7, so the ts 7 expectation is verified.
    run_and_check(&mut state, &mut df, 1..8, expected, output);
}
}