// promql/extension_plan/series_divide.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::any::Any;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use datafusion::arrow::array::{
21    Array, LargeStringArray, StringArray, StringViewArray, UInt64Array,
22};
23use datafusion::arrow::datatypes::{DataType, SchemaRef};
24use datafusion::arrow::record_batch::RecordBatch;
25use datafusion::common::{DFSchema, DFSchemaRef};
26use datafusion::error::Result as DataFusionResult;
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_expr::{LexRequirement, OrderingRequirements, PhysicalSortRequirement};
30use datafusion::physical_plan::expressions::Column as ColumnExpr;
31use datafusion::physical_plan::metrics::{
32    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
33};
34use datafusion::physical_plan::{
35    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
36    SendableRecordBatchStream,
37};
38use datafusion_expr::col;
39use datatypes::arrow::compute;
40use datatypes::compute::SortOptions;
41use futures::{Stream, StreamExt, ready};
42use greptime_proto::substrait_extension as pb;
43use prost::Message;
44use snafu::ResultExt;
45
46use crate::error::{DeserializeSnafu, Result};
47use crate::extension_plan::{METRIC_NUM_SERIES, resolve_column_name, serialize_column_index};
48use crate::metrics::PROMQL_SERIES_COUNT;
49
/// Per-batch view of the columns that identify a time series: either the raw
/// string tag columns, or a single precomputed `UInt64` series identifier.
enum TagIdentifier<'a> {
    /// A group of raw string tag columns.
    Raw(Vec<RawTagColumn<'a>>),
    /// A single UInt64 identifier (tsid).
    Id(&'a UInt64Array),
}
56
57impl<'a> TagIdentifier<'a> {
58    fn try_new(batch: &'a RecordBatch, tag_indices: &[usize]) -> DataFusionResult<Self> {
59        match tag_indices {
60            [] => Ok(Self::Raw(Vec::new())),
61            [index] => {
62                let array = batch.column(*index);
63                if array.data_type() == &DataType::UInt64 {
64                    let array = array
65                        .as_any()
66                        .downcast_ref::<UInt64Array>()
67                        .ok_or_else(|| {
68                            datafusion::error::DataFusionError::Internal(
69                                "Failed to downcast tag column to UInt64Array".to_string(),
70                            )
71                        })?;
72                    Ok(Self::Id(array))
73                } else {
74                    Ok(Self::Raw(vec![RawTagColumn::try_new(array.as_ref())?]))
75                }
76            }
77            indices => Ok(Self::Raw(
78                indices
79                    .iter()
80                    .map(|index| RawTagColumn::try_new(batch.column(*index).as_ref()))
81                    .collect::<DataFusionResult<Vec<_>>>()?,
82            )),
83        }
84    }
85
86    fn equal_at(&self, left_row: usize, other: &Self, right_row: usize) -> DataFusionResult<bool> {
87        match (self, other) {
88            (Self::Id(left), Self::Id(right)) => {
89                if left.is_null(left_row) || right.is_null(right_row) {
90                    return Ok(left.is_null(left_row) && right.is_null(right_row));
91                }
92                Ok(left.value(left_row) == right.value(right_row))
93            }
94            (Self::Raw(left), Self::Raw(right)) => {
95                if left.len() != right.len() {
96                    return Err(datafusion::error::DataFusionError::Internal(format!(
97                        "Mismatched tag column count: left={}, right={}",
98                        left.len(),
99                        right.len()
100                    )));
101                }
102
103                for (left_column, right_column) in left.iter().zip(right.iter()) {
104                    if !left_column.equal_at(left_row, right_column, right_row)? {
105                        return Ok(false);
106                    }
107                }
108                Ok(true)
109            }
110            _ => Err(datafusion::error::DataFusionError::Internal(format!(
111                "Mismatched tag identifier types: left={:?}, right={:?}",
112                self.data_type(),
113                other.data_type()
114            ))),
115        }
116    }
117
118    fn data_type(&self) -> &'static str {
119        match self {
120            Self::Raw(_) => "Raw",
121            Self::Id(_) => "Id",
122        }
123    }
124}
125
/// Borrowed view over one string-typed tag column, covering the three arrow
/// string encodings accepted by `RawTagColumn::try_new`.
enum RawTagColumn<'a> {
    /// Backed by a `DataType::Utf8` array.
    Utf8(&'a StringArray),
    /// Backed by a `DataType::LargeUtf8` array.
    LargeUtf8(&'a LargeStringArray),
    /// Backed by a `DataType::Utf8View` array.
    Utf8View(&'a StringViewArray),
}
131
132impl<'a> RawTagColumn<'a> {
133    fn try_new(array: &'a dyn Array) -> DataFusionResult<Self> {
134        match array.data_type() {
135            DataType::Utf8 => array
136                .as_any()
137                .downcast_ref::<StringArray>()
138                .map(Self::Utf8)
139                .ok_or_else(|| {
140                    datafusion::error::DataFusionError::Internal(
141                        "Failed to downcast tag column to StringArray".to_string(),
142                    )
143                }),
144            DataType::LargeUtf8 => array
145                .as_any()
146                .downcast_ref::<LargeStringArray>()
147                .map(Self::LargeUtf8)
148                .ok_or_else(|| {
149                    datafusion::error::DataFusionError::Internal(
150                        "Failed to downcast tag column to LargeStringArray".to_string(),
151                    )
152                }),
153            DataType::Utf8View => array
154                .as_any()
155                .downcast_ref::<StringViewArray>()
156                .map(Self::Utf8View)
157                .ok_or_else(|| {
158                    datafusion::error::DataFusionError::Internal(
159                        "Failed to downcast tag column to StringViewArray".to_string(),
160                    )
161                }),
162            other => Err(datafusion::error::DataFusionError::Internal(format!(
163                "Unsupported tag column type: {other:?}"
164            ))),
165        }
166    }
167
168    fn is_null(&self, row: usize) -> bool {
169        match self {
170            Self::Utf8(array) => array.is_null(row),
171            Self::LargeUtf8(array) => array.is_null(row),
172            Self::Utf8View(array) => array.is_null(row),
173        }
174    }
175
176    fn value(&self, row: usize) -> &str {
177        match self {
178            Self::Utf8(array) => array.value(row),
179            Self::LargeUtf8(array) => array.value(row),
180            Self::Utf8View(array) => array.value(row),
181        }
182    }
183
184    fn equal_at(&self, left_row: usize, other: &Self, right_row: usize) -> DataFusionResult<bool> {
185        if self.is_null(left_row) || other.is_null(right_row) {
186            return Ok(self.is_null(left_row) && other.is_null(right_row));
187        }
188
189        Ok(self.value(left_row) == other.value(right_row))
190    }
191}
192
/// Logical plan node that splits its input into per-series groups, one for
/// each distinct combination of tag column values.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesDivide {
    /// Names of the tag columns that identify a time series.
    tag_columns: Vec<String>,
    /// `SeriesDivide` requires `time_index` column's name to generate ordering requirement
    /// for input data. But this plan itself doesn't depend on the ordering of time index
    /// column. This is for follow on plans like `RangeManipulate`. Because requiring ordering
    /// here can avoid unnecessary sort in follow on plans.
    time_index_column: String,
    /// Child plan producing the rows to divide.
    input: LogicalPlan,
    /// Set only by `deserialize`: column indices that still need to be
    /// resolved into names once the real input schema is attached.
    unfix: Option<UnfixIndices>,
}
204
/// Column indices carried by a freshly deserialized [`SeriesDivide`] before
/// they are resolved against the actual input schema.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Indices of the tag columns in the (future) input schema.
    pub tag_column_indices: Vec<u64>,
    /// Index of the time index column in the (future) input schema.
    pub time_index_column_idx: u64,
}
210
211impl UserDefinedLogicalNodeCore for SeriesDivide {
212    fn name(&self) -> &str {
213        Self::name()
214    }
215
216    fn inputs(&self) -> Vec<&LogicalPlan> {
217        vec![&self.input]
218    }
219
220    fn schema(&self) -> &DFSchemaRef {
221        self.input.schema()
222    }
223
224    fn expressions(&self) -> Vec<Expr> {
225        if self.unfix.is_some() {
226            return vec![];
227        }
228
229        self.tag_columns
230            .iter()
231            .map(col)
232            .chain(std::iter::once(col(&self.time_index_column)))
233            .collect()
234    }
235
236    fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
237        if self.unfix.is_some() {
238            return None;
239        }
240
241        let input_schema = self.input.schema();
242        if output_columns.is_empty() {
243            let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
244            return Some(vec![indices]);
245        }
246
247        let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len());
248        required.extend_from_slice(output_columns);
249        for tag in &self.tag_columns {
250            required.push(input_schema.index_of_column_by_name(None, tag)?);
251        }
252        required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
253
254        required.sort_unstable();
255        required.dedup();
256        Some(vec![required])
257    }
258
259    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
260        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
261    }
262
263    fn with_exprs_and_inputs(
264        &self,
265        _exprs: Vec<Expr>,
266        inputs: Vec<LogicalPlan>,
267    ) -> DataFusionResult<Self> {
268        if inputs.is_empty() {
269            return Err(datafusion::error::DataFusionError::Internal(
270                "SeriesDivide must have at least one input".to_string(),
271            ));
272        }
273
274        let input: LogicalPlan = inputs[0].clone();
275        let input_schema = input.schema();
276
277        if let Some(unfix) = &self.unfix {
278            // transform indices to names
279            let tag_columns = unfix
280                .tag_column_indices
281                .iter()
282                .map(|idx| resolve_column_name(*idx, input_schema, "SeriesDivide", "tag"))
283                .collect::<DataFusionResult<Vec<String>>>()?;
284
285            let time_index_column = resolve_column_name(
286                unfix.time_index_column_idx,
287                input_schema,
288                "SeriesDivide",
289                "time index",
290            )?;
291
292            Ok(Self {
293                tag_columns,
294                time_index_column,
295                input,
296                unfix: None,
297            })
298        } else {
299            Ok(Self {
300                tag_columns: self.tag_columns.clone(),
301                time_index_column: self.time_index_column.clone(),
302                input,
303                unfix: None,
304            })
305        }
306    }
307}
308
309impl SeriesDivide {
310    pub fn new(tag_columns: Vec<String>, time_index_column: String, input: LogicalPlan) -> Self {
311        Self {
312            tag_columns,
313            time_index_column,
314            input,
315            unfix: None,
316        }
317    }
318
319    pub const fn name() -> &'static str {
320        "SeriesDivide"
321    }
322
323    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
324        Arc::new(SeriesDivideExec {
325            tag_columns: self.tag_columns.clone(),
326            time_index_column: self.time_index_column.clone(),
327            input: exec_input,
328            metric: ExecutionPlanMetricsSet::new(),
329        })
330    }
331
332    pub fn tags(&self) -> &[String] {
333        &self.tag_columns
334    }
335
336    pub fn serialize(&self) -> Vec<u8> {
337        let tag_column_indices = self
338            .tag_columns
339            .iter()
340            .map(|name| serialize_column_index(self.input.schema(), name))
341            .collect::<Vec<u64>>();
342
343        let time_index_column_idx =
344            serialize_column_index(self.input.schema(), &self.time_index_column);
345
346        pb::SeriesDivide {
347            tag_column_indices,
348            time_index_column_idx,
349            ..Default::default()
350        }
351        .encode_to_vec()
352    }
353
354    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
355        let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
356        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
357            produce_one_row: false,
358            schema: Arc::new(DFSchema::empty()),
359        });
360
361        let unfix = UnfixIndices {
362            tag_column_indices: pb_series_divide.tag_column_indices.clone(),
363            time_index_column_idx: pb_series_divide.time_index_column_idx,
364        };
365
366        Ok(Self {
367            tag_columns: Vec::new(),
368            time_index_column: String::new(),
369            input: placeholder_plan,
370            unfix: Some(unfix),
371        })
372    }
373}
374
/// Physical operator for [`SeriesDivide`]: emits one output batch per time
/// series, relying on input that is hash-partitioned and sorted on the tag
/// columns (see `required_input_distribution` / `required_input_ordering`).
#[derive(Debug)]
pub struct SeriesDivideExec {
    /// Names of the tag columns identifying a series.
    tag_columns: Vec<String>,
    /// Name of the time index column; only used for ordering requirements.
    time_index_column: String,
    /// Child execution plan.
    input: Arc<dyn ExecutionPlan>,
    /// Shared metrics registry for this node.
    metric: ExecutionPlanMetricsSet,
}
382
383impl ExecutionPlan for SeriesDivideExec {
384    fn as_any(&self) -> &dyn Any {
385        self
386    }
387
388    fn schema(&self) -> SchemaRef {
389        self.input.schema()
390    }
391
392    fn properties(&self) -> &PlanProperties {
393        self.input.properties()
394    }
395
396    fn required_input_distribution(&self) -> Vec<Distribution> {
397        let schema = self.input.schema();
398        vec![Distribution::HashPartitioned(
399            self.tag_columns
400                .iter()
401                // Safety: the tag column names is verified in the planning phase
402                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
403                .collect(),
404        )]
405    }
406
407    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
408        let input_schema = self.input.schema();
409        let mut exprs: Vec<PhysicalSortRequirement> = self
410            .tag_columns
411            .iter()
412            .map(|tag| PhysicalSortRequirement {
413                // Safety: the tag column names is verified in the planning phase
414                expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
415                options: Some(SortOptions {
416                    descending: false,
417                    nulls_first: true,
418                }),
419            })
420            .collect();
421
422        exprs.push(PhysicalSortRequirement {
423            expr: Arc::new(
424                ColumnExpr::new_with_schema(&self.time_index_column, &input_schema).unwrap(),
425            ),
426            options: Some(SortOptions {
427                descending: false,
428                nulls_first: true,
429            }),
430        });
431
432        // Safety: `exprs` is not empty
433        let requirement = LexRequirement::new(exprs).unwrap();
434
435        vec![Some(OrderingRequirements::Hard(vec![requirement]))]
436    }
437
438    fn maintains_input_order(&self) -> Vec<bool> {
439        vec![true; self.children().len()]
440    }
441
442    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
443        vec![&self.input]
444    }
445
446    fn with_new_children(
447        self: Arc<Self>,
448        children: Vec<Arc<dyn ExecutionPlan>>,
449    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
450        assert!(!children.is_empty());
451        Ok(Arc::new(Self {
452            tag_columns: self.tag_columns.clone(),
453            time_index_column: self.time_index_column.clone(),
454            input: children[0].clone(),
455            metric: self.metric.clone(),
456        }))
457    }
458
459    fn execute(
460        &self,
461        partition: usize,
462        context: Arc<TaskContext>,
463    ) -> DataFusionResult<SendableRecordBatchStream> {
464        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
465        let metrics_builder = MetricBuilder::new(&self.metric);
466        let num_series = Count::new();
467        metrics_builder
468            .with_partition(partition)
469            .build(MetricValue::Count {
470                name: METRIC_NUM_SERIES.into(),
471                count: num_series.clone(),
472            });
473
474        let input = self.input.execute(partition, context)?;
475        let schema = input.schema();
476        let tag_indices = self
477            .tag_columns
478            .iter()
479            .map(|tag| {
480                schema
481                    .column_with_name(tag)
482                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
483                    .0
484            })
485            .collect();
486        Ok(Box::pin(SeriesDivideStream {
487            tag_indices,
488            buffer: vec![],
489            schema,
490            input,
491            metric: baseline_metric,
492            num_series,
493            inspect_start: 0,
494        }))
495    }
496
497    fn metrics(&self) -> Option<MetricsSet> {
498        Some(self.metric.clone_inner())
499    }
500
501    fn name(&self) -> &str {
502        "SeriesDivideExec"
503    }
504}
505
506impl DisplayAs for SeriesDivideExec {
507    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
508        match t {
509            DisplayFormatType::Default
510            | DisplayFormatType::Verbose
511            | DisplayFormatType::TreeRender => {
512                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
513            }
514        }
515    }
516}
517
/// Assume the input stream is ordered on the tag columns.
pub struct SeriesDivideStream {
    /// Indices of the tag columns in `schema`.
    tag_indices: Vec<usize>,
    /// Batches buffered until a complete series is available.
    buffer: Vec<RecordBatch>,
    /// Output schema (same as the input's).
    schema: SchemaRef,
    /// Upstream record batch stream.
    input: SendableRecordBatchStream,
    /// Baseline metrics (elapsed compute, poll accounting).
    metric: BaselineMetrics,
    /// Index of buffered batches to start inspect next time.
    inspect_start: usize,
    /// Number of series processed.
    num_series: Count,
}
530
531impl RecordBatchStream for SeriesDivideStream {
532    fn schema(&self) -> SchemaRef {
533        self.schema.clone()
534    }
535}
536
537impl Stream for SeriesDivideStream {
538    type Item = DataFusionResult<RecordBatch>;
539
540    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
541        loop {
542            if !self.buffer.is_empty() {
543                let timer = std::time::Instant::now();
544                let cut_at = match self.find_first_diff_row() {
545                    Ok(cut_at) => cut_at,
546                    Err(e) => return Poll::Ready(Some(Err(e))),
547                };
548                if let Some((batch_index, row_index)) = cut_at {
549                    // slice out the first time series and return it.
550                    let half_batch_of_first_series =
551                        self.buffer[batch_index].slice(0, row_index + 1);
552                    let half_batch_of_second_series = self.buffer[batch_index].slice(
553                        row_index + 1,
554                        self.buffer[batch_index].num_rows() - row_index - 1,
555                    );
556                    let result_batches = self
557                        .buffer
558                        .drain(0..batch_index)
559                        .chain([half_batch_of_first_series])
560                        .collect::<Vec<_>>();
561                    if half_batch_of_second_series.num_rows() > 0 {
562                        self.buffer[0] = half_batch_of_second_series;
563                    } else {
564                        self.buffer.remove(0);
565                    }
566                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;
567
568                    self.inspect_start = 0;
569                    self.num_series.add(1);
570                    self.metric.elapsed_compute().add_elapsed(timer);
571                    return Poll::Ready(Some(Ok(result_batch)));
572                } else {
573                    self.metric.elapsed_compute().add_elapsed(timer);
574                    // continue to fetch next batch as the current buffer only contains one time series.
575                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
576                    let timer = std::time::Instant::now();
577                    if let Some(next_batch) = next_batch {
578                        if next_batch.num_rows() != 0 {
579                            self.buffer.push(next_batch);
580                        }
581                        continue;
582                    } else {
583                        // input stream is ended
584                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
585                        self.buffer.clear();
586                        self.inspect_start = 0;
587                        self.num_series.add(1);
588                        self.metric.elapsed_compute().add_elapsed(timer);
589                        return Poll::Ready(Some(Ok(result)));
590                    }
591                }
592            } else {
593                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
594                    Some(Ok(batch)) => batch,
595                    None => {
596                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
597                        return Poll::Ready(None);
598                    }
599                    error => return Poll::Ready(error),
600                };
601                self.buffer.push(batch);
602                continue;
603            }
604        }
605    }
606}
607
impl SeriesDivideStream {
    /// Poll one batch from the wrapped input stream, recording poll metrics.
    fn fetch_next_batch(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
        let poll = self.input.poll_next_unpin(cx);
        self.metric.record_poll(poll)
    }

