promql/extension_plan/series_divide.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{LexRequirement, OrderingRequirements, PhysicalSortRequirement};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datatypes::arrow::compute;
use datatypes::compute::SortOptions;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::{METRIC_NUM_SERIES, resolve_column_name, serialize_column_index};
use crate::metrics::PROMQL_SERIES_COUNT;

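/// Logical plan node that groups input rows by tag columns so that each time series can be
/// processed separately. Its physical counterpart, [`SeriesDivideExec`], emits one
/// `RecordBatch` per distinct combination of tag values.
///
/// A minimal construction sketch (hypothetical column names; assumes `scan` is an existing
/// `LogicalPlan` producing `host`, `path` and `ts` columns):
///
/// ```ignore
/// let divide = SeriesDivide::new(
///     vec!["host".to_string(), "path".to_string()], // tag columns
///     "ts".to_string(),                             // time index column
///     scan,                                         // input logical plan
/// );
/// ```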
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesDivide {
    tag_columns: Vec<String>,
    /// `SeriesDivide` needs the `time_index` column's name to generate the ordering
    /// requirement for its input. The plan itself doesn't depend on the ordering of the
    /// time index column; the requirement exists for follow-on plans like `RangeManipulate`,
    /// since declaring the ordering here avoids an unnecessary sort in those plans.
    time_index_column: String,
    input: LogicalPlan,
    unfix: Option<UnfixIndices>,
}

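/// Column indices decoded from the serialized form of [`SeriesDivide`]. They are kept until
/// `with_exprs_and_inputs` attaches the real input plan, where they are resolved back into
/// column names against the input schema.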
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    pub tag_column_indices: Vec<u64>,
    pub time_index_column_idx: u64,
}

impl UserDefinedLogicalNodeCore for SeriesDivide {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(datafusion::error::DataFusionError::Internal(
                "SeriesDivide must have at least one input".to_string(),
            ));
        }

        let input: LogicalPlan = inputs[0].clone();
        let input_schema = input.schema();

        if let Some(unfix) = &self.unfix {
            // transform indices to names
            let tag_columns = unfix
                .tag_column_indices
                .iter()
                .map(|idx| resolve_column_name(*idx, input_schema, "SeriesDivide", "tag"))
                .collect::<DataFusionResult<Vec<String>>>()?;

            let time_index_column = resolve_column_name(
                unfix.time_index_column_idx,
                input_schema,
                "SeriesDivide",
                "time index",
            )?;

            Ok(Self {
                tag_columns,
                time_index_column,
                input,
                unfix: None,
            })
        } else {
            Ok(Self {
                tag_columns: self.tag_columns.clone(),
                time_index_column: self.time_index_column.clone(),
                input,
                unfix: None,
            })
        }
    }
}

impl SeriesDivide {
    pub fn new(tag_columns: Vec<String>, time_index_column: String, input: LogicalPlan) -> Self {
        Self {
            tag_columns,
            time_index_column,
            input,
            unfix: None,
        }
    }

    pub const fn name() -> &'static str {
        "SeriesDivide"
    }

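    /// Builds the physical [`SeriesDivideExec`] counterpart of this node around the
    /// already-planned physical input.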
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesDivideExec {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: exec_input,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

    pub fn tags(&self) -> &[String] {
        &self.tag_columns
    }

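    /// Serializes this node to its protobuf representation, encoding the tag and time index
    /// columns as positions in the input schema rather than as names.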
    pub fn serialize(&self) -> Vec<u8> {
        let tag_column_indices = self
            .tag_columns
            .iter()
            .map(|name| serialize_column_index(self.input.schema(), name))
            .collect::<Vec<u64>>();

        let time_index_column_idx =
            serialize_column_index(self.input.schema(), &self.time_index_column);

        pb::SeriesDivide {
            tag_column_indices,
            time_index_column_idx,
            ..Default::default()
        }
        .encode_to_vec()
    }

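    /// Deserializes a node from its protobuf representation. The decoded column indices are
    /// stored in `unfix` and the input is a placeholder `EmptyRelation`; both are replaced
    /// with real values once `with_exprs_and_inputs` attaches the actual input plan.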
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let unfix = UnfixIndices {
            tag_column_indices: pb_series_divide.tag_column_indices.clone(),
            time_index_column_idx: pb_series_divide.time_index_column_idx,
        };

        Ok(Self {
            tag_columns: Vec::new(),
            time_index_column: String::new(),
            input: placeholder_plan,
            unfix: Some(unfix),
        })
    }
}

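/// Physical plan of [`SeriesDivide`]. It requires its input to be hash-partitioned and sorted
/// by the tag columns (then the time index), and re-cuts incoming batches so that every output
/// `RecordBatch` contains exactly one time series.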
#[derive(Debug)]
pub struct SeriesDivideExec {
    tag_columns: Vec<String>,
    time_index_column: String,
    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesDivideExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        let schema = self.input.schema();
        vec![Distribution::HashPartitioned(
            self.tag_columns
                .iter()
                // Safety: the tag column names are verified in the planning phase
                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
                .collect(),
        )]
    }

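    // Require the input to be ordered by (tag columns..., time index) so that rows of the same
    // series arrive contiguously and already sorted by time.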
    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        let input_schema = self.input.schema();
        let mut exprs: Vec<PhysicalSortRequirement> = self
            .tag_columns
            .iter()
            .map(|tag| PhysicalSortRequirement {
                // Safety: the tag column names are verified in the planning phase
                expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
            })
            .collect();

        exprs.push(PhysicalSortRequirement {
            expr: Arc::new(
                ColumnExpr::new_with_schema(&self.time_index_column, &input_schema).unwrap(),
            ),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        });

        // Safety: `exprs` is not empty
        let requirement = LexRequirement::new(exprs).unwrap();

        vec![Some(OrderingRequirements::Hard(vec![requirement]))]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let tag_indices = self
            .tag_columns
            .iter()
            .map(|tag| {
                schema
                    .column_with_name(tag)
                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
                    .0
            })
            .collect();
        Ok(Box::pin(SeriesDivideStream {
            tag_indices,
            buffer: vec![],
            schema,
            input,
            metric: baseline_metric,
            num_series,
            inspect_start: 0,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "SeriesDivideExec"
    }
}

impl DisplayAs for SeriesDivideExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
            }
        }
    }
}

/// Assumes the input stream is ordered on the tag columns.
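///
/// The stream buffers input batches until a row with different tag values is found, then
/// concatenates the buffered rows of the leading series into a single `RecordBatch` and emits it.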
pub struct SeriesDivideStream {
    tag_indices: Vec<usize>,
    buffer: Vec<RecordBatch>,
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    /// Index into the buffered batches at which to resume inspection next time.
    inspect_start: usize,
    /// Number of series processed.
    num_series: Count,
}

impl RecordBatchStream for SeriesDivideStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesDivideStream {
    type Item = DataFusionResult<RecordBatch>;

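    // Each poll either cuts the buffered batches at the first row whose tag values differ and
    // returns the leading series, or pulls more input until such a cut is found or the input ends.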
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if !self.buffer.is_empty() {
                let timer = std::time::Instant::now();
                let cut_at = match self.find_first_diff_row() {
                    Ok(cut_at) => cut_at,
                    Err(e) => return Poll::Ready(Some(Err(e))),
                };
                if let Some((batch_index, row_index)) = cut_at {
                    // slice out the first time series and return it.
                    let half_batch_of_first_series =
                        self.buffer[batch_index].slice(0, row_index + 1);
                    let half_batch_of_second_series = self.buffer[batch_index].slice(
                        row_index + 1,
                        self.buffer[batch_index].num_rows() - row_index - 1,
                    );
                    let result_batches = self
                        .buffer
                        .drain(0..batch_index)
                        .chain([half_batch_of_first_series])
                        .collect::<Vec<_>>();
                    if half_batch_of_second_series.num_rows() > 0 {
                        self.buffer[0] = half_batch_of_second_series;
                    } else {
                        self.buffer.remove(0);
                    }
                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;

                    self.inspect_start = 0;
                    self.num_series.add(1);
                    self.metric.elapsed_compute().add_elapsed(timer);
                    return Poll::Ready(Some(Ok(result_batch)));
                } else {
                    self.metric.elapsed_compute().add_elapsed(timer);
                    // continue to fetch the next batch as the current buffer only contains one time series.
                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
                    let timer = std::time::Instant::now();
                    if let Some(next_batch) = next_batch {
                        if next_batch.num_rows() != 0 {
                            self.buffer.push(next_batch);
                        }
                        continue;
                    } else {
                        // input stream has ended
                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
                        self.buffer.clear();
                        self.inspect_start = 0;
                        self.num_series.add(1);
                        self.metric.elapsed_compute().add_elapsed(timer);
                        return Poll::Ready(Some(Ok(result)));
                    }
                }
            } else {
                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                    Some(Ok(batch)) => batch,
                    None => {
                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                        return Poll::Ready(None);
                    }
                    error => return Poll::Ready(error),
                };
                self.buffer.push(batch);
                continue;
            }
        }
    }
}

impl SeriesDivideStream {
    fn fetch_next_batch(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
        let poll = self.input.poll_next_unpin(cx);
        self.metric.record_poll(poll)
    }

    /// Returns the position (batch index, row index) at which to cut the buffer.
    /// `None` implies the buffered data contains only one time series.
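    ///
    /// For each buffered batch (starting from `inspect_start`) three checks run in order: the
    /// batch's first row is compared with the previous batch's last row (when there is one),
    /// then a fast path compares the batch's first and last rows, and finally a column-by-column
    /// scan finds the first adjacent pair of rows with different tag values.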
    fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
        // fast path: no tag columns means all data belongs to the same series.
        if self.tag_indices.is_empty() {
            return Ok(None);
        }

        let mut resumed_batch_index = self.inspect_start;

        for batch in &self.buffer[resumed_batch_index..] {
            let num_rows = batch.num_rows();
            let mut result_index = num_rows;

            // check if the first row is the same as the last batch's last row
            if resumed_batch_index > self.inspect_start.checked_sub(1).unwrap_or_default() {
                let last_batch = &self.buffer[resumed_batch_index - 1];
                let last_row = last_batch.num_rows() - 1;
                for index in &self.tag_indices {
                    let current_array = batch.column(*index);
                    let last_array = last_batch.column(*index);
                    let current_string_array = current_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let last_string_array = last_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let current_value = current_string_array.value(0);
                    let last_value = last_string_array.value(last_row);
                    if current_value != last_value {
                        return Ok(Some((resumed_batch_index - 1, last_batch.num_rows() - 1)));
                    }
                }
            }

            // quick check whether all rows are the same by comparing the first and last row in this batch
            let mut all_same = true;
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array =
                    array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                if string_array.value(0) != string_array.value(num_rows - 1) {
                    all_same = false;
                    break;
                }
            }
            if all_same {
                resumed_batch_index += 1;
                continue;
            }

            // check column by column
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array =
                    array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                // the first row index whose value is not equal to the next row's.
                let mut same_until = 0;
                while same_until < num_rows - 1 {
                    if string_array.value(same_until) != string_array.value(same_until + 1) {
                        break;
                    }
                    same_until += 1;
                }
                result_index = result_index.min(same_until);
            }

            if result_index + 1 >= num_rows {
                // all rows are the same, inspect the next batch
                resumed_batch_index += 1;
            } else {
                return Ok(Some((resumed_batch_index, result_index)));
            }
        }

        self.inspect_start = resumed_batch_index;
        Ok(None)
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::prelude::SessionContext;

    use super::*;

    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let path_column_1 = Arc::new(StringArray::from(vec![
            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
        ])) as _;
        let host_column_1 = Arc::new(StringArray::from(vec![
            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
        ])) as _;
        let time_index_column_1 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![
                1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
            ]),
        ) as _;

        let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
        let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
        let time_index_column_2 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![13000, 14000, 15000]),
        ) as _;

        let path_column_3 = Arc::new(StringArray::from(vec![
            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
        ])) as _;
        let host_column_3 = Arc::new(StringArray::from(vec![
            "005", "001", "001", "001", "001", "001", "001", "001",
        ])) as _;
        let time_index_column_3 =
            Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                vec![16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000],
            )) as _;

        let data_1 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_1, host_column_1, time_index_column_1],
        )
        .unwrap();
        let data_2 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_2, host_column_2, time_index_column_2],
        )
        .unwrap();
        let data_3 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_3, host_column_3, time_index_column_3],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap(),
        ))
    }

    #[tokio::test]
    async fn overall_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+------+---------------------+\
            \n| host | path | time_index          |\
            \n+------+------+---------------------+\
            \n| foo  | 000  | 1970-01-01T00:00:01 |\
            \n| foo  | 000  | 1970-01-01T00:00:02 |\
            \n| foo  | 001  | 1970-01-01T00:00:03 |\
            \n| bar  | 002  | 1970-01-01T00:00:04 |\
            \n| bar  | 002  | 1970-01-01T00:00:05 |\
            \n| bar  | 002  | 1970-01-01T00:00:06 |\
            \n| bar  | 002  | 1970-01-01T00:00:07 |\
            \n| bar  | 002  | 1970-01-01T00:00:08 |\
            \n| bar  | 003  | 1970-01-01T00:00:09 |\
            \n| bla  | 005  | 1970-01-01T00:00:10 |\
            \n| bla  | 005  | 1970-01-01T00:00:11 |\
            \n| bla  | 005  | 1970-01-01T00:00:12 |\
            \n| bla  | 005  | 1970-01-01T00:00:13 |\
            \n| bla  | 005  | 1970-01-01T00:00:14 |\
            \n| bla  | 005  | 1970-01-01T00:00:15 |\
            \n| bla  | 005  | 1970-01-01T00:00:16 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
            \n+------+------+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn per_batch_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let mut expectations = vec![
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| foo  | 000  | 1970-01-01T00:00:01 |\
                \n| foo  | 000  | 1970-01-01T00:00:02 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| foo  | 001  | 1970-01-01T00:00:03 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bar  | 002  | 1970-01-01T00:00:04 |\
                \n| bar  | 002  | 1970-01-01T00:00:05 |\
                \n| bar  | 002  | 1970-01-01T00:00:06 |\
                \n| bar  | 002  | 1970-01-01T00:00:07 |\
                \n| bar  | 002  | 1970-01-01T00:00:08 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bar  | 003  | 1970-01-01T00:00:09 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bla  | 005  | 1970-01-01T00:00:10 |\
                \n| bla  | 005  | 1970-01-01T00:00:11 |\
                \n| bla  | 005  | 1970-01-01T00:00:12 |\
                \n| bla  | 005  | 1970-01-01T00:00:13 |\
                \n| bla  | 005  | 1970-01-01T00:00:14 |\
                \n| bla  | 005  | 1970-01-01T00:00:15 |\
                \n| bla  | 005  | 1970-01-01T00:00:16 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
                \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
                \n+------+------+---------------------+",
            ),
        ];
        expectations.reverse();

        while let Some(batch) = divide_stream.next().await {
            let formatted =
                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
                    .unwrap()
                    .to_string();
            let expected = expectations.pop().unwrap();
            assert_eq!(formatted, expected);
        }
    }

    #[tokio::test]
    async fn test_all_batches_same_combination() {
        // Create a schema with host and path columns, same as prepare_test_data
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        // Create batches with three different combinations
        // Each batch contains only one combination
        // Batches with the same combination are adjacent

        // First combination: "server1", "/var/log"
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![1000, 2000, 3000],
                )) as _,
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![4000, 5000],
                )) as _,
            ],
        )
        .unwrap();

        // Second combination: "server2", "/var/data"
        let batch3 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
                Arc::new(StringArray::from(vec![
                    "/var/data",
                    "/var/data",
                    "/var/data",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![6000, 7000, 8000],
                )) as _,
            ],
        )
        .unwrap();

        let batch4 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2"])) as _,
                Arc::new(StringArray::from(vec!["/var/data"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![9000],
                )) as _,
            ],
        )
        .unwrap();

        // Third combination: "server3", "/opt/logs"
        let batch5 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
                Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![10000, 11000],
                )) as _,
            ],
        )
        .unwrap();

        let batch6 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
                Arc::new(StringArray::from(vec![
                    "/opt/logs",
                    "/opt/logs",
                    "/opt/logs",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![12000, 13000, 14000],
                )) as _,
            ],
        )
        .unwrap();

        // Create MemoryExec with these batches, keeping same combinations adjacent
        let memory_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(
                &[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
                schema.clone(),
                None,
            )
            .unwrap(),
        );

        // Create SeriesDivideExec
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });

        // Execute the division
        let session_context = SessionContext::default();
        let result =
            datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
                .await
                .unwrap();

        // Verify that we got 3 batches (one for each combination)
        assert_eq!(result.len(), 3);

        // First batch should have 5 rows (3 + 2 from the "server1" combination)
        assert_eq!(result[0].num_rows(), 5);

        // Second batch should have 4 rows (3 + 1 from the "server2" combination)
        assert_eq!(result[1].num_rows(), 4);

        // Third batch should have 5 rows (2 + 3 from the "server3" combination)
        assert_eq!(result[2].num_rows(), 5);

        // Verify values in first batch (server1, /var/log)
        let host_array1 = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array1 = result[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array1 = result[0]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array1.value(i), "server1");
            assert_eq!(path_array1.value(i), "/var/log");
            assert_eq!(time_index_array1.value(i), 1000 + (i as i64) * 1000);
        }

        // Verify values in second batch (server2, /var/data)
        let host_array2 = result[1]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array2 = result[1]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array2 = result[1]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..4 {
            assert_eq!(host_array2.value(i), "server2");
            assert_eq!(path_array2.value(i), "/var/data");
            assert_eq!(time_index_array2.value(i), 6000 + (i as i64) * 1000);
        }

        // Verify values in third batch (server3, /opt/logs)
        let host_array3 = result[2]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array3 = result[2]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array3 = result[2]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array3.value(i), "server3");
            assert_eq!(path_array3.value(i), "/opt/logs");
            assert_eq!(time_index_array3.value(i), 10000 + (i as i64) * 1000);
        }

        // Also verify streaming behavior
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        // Should produce three batches, one for each combination
        let batch1 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 5); // server1 combination

        let batch2 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 4); // server2 combination

        let batch3 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 5); // server3 combination

        // No more batches should be produced
        assert!(divide_stream.next().await.is_none());
    }
}