    /// Return the position to cut buffer.
    /// None implies the current buffer only contains one time series.
    fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
        // fast path: no tag columns means all data belongs to the same series.
        if self.tag_indices.is_empty() {
            return Ok(None);
        }

        // Resume from where the previous call stopped: batches before
        // `inspect_start` are already known to hold a single series each.
        let mut resumed_batch_index = self.inspect_start;

        for batch in &self.buffer[resumed_batch_index..] {
            let num_rows = batch.num_rows();
            let tags = TagIdentifier::try_new(batch, &self.tag_indices)?;

            // check if the first row is the same with last batch's last row
            // (the guard is false only for batch 0, which has no predecessor;
            // for `inspect_start >= 1` it re-checks the not-yet-inspected
            // boundary between batch `inspect_start - 1` and `inspect_start`)
            if resumed_batch_index > self.inspect_start.checked_sub(1).unwrap_or_default() {
                let last_batch = &self.buffer[resumed_batch_index - 1];
                let last_row = last_batch.num_rows() - 1;
                let last_tags = TagIdentifier::try_new(last_batch, &self.tag_indices)?;
                if !tags.equal_at(0, &last_tags, last_row)? {
                    // The series changes exactly at the batch boundary.
                    return Ok(Some((resumed_batch_index - 1, last_row)));
                }
            }

            // quick check if all rows are the same by comparing the first and last row in this batch
            // (valid because the input is sorted on the tag columns)
            if tags.equal_at(0, &tags, num_rows - 1)? {
                resumed_batch_index += 1;
                continue;
            }

            // Linear scan for the first adjacent pair of rows with different tags.
            let mut same_until = 0;
            while same_until < num_rows - 1 {
                if !tags.equal_at(same_until, &tags, same_until + 1)? {
                    break;
                }
                same_until += 1;
            }

            if same_until + 1 >= num_rows {
                // all rows are the same, inspect next batch
                resumed_batch_index += 1;
            } else {
                return Ok(Some((resumed_batch_index, same_until)));
            }
        }

        // No cut found: remember progress so the next call skips these batches.
        self.inspect_start = resumed_batch_index;
        Ok(None)
    }
}
667
668#[cfg(test)]
669mod test {
670    use datafusion::arrow::datatypes::{DataType, Field, Schema};
671    use datafusion::common::ToDFSchema;
672    use datafusion::datasource::memory::MemorySourceConfig;
673    use datafusion::datasource::source::DataSourceExec;
674    use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
675    use datafusion::prelude::SessionContext;
676
677    use super::*;
678
    /// Build a three-batch in-memory source with schema (host, path, time_index).
    ///
    /// NOTE(review): the arrays named `path_column_*` are passed in the `host`
    /// field position and `host_column_*` in the `path` position (see the
    /// `RecordBatch::try_new` calls below). The expected outputs in the tests
    /// are written against this swapped layout — confirm whether the variable
    /// naming or the column ordering is the intended one.
    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let path_column_1 = Arc::new(StringArray::from(vec![
            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
        ])) as _;
        let host_column_1 = Arc::new(StringArray::from(vec![
            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
        ])) as _;
        let time_index_column_1 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![
                1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
            ]),
        ) as _;

        // Batch 2 continues the ("bla", "005") series across the batch boundary.
        let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
        let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
        let time_index_column_2 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![13000, 14000, 15000]),
        ) as _;

        // Batch 3 continues ("bla", "005") once more, then switches series twice.
        let path_column_3 = Arc::new(StringArray::from(vec![
            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
        ])) as _;
        let host_column_3 = Arc::new(StringArray::from(vec![
            "005", "001", "001", "001", "001", "001", "001", "001",
        ])) as _;
        let time_index_column_3 =
            Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                vec![16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000],
            )) as _;

        let data_1 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_1, host_column_1, time_index_column_1],
        )
        .unwrap();
        let data_2 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_2, host_column_2, time_index_column_2],
        )
        .unwrap();
        let data_3 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_3, host_column_3, time_index_column_3],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap(),
        ))
    }
739
740    #[test]
741    fn pruning_should_keep_tags_and_time_index_columns_for_exec() {
742        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
743        let input = LogicalPlan::EmptyRelation(EmptyRelation {
744            produce_one_row: false,
745            schema: df_schema,
746        });
747        let plan = SeriesDivide::new(
748            vec!["host".to_string(), "path".to_string()],
749            "time_index".to_string(),
750            input,
751        );
752
753        // Simulate a parent projection requesting only the `host` column.
754        let output_columns = [0usize];
755        let required = plan.necessary_children_exprs(&output_columns).unwrap();
756        let required = &required[0];
757        assert_eq!(required.as_slice(), &[0, 1, 2]);
758    }
759
    /// Collecting the whole stream must preserve every input row in order;
    /// the division only affects batch boundaries, not content.
    #[tokio::test]
    async fn overall_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+------+---------------------+\
            \n| host | path | time_index          |\
            \n+------+------+---------------------+\
            \n| foo  | 000  | 1970-01-01T00:00:01 |\
            \n| foo  | 000  | 1970-01-01T00:00:02 |\
            \n| foo  | 001  | 1970-01-01T00:00:03 |\
            \n| bar  | 002  | 1970-01-01T00:00:04 |\
            \n| bar  | 002  | 1970-01-01T00:00:05 |\
            \n| bar  | 002  | 1970-01-01T00:00:06 |\
            \n| bar  | 002  | 1970-01-01T00:00:07 |\
            \n| bar  | 002  | 1970-01-01T00:00:08 |\
            \n| bar  | 003  | 1970-01-01T00:00:09 |\
            \n| bla  | 005  | 1970-01-01T00:00:10 |\
            \n| bla  | 005  | 1970-01-01T00:00:11 |\
            \n| bla  | 005  | 1970-01-01T00:00:12 |\
            \n| bla  | 005  | 1970-01-01T00:00:13 |\
            \n| bla  | 005  | 1970-01-01T00:00:14 |\
            \n| bla  | 005  | 1970-01-01T00:00:15 |\
            \n| bla  | 005  | 1970-01-01T00:00:16 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
            \n+------+------+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }
808
809    #[tokio::test]
810    async fn per_batch_data() {
811        let memory_exec = Arc::new(prepare_test_data());
812        let divide_exec = Arc::new(SeriesDivideExec {
813            tag_columns: vec!["host".to_string(), "path".to_string()],
814            time_index_column: "time_index".to_string(),
815            input: memory_exec,
816            metric: ExecutionPlanMetricsSet::new(),
817        });
818        let mut divide_stream = divide_exec
819            .execute(0, SessionContext::default().task_ctx())
820            .unwrap();
821
822        let mut expectations = vec![
823            String::from(
824                "+------+------+---------------------+\
825                \n| host | path | time_index          |\
826                \n+------+------+---------------------+\
827                \n| foo  | 000  | 1970-01-01T00:00:01 |\
828                \n| foo  | 000  | 1970-01-01T00:00:02 |\
829                \n+------+------+---------------------+",
830            ),
831            String::from(
832                "+------+------+---------------------+\
833                \n| host | path | time_index          |\
834                \n+------+------+---------------------+\
835                \n| foo  | 001  | 1970-01-01T00:00:03 |\
836                \n+------+------+---------------------+",
837            ),
838            String::from(
839                "+------+------+---------------------+\
840                \n| host | path | time_index          |\
841                \n+------+------+---------------------+\
842                \n| bar  | 002  | 1970-01-01T00:00:04 |\
843                \n| bar  | 002  | 1970-01-01T00:00:05 |\
844                \n| bar  | 002  | 1970-01-01T00:00:06 |\
845                \n| bar  | 002  | 1970-01-01T00:00:07 |\
846                \n| bar  | 002  | 1970-01-01T00:00:08 |\
847                \n+------+------+---------------------+",
848            ),
849            String::from(
850                "+------+------+---------------------+\
851                \n| host | path | time_index          |\
852                \n+------+------+---------------------+\
853                \n| bar  | 003  | 1970-01-01T00:00:09 |\
854                \n+------+------+---------------------+",
855            ),
856            String::from(
857                "+------+------+---------------------+\
858                \n| host | path | time_index          |\
859                \n+------+------+---------------------+\
860                \n| bla  | 005  | 1970-01-01T00:00:10 |\
861                \n| bla  | 005  | 1970-01-01T00:00:11 |\
862                \n| bla  | 005  | 1970-01-01T00:00:12 |\
863                \n| bla  | 005  | 1970-01-01T00:00:13 |\
864                \n| bla  | 005  | 1970-01-01T00:00:14 |\
865                \n| bla  | 005  | 1970-01-01T00:00:15 |\
866                \n| bla  | 005  | 1970-01-01T00:00:16 |\
867                \n+------+------+---------------------+",
868            ),
869            String::from(
870                "+------+------+---------------------+\
871                \n| host | path | time_index          |\
872                \n+------+------+---------------------+\
873                \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
874                \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
875                \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
876                \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
877                \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
878                \n+------+------+---------------------+",
879            ),
880            String::from(
881                "+------+------+---------------------+\
882                \n| host | path | time_index          |\
883                \n+------+------+---------------------+\
884                \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
885                \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
886                \n+------+------+---------------------+",
887            ),
888        ];
889        expectations.reverse();
890
891        while let Some(batch) = divide_stream.next().await {
892            let formatted =
893                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
894                    .unwrap()
895                    .to_string();
896            let expected = expectations.pop().unwrap();
897            assert_eq!(formatted, expected);
898        }
899    }
900
901    #[tokio::test]
902    async fn test_all_batches_same_combination() {
903        // Create a schema with host and path columns, same as prepare_test_data
904        let schema = Arc::new(Schema::new(vec![
905            Field::new("host", DataType::Utf8, true),
906            Field::new("path", DataType::Utf8, true),
907            Field::new(
908                "time_index",
909                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
910                false,
911            ),
912        ]));
913
914        // Create batches with three different combinations
915        // Each batch contains only one combination
916        // Batches with the same combination are adjacent
917
918        // First combination: "server1", "/var/log"
919        let batch1 = RecordBatch::try_new(
920            schema.clone(),
921            vec![
922                Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
923                Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
924                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
925                    vec![1000, 2000, 3000],
926                )) as _,
927            ],
928        )
929        .unwrap();
930
931        let batch2 = RecordBatch::try_new(
932            schema.clone(),
933            vec![
934                Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
935                Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
936                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
937                    vec![4000, 5000],
938                )) as _,
939            ],
940        )
941        .unwrap();
942
943        // Second combination: "server2", "/var/data"
944        let batch3 = RecordBatch::try_new(
945            schema.clone(),
946            vec![
947                Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
948                Arc::new(StringArray::from(vec![
949                    "/var/data",
950                    "/var/data",
951                    "/var/data",
952                ])) as _,
953                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
954                    vec![6000, 7000, 8000],
955                )) as _,
956            ],
957        )
958        .unwrap();
959
960        let batch4 = RecordBatch::try_new(
961            schema.clone(),
962            vec![
963                Arc::new(StringArray::from(vec!["server2"])) as _,
964                Arc::new(StringArray::from(vec!["/var/data"])) as _,
965                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
966                    vec![9000],
967                )) as _,
968            ],
969        )
970        .unwrap();
971
972        // Third combination: "server3", "/opt/logs"
973        let batch5 = RecordBatch::try_new(
974            schema.clone(),
975            vec![
976                Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
977                Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
978                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
979                    vec![10000, 11000],
980                )) as _,
981            ],
982        )
983        .unwrap();
984
985        let batch6 = RecordBatch::try_new(
986            schema.clone(),
987            vec![
988                Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
989                Arc::new(StringArray::from(vec![
990                    "/opt/logs",
991                    "/opt/logs",
992                    "/opt/logs",
993                ])) as _,
994                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
995                    vec![12000, 13000, 14000],
996                )) as _,
997            ],
998        )
999        .unwrap();
1000
1001        // Create MemoryExec with these batches, keeping same combinations adjacent
1002        let memory_exec = DataSourceExec::from_data_source(
1003            MemorySourceConfig::try_new(
1004                &[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
1005                schema.clone(),
1006                None,
1007            )
1008            .unwrap(),
1009        );
1010
1011        // Create SeriesDivideExec
1012        let divide_exec = Arc::new(SeriesDivideExec {
1013            tag_columns: vec!["host".to_string(), "path".to_string()],
1014            time_index_column: "time_index".to_string(),
1015            input: memory_exec,
1016            metric: ExecutionPlanMetricsSet::new(),
1017        });
1018
1019        // Execute the division
1020        let session_context = SessionContext::default();
1021        let result =
1022            datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
1023                .await
1024                .unwrap();
1025
1026        // Verify that we got 3 batches (one for each combination)
1027        assert_eq!(result.len(), 3);
1028
1029        // First batch should have 5 rows (3 + 2 from the "server1" combination)
1030        assert_eq!(result[0].num_rows(), 5);
1031
1032        // Second batch should have 4 rows (3 + 1 from the "server2" combination)
1033        assert_eq!(result[1].num_rows(), 4);
1034
1035        // Third batch should have 5 rows (2 + 3 from the "server3" combination)
1036        assert_eq!(result[2].num_rows(), 5);
1037
1038        // Verify values in first batch (server1, /var/log)
1039        let host_array1 = result[0]
1040            .column(0)
1041            .as_any()
1042            .downcast_ref::<StringArray>()
1043            .unwrap();
1044        let path_array1 = result[0]
1045            .column(1)
1046            .as_any()
1047            .downcast_ref::<StringArray>()
1048            .unwrap();
1049        let time_index_array1 = result[0]
1050            .column(2)
1051            .as_any()
1052            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
1053            .unwrap();
1054
1055        for i in 0..5 {
1056            assert_eq!(host_array1.value(i), "server1");
1057            assert_eq!(path_array1.value(i), "/var/log");
1058            assert_eq!(time_index_array1.value(i), 1000 + (i as i64) * 1000);
1059        }
1060
1061        // Verify values in second batch (server2, /var/data)
1062        let host_array2 = result[1]
1063            .column(0)
1064            .as_any()
1065            .downcast_ref::<StringArray>()
1066            .unwrap();
1067        let path_array2 = result[1]
1068            .column(1)
1069            .as_any()
1070            .downcast_ref::<StringArray>()
1071            .unwrap();
1072        let time_index_array2 = result[1]
1073            .column(2)
1074            .as_any()
1075            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
1076            .unwrap();
1077
1078        for i in 0..4 {
1079            assert_eq!(host_array2.value(i), "server2");
1080            assert_eq!(path_array2.value(i), "/var/data");
1081            assert_eq!(time_index_array2.value(i), 6000 + (i as i64) * 1000);
1082        }
1083
1084        // Verify values in third batch (server3, /opt/logs)
1085        let host_array3 = result[2]
1086            .column(0)
1087            .as_any()
1088            .downcast_ref::<StringArray>()
1089            .unwrap();
1090        let path_array3 = result[2]
1091            .column(1)
1092            .as_any()
1093            .downcast_ref::<StringArray>()
1094            .unwrap();
1095        let time_index_array3 = result[2]
1096            .column(2)
1097            .as_any()
1098            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
1099            .unwrap();
1100
1101        for i in 0..5 {
1102            assert_eq!(host_array3.value(i), "server3");
1103            assert_eq!(path_array3.value(i), "/opt/logs");
1104            assert_eq!(time_index_array3.value(i), 10000 + (i as i64) * 1000);
1105        }
1106
1107        // Also verify streaming behavior
1108        let mut divide_stream = divide_exec
1109            .execute(0, SessionContext::default().task_ctx())
1110            .unwrap();
1111
1112        // Should produce three batches, one for each combination
1113        let batch1 = divide_stream.next().await.unwrap().unwrap();
1114        assert_eq!(batch1.num_rows(), 5); // server1 combination
1115
1116        let batch2 = divide_stream.next().await.unwrap().unwrap();
1117        assert_eq!(batch2.num_rows(), 4); // server2 combination
1118
1119        let batch3 = divide_stream.next().await.unwrap().unwrap();
1120        assert_eq!(batch3.num_rows(), 5); // server3 combination
1121
1122        // No more batches should be produced
1123        assert!(divide_stream.next().await.is_none());
1124    }
1125
1126    #[tokio::test]
1127    async fn test_string_tag_column_types() {
1128        let schema = Arc::new(Schema::new(vec![
1129            Field::new("tag_large", DataType::LargeUtf8, false),
1130            Field::new("tag_view", DataType::Utf8View, false),
1131            Field::new(
1132                "time_index",
1133                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
1134                false,
1135            ),
1136        ]));
1137
1138        let batch1 = RecordBatch::try_new(
1139            schema.clone(),
1140            vec![
1141                Arc::new(LargeStringArray::from(vec!["a", "a", "a", "a"])),
1142                Arc::new(StringViewArray::from(vec!["x", "x", "y", "y"])),
1143                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
1144                    vec![1000, 2000, 1000, 2000],
1145                )),
1146            ],
1147        )
1148        .unwrap();
1149
1150        let batch2 = RecordBatch::try_new(
1151            schema.clone(),
1152            vec![
1153                Arc::new(LargeStringArray::from(vec!["b", "b"])),
1154                Arc::new(StringViewArray::from(vec!["x", "x"])),
1155                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
1156                    vec![1000, 2000],
1157                )),
1158            ],
1159        )
1160        .unwrap();
1161
1162        let memory_exec: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
1163            MemorySourceConfig::try_new(&[vec![batch1, batch2]], schema.clone(), None).unwrap(),
1164        )));
1165
1166        let divide_exec = Arc::new(SeriesDivideExec {
1167            tag_columns: vec!["tag_large".to_string(), "tag_view".to_string()],
1168            time_index_column: "time_index".to_string(),
1169            input: memory_exec,
1170            metric: ExecutionPlanMetricsSet::new(),
1171        });
1172
1173        let session_context = SessionContext::default();
1174        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
1175            .await
1176            .unwrap();
1177
1178        assert_eq!(result.len(), 3);
1179        for ((expected_large, expected_view), batch) in [("a", "x"), ("a", "y"), ("b", "x")]
1180            .into_iter()
1181            .zip(result.iter())
1182        {
1183            assert_eq!(batch.num_rows(), 2);
1184
1185            let tag_large_array = batch
1186                .column(0)
1187                .as_any()
1188                .downcast_ref::<LargeStringArray>()
1189                .unwrap();
1190            let tag_view_array = batch
1191                .column(1)
1192                .as_any()
1193                .downcast_ref::<StringViewArray>()
1194                .unwrap();
1195
1196            for row in 0..batch.num_rows() {
1197                assert_eq!(tag_large_array.value(row), expected_large);
1198                assert_eq!(tag_view_array.value(row), expected_view);
1199            }
1200        }
1201    }
1202
1203    #[tokio::test]
1204    async fn test_u64_tag_column() {
1205        let schema = Arc::new(Schema::new(vec![
1206            Field::new("tsid", DataType::UInt64, false),
1207            Field::new(
1208                "time_index",
1209                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
1210                false,
1211            ),
1212        ]));
1213
1214        let batch1 = RecordBatch::try_new(
1215            schema.clone(),
1216            vec![
1217                Arc::new(UInt64Array::from(vec![1, 1, 2, 2])),
1218                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
1219                    vec![1000, 2000, 1000, 2000],
1220                )),
1221            ],
1222        )
1223        .unwrap();
1224
1225        let batch2 = RecordBatch::try_new(
1226            schema.clone(),
1227            vec![
1228                Arc::new(UInt64Array::from(vec![3, 3])),
1229                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
1230                    vec![1000, 2000],
1231                )),
1232            ],
1233        )
1234        .unwrap();
1235
1236        let memory_exec: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
1237            MemorySourceConfig::try_new(&[vec![batch1, batch2]], schema.clone(), None).unwrap(),
1238        )));
1239
1240        let divide_exec = Arc::new(SeriesDivideExec {
1241            tag_columns: vec!["tsid".to_string()],
1242            time_index_column: "time_index".to_string(),
1243            input: memory_exec,
1244            metric: ExecutionPlanMetricsSet::new(),
1245        });
1246
1247        let session_context = SessionContext::default();
1248        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
1249            .await
1250            .unwrap();
1251
1252        assert_eq!(result.len(), 3);
1253        for (expected_tsid, batch) in [1u64, 2u64, 3u64].into_iter().zip(result.iter()) {
1254            assert_eq!(batch.num_rows(), 2);
1255            let tsid_array = batch
1256                .column(0)
1257                .as_any()
1258                .downcast_ref::<UInt64Array>()
1259                .unwrap();
1260            assert!(tsid_array.iter().all(|v| v == Some(expected_tsid)));
1261        }
1262    }
1263}