promql/extension_plan/
series_divide.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use datafusion::arrow::array::{Array, StringArray};
21use datafusion::arrow::datatypes::SchemaRef;
22use datafusion::arrow::record_batch::RecordBatch;
23use datafusion::common::{DFSchema, DFSchemaRef};
24use datafusion::error::Result as DataFusionResult;
25use datafusion::execution::context::TaskContext;
26use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
27use datafusion::physical_expr::{LexRequirement, PhysicalSortRequirement};
28use datafusion::physical_plan::expressions::Column as ColumnExpr;
29use datafusion::physical_plan::metrics::{
30    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34    SendableRecordBatchStream,
35};
36use datatypes::arrow::compute;
37use datatypes::compute::SortOptions;
38use futures::{ready, Stream, StreamExt};
39use greptime_proto::substrait_extension as pb;
40use prost::Message;
41use snafu::ResultExt;
42
43use crate::error::{DeserializeSnafu, Result};
44use crate::extension_plan::METRIC_NUM_SERIES;
45use crate::metrics::PROMQL_SERIES_COUNT;
46
/// Logical plan node of the PromQL series-divide step.
///
/// It carries the names needed to build a [`SeriesDivideExec`] over a single input plan
/// and exposes the input's schema unchanged.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesDivide {
    /// Names of the tag columns. The physical stream cuts its input wherever the values
    /// of these columns change between adjacent rows.
    tag_columns: Vec<String>,
    /// `SeriesDivide` requires the `time_index` column's name to generate the ordering
    /// requirement for its input data. The plan itself doesn't depend on the ordering of
    /// the time index column; the requirement exists for follow-on plans like
    /// `RangeManipulate`, because requiring the ordering here can avoid an unnecessary
    /// sort in those plans.
    time_index_column: String,
    /// The only input of this node.
    input: LogicalPlan,
}
57
58impl UserDefinedLogicalNodeCore for SeriesDivide {
59    fn name(&self) -> &str {
60        Self::name()
61    }
62
63    fn inputs(&self) -> Vec<&LogicalPlan> {
64        vec![&self.input]
65    }
66
67    fn schema(&self) -> &DFSchemaRef {
68        self.input.schema()
69    }
70
71    fn expressions(&self) -> Vec<Expr> {
72        vec![]
73    }
74
75    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
76        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
77    }
78
79    fn with_exprs_and_inputs(
80        &self,
81        _exprs: Vec<Expr>,
82        inputs: Vec<LogicalPlan>,
83    ) -> DataFusionResult<Self> {
84        if inputs.is_empty() {
85            return Err(datafusion::error::DataFusionError::Internal(
86                "SeriesDivide must have at least one input".to_string(),
87            ));
88        }
89
90        Ok(Self {
91            tag_columns: self.tag_columns.clone(),
92            time_index_column: self.time_index_column.clone(),
93            input: inputs[0].clone(),
94        })
95    }
96}
97
98impl SeriesDivide {
99    pub fn new(tag_columns: Vec<String>, time_index_column: String, input: LogicalPlan) -> Self {
100        Self {
101            tag_columns,
102            time_index_column,
103            input,
104        }
105    }
106
107    pub const fn name() -> &'static str {
108        "SeriesDivide"
109    }
110
111    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
112        Arc::new(SeriesDivideExec {
113            tag_columns: self.tag_columns.clone(),
114            time_index_column: self.time_index_column.clone(),
115            input: exec_input,
116            metric: ExecutionPlanMetricsSet::new(),
117        })
118    }
119
120    pub fn tags(&self) -> &[String] {
121        &self.tag_columns
122    }
123
124    pub fn serialize(&self) -> Vec<u8> {
125        pb::SeriesDivide {
126            tag_columns: self.tag_columns.clone(),
127            time_index_column: self.time_index_column.clone(),
128        }
129        .encode_to_vec()
130    }
131
132    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
133        let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
134        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
135            produce_one_row: false,
136            schema: Arc::new(DFSchema::empty()),
137        });
138        Ok(Self {
139            tag_columns: pb_series_divide.tag_columns,
140            time_index_column: pb_series_divide.time_index_column,
141            input: placeholder_plan,
142        })
143    }
144}
145
/// Physical plan of the PromQL series-divide step.
///
/// It requires its input to be hash-partitioned on the tag columns and sorted by the
/// tag columns followed by the time index column, and emits one record batch per run of
/// rows that share the same tag values.
#[derive(Debug)]
pub struct SeriesDivideExec {
    /// Names of the tag columns whose values identify a series.
    tag_columns: Vec<String>,
    /// Name of the time index column; only used to build the input ordering requirement.
    time_index_column: String,
    /// The only child plan.
    input: Arc<dyn ExecutionPlan>,
    /// Metrics set shared by the streams created from this plan.
    metric: ExecutionPlanMetricsSet,
}
153
154impl ExecutionPlan for SeriesDivideExec {
155    fn as_any(&self) -> &dyn Any {
156        self
157    }
158
159    fn schema(&self) -> SchemaRef {
160        self.input.schema()
161    }
162
163    fn properties(&self) -> &PlanProperties {
164        self.input.properties()
165    }
166
167    fn required_input_distribution(&self) -> Vec<Distribution> {
168        let schema = self.input.schema();
169        vec![Distribution::HashPartitioned(
170            self.tag_columns
171                .iter()
172                // Safety: the tag column names is verified in the planning phase
173                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
174                .collect(),
175        )]
176    }
177
178    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
179        let input_schema = self.input.schema();
180        let mut exprs: Vec<PhysicalSortRequirement> = self
181            .tag_columns
182            .iter()
183            .map(|tag| PhysicalSortRequirement {
184                // Safety: the tag column names is verified in the planning phase
185                expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
186                options: Some(SortOptions {
187                    descending: false,
188                    nulls_first: true,
189                }),
190            })
191            .collect();
192
193        exprs.push(PhysicalSortRequirement {
194            expr: Arc::new(
195                ColumnExpr::new_with_schema(&self.time_index_column, &input_schema).unwrap(),
196            ),
197            options: Some(SortOptions {
198                descending: false,
199                nulls_first: true,
200            }),
201        });
202        vec![Some(LexRequirement::new(exprs))]
203    }
204
205    fn maintains_input_order(&self) -> Vec<bool> {
206        vec![true; self.children().len()]
207    }
208
209    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
210        vec![&self.input]
211    }
212
213    fn with_new_children(
214        self: Arc<Self>,
215        children: Vec<Arc<dyn ExecutionPlan>>,
216    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
217        assert!(!children.is_empty());
218        Ok(Arc::new(Self {
219            tag_columns: self.tag_columns.clone(),
220            time_index_column: self.time_index_column.clone(),
221            input: children[0].clone(),
222            metric: self.metric.clone(),
223        }))
224    }
225
226    fn execute(
227        &self,
228        partition: usize,
229        context: Arc<TaskContext>,
230    ) -> DataFusionResult<SendableRecordBatchStream> {
231        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
232        let metrics_builder = MetricBuilder::new(&self.metric);
233        let num_series = Count::new();
234        metrics_builder
235            .with_partition(partition)
236            .build(MetricValue::Count {
237                name: METRIC_NUM_SERIES.into(),
238                count: num_series.clone(),
239            });
240
241        let input = self.input.execute(partition, context)?;
242        let schema = input.schema();
243        let tag_indices = self
244            .tag_columns
245            .iter()
246            .map(|tag| {
247                schema
248                    .column_with_name(tag)
249                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
250                    .0
251            })
252            .collect();
253        Ok(Box::pin(SeriesDivideStream {
254            tag_indices,
255            buffer: vec![],
256            schema,
257            input,
258            metric: baseline_metric,
259            num_series,
260            inspect_start: 0,
261        }))
262    }
263
264    fn metrics(&self) -> Option<MetricsSet> {
265        Some(self.metric.clone_inner())
266    }
267
268    fn name(&self) -> &str {
269        "SeriesDivideExec"
270    }
271}
272
273impl DisplayAs for SeriesDivideExec {
274    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
275        match t {
276            DisplayFormatType::Default | DisplayFormatType::Verbose => {
277                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
278            }
279        }
280    }
281}
282
/// Stream that regroups the batches of `input` so that each emitted batch holds the
/// rows of one series, i.e. one run of rows with equal tag values.
///
/// Assumes the input stream is ordered on the tag columns.
pub struct SeriesDivideStream {
    /// Indices of the tag columns within `schema`.
    tag_indices: Vec<usize>,
    /// Batches received from `input` that have not been emitted yet.
    buffer: Vec<RecordBatch>,
    /// Schema of both the input and the output batches.
    schema: SchemaRef,
    /// The stream being divided.
    input: SendableRecordBatchStream,
    /// Baseline metrics: records input polls and the time spent computing.
    metric: BaselineMetrics,
    /// Index of the buffered batch at which the next inspection starts.
    inspect_start: usize,
    /// Number of series processed.
    num_series: Count,
}
295
296impl RecordBatchStream for SeriesDivideStream {
297    fn schema(&self) -> SchemaRef {
298        self.schema.clone()
299    }
300}
301
302impl Stream for SeriesDivideStream {
303    type Item = DataFusionResult<RecordBatch>;
304
305    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
306        loop {
307            if !self.buffer.is_empty() {
308                let timer = std::time::Instant::now();
309                let cut_at = match self.find_first_diff_row() {
310                    Ok(cut_at) => cut_at,
311                    Err(e) => return Poll::Ready(Some(Err(e))),
312                };
313                if let Some((batch_index, row_index)) = cut_at {
314                    // slice out the first time series and return it.
315                    let half_batch_of_first_series =
316                        self.buffer[batch_index].slice(0, row_index + 1);
317                    let half_batch_of_second_series = self.buffer[batch_index].slice(
318                        row_index + 1,
319                        self.buffer[batch_index].num_rows() - row_index - 1,
320                    );
321                    let result_batches = self
322                        .buffer
323                        .drain(0..batch_index)
324                        .chain([half_batch_of_first_series])
325                        .collect::<Vec<_>>();
326                    if half_batch_of_second_series.num_rows() > 0 {
327                        self.buffer[0] = half_batch_of_second_series;
328                    } else {
329                        self.buffer.remove(0);
330                    }
331                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;
332
333                    self.inspect_start = 0;
334                    self.num_series.add(1);
335                    self.metric.elapsed_compute().add_elapsed(timer);
336                    return Poll::Ready(Some(Ok(result_batch)));
337                } else {
338                    self.metric.elapsed_compute().add_elapsed(timer);
339                    // continue to fetch next batch as the current buffer only contains one time series.
340                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
341                    let timer = std::time::Instant::now();
342                    if let Some(next_batch) = next_batch {
343                        if next_batch.num_rows() != 0 {
344                            self.buffer.push(next_batch);
345                        }
346                        continue;
347                    } else {
348                        // input stream is ended
349                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
350                        self.buffer.clear();
351                        self.inspect_start = 0;
352                        self.num_series.add(1);
353                        self.metric.elapsed_compute().add_elapsed(timer);
354                        return Poll::Ready(Some(Ok(result)));
355                    }
356                }
357            } else {
358                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
359                    Some(Ok(batch)) => batch,
360                    None => {
361                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
362                        return Poll::Ready(None);
363                    }
364                    error => return Poll::Ready(error),
365                };
366                self.buffer.push(batch);
367                continue;
368            }
369        }
370    }
371}
372
373impl SeriesDivideStream {
374    fn fetch_next_batch(
375        mut self: Pin<&mut Self>,
376        cx: &mut Context<'_>,
377    ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
378        let poll = self.input.poll_next_unpin(cx);
379        self.metric.record_poll(poll)
380    }
381
382    /// Return the position to cut buffer.
383    /// None implies the current buffer only contains one time series.
384    fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
385        // fast path: no tag columns means all data belongs to the same series.
386        if self.tag_indices.is_empty() {
387            return Ok(None);
388        }
389
390        let mut resumed_batch_index = self.inspect_start;
391
392        for batch in &self.buffer[resumed_batch_index..] {
393            let num_rows = batch.num_rows();
394            let mut result_index = num_rows;
395
396            // check if the first row is the same with last batch's last row
397            if resumed_batch_index > self.inspect_start.checked_sub(1).unwrap_or_default() {
398                let last_batch = &self.buffer[resumed_batch_index - 1];
399                let last_row = last_batch.num_rows() - 1;
400                for index in &self.tag_indices {
401                    let current_array = batch.column(*index);
402                    let last_array = last_batch.column(*index);
403                    let current_string_array = current_array
404                        .as_any()
405                        .downcast_ref::<StringArray>()
406                        .ok_or_else(|| {
407                            datafusion::error::DataFusionError::Internal(
408                                "Failed to downcast tag column to StringArray".to_string(),
409                            )
410                        })?;
411                    let last_string_array = last_array
412                        .as_any()
413                        .downcast_ref::<StringArray>()
414                        .ok_or_else(|| {
415                            datafusion::error::DataFusionError::Internal(
416                                "Failed to downcast tag column to StringArray".to_string(),
417                            )
418                        })?;
419                    let current_value = current_string_array.value(0);
420                    let last_value = last_string_array.value(last_row);
421                    if current_value != last_value {
422                        return Ok(Some((resumed_batch_index - 1, last_batch.num_rows() - 1)));
423                    }
424                }
425            }
426
427            // quick check if all rows are the same by comparing the first and last row in this batch
428            let mut all_same = true;
429            for index in &self.tag_indices {
430                let array = batch.column(*index);
431                let string_array =
432                    array
433                        .as_any()
434                        .downcast_ref::<StringArray>()
435                        .ok_or_else(|| {
436                            datafusion::error::DataFusionError::Internal(
437                                "Failed to downcast tag column to StringArray".to_string(),
438                            )
439                        })?;
440                if string_array.value(0) != string_array.value(num_rows - 1) {
441                    all_same = false;
442                    break;
443                }
444            }
445            if all_same {
446                resumed_batch_index += 1;
447                continue;
448            }
449
450            // check column by column
451            for index in &self.tag_indices {
452                let array = batch.column(*index);
453                let string_array =
454                    array
455                        .as_any()
456                        .downcast_ref::<StringArray>()
457                        .ok_or_else(|| {
458                            datafusion::error::DataFusionError::Internal(
459                                "Failed to downcast tag column to StringArray".to_string(),
460                            )
461                        })?;
462                // the first row number that not equal to the next row.
463                let mut same_until = 0;
464                while same_until < num_rows - 1 {
465                    if string_array.value(same_until) != string_array.value(same_until + 1) {
466                        break;
467                    }
468                    same_until += 1;
469                }
470                result_index = result_index.min(same_until);
471            }
472
473            if result_index + 1 >= num_rows {
474                // all rows are the same, inspect next batch
475                resumed_batch_index += 1;
476            } else {
477                return Ok(Some((resumed_batch_index, result_index)));
478            }
479        }
480
481        self.inspect_start = resumed_batch_index;
482        Ok(None)
483    }
484}
485
486#[cfg(test)]
487mod test {
488    use datafusion::arrow::datatypes::{DataType, Field, Schema};
489    use datafusion::physical_plan::memory::MemoryExec;
490    use datafusion::prelude::SessionContext;
491
492    use super::*;
493
494    fn prepare_test_data() -> MemoryExec {
495        let schema = Arc::new(Schema::new(vec![
496            Field::new("host", DataType::Utf8, true),
497            Field::new("path", DataType::Utf8, true),
498            Field::new(
499                "time_index",
500                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
501                false,
502            ),
503        ]));
504
505        let path_column_1 = Arc::new(StringArray::from(vec![
506            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
507        ])) as _;
508        let host_column_1 = Arc::new(StringArray::from(vec![
509            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
510        ])) as _;
511        let time_index_column_1 = Arc::new(
512            datafusion::arrow::array::TimestampMillisecondArray::from(vec![
513                1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
514            ]),
515        ) as _;
516
517        let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
518        let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
519        let time_index_column_2 = Arc::new(
520            datafusion::arrow::array::TimestampMillisecondArray::from(vec![13000, 14000, 15000]),
521        ) as _;
522
523        let path_column_3 = Arc::new(StringArray::from(vec![
524            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
525        ])) as _;
526        let host_column_3 = Arc::new(StringArray::from(vec![
527            "005", "001", "001", "001", "001", "001", "001", "001",
528        ])) as _;
529        let time_index_column_3 =
530            Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
531                vec![16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000],
532            )) as _;
533
534        let data_1 = RecordBatch::try_new(
535            schema.clone(),
536            vec![path_column_1, host_column_1, time_index_column_1],
537        )
538        .unwrap();
539        let data_2 = RecordBatch::try_new(
540            schema.clone(),
541            vec![path_column_2, host_column_2, time_index_column_2],
542        )
543        .unwrap();
544        let data_3 = RecordBatch::try_new(
545            schema.clone(),
546            vec![path_column_3, host_column_3, time_index_column_3],
547        )
548        .unwrap();
549
550        MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap()
551    }
552
553    #[tokio::test]
554    async fn overall_data() {
555        let memory_exec = Arc::new(prepare_test_data());
556        let divide_exec = Arc::new(SeriesDivideExec {
557            tag_columns: vec!["host".to_string(), "path".to_string()],
558            time_index_column: "time_index".to_string(),
559            input: memory_exec,
560            metric: ExecutionPlanMetricsSet::new(),
561        });
562        let session_context = SessionContext::default();
563        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
564            .await
565            .unwrap();
566        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
567            .unwrap()
568            .to_string();
569
570        let expected = String::from(
571            "+------+------+---------------------+\
572            \n| host | path | time_index          |\
573            \n+------+------+---------------------+\
574            \n| foo  | 000  | 1970-01-01T00:00:01 |\
575            \n| foo  | 000  | 1970-01-01T00:00:02 |\
576            \n| foo  | 001  | 1970-01-01T00:00:03 |\
577            \n| bar  | 002  | 1970-01-01T00:00:04 |\
578            \n| bar  | 002  | 1970-01-01T00:00:05 |\
579            \n| bar  | 002  | 1970-01-01T00:00:06 |\
580            \n| bar  | 002  | 1970-01-01T00:00:07 |\
581            \n| bar  | 002  | 1970-01-01T00:00:08 |\
582            \n| bar  | 003  | 1970-01-01T00:00:09 |\
583            \n| bla  | 005  | 1970-01-01T00:00:10 |\
584            \n| bla  | 005  | 1970-01-01T00:00:11 |\
585            \n| bla  | 005  | 1970-01-01T00:00:12 |\
586            \n| bla  | 005  | 1970-01-01T00:00:13 |\
587            \n| bla  | 005  | 1970-01-01T00:00:14 |\
588            \n| bla  | 005  | 1970-01-01T00:00:15 |\
589            \n| bla  | 005  | 1970-01-01T00:00:16 |\
590            \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
591            \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
592            \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
593            \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
594            \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
595            \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
596            \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
597            \n+------+------+---------------------+",
598        );
599        assert_eq!(result_literal, expected);
600    }
601
602    #[tokio::test]
603    async fn per_batch_data() {
604        let memory_exec = Arc::new(prepare_test_data());
605        let divide_exec = Arc::new(SeriesDivideExec {
606            tag_columns: vec!["host".to_string(), "path".to_string()],
607            time_index_column: "time_index".to_string(),
608            input: memory_exec,
609            metric: ExecutionPlanMetricsSet::new(),
610        });
611        let mut divide_stream = divide_exec
612            .execute(0, SessionContext::default().task_ctx())
613            .unwrap();
614
615        let mut expectations = vec![
616            String::from(
617                "+------+------+---------------------+\
618                \n| host | path | time_index          |\
619                \n+------+------+---------------------+\
620                \n| foo  | 000  | 1970-01-01T00:00:01 |\
621                \n| foo  | 000  | 1970-01-01T00:00:02 |\
622                \n+------+------+---------------------+",
623            ),
624            String::from(
625                "+------+------+---------------------+\
626                \n| host | path | time_index          |\
627                \n+------+------+---------------------+\
628                \n| foo  | 001  | 1970-01-01T00:00:03 |\
629                \n+------+------+---------------------+",
630            ),
631            String::from(
632                "+------+------+---------------------+\
633                \n| host | path | time_index          |\
634                \n+------+------+---------------------+\
635                \n| bar  | 002  | 1970-01-01T00:00:04 |\
636                \n| bar  | 002  | 1970-01-01T00:00:05 |\
637                \n| bar  | 002  | 1970-01-01T00:00:06 |\
638                \n| bar  | 002  | 1970-01-01T00:00:07 |\
639                \n| bar  | 002  | 1970-01-01T00:00:08 |\
640                \n+------+------+---------------------+",
641            ),
642            String::from(
643                "+------+------+---------------------+\
644                \n| host | path | time_index          |\
645                \n+------+------+---------------------+\
646                \n| bar  | 003  | 1970-01-01T00:00:09 |\
647                \n+------+------+---------------------+",
648            ),
649            String::from(
650                "+------+------+---------------------+\
651                \n| host | path | time_index          |\
652                \n+------+------+---------------------+\
653                \n| bla  | 005  | 1970-01-01T00:00:10 |\
654                \n| bla  | 005  | 1970-01-01T00:00:11 |\
655                \n| bla  | 005  | 1970-01-01T00:00:12 |\
656                \n| bla  | 005  | 1970-01-01T00:00:13 |\
657                \n| bla  | 005  | 1970-01-01T00:00:14 |\
658                \n| bla  | 005  | 1970-01-01T00:00:15 |\
659                \n| bla  | 005  | 1970-01-01T00:00:16 |\
660                \n+------+------+---------------------+",
661            ),
662            String::from(
663                "+------+------+---------------------+\
664                \n| host | path | time_index          |\
665                \n+------+------+---------------------+\
666                \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
667                \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
668                \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
669                \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
670                \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
671                \n+------+------+---------------------+",
672            ),
673            String::from(
674                "+------+------+---------------------+\
675                \n| host | path | time_index          |\
676                \n+------+------+---------------------+\
677                \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
678                \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
679                \n+------+------+---------------------+",
680            ),
681        ];
682        expectations.reverse();
683
684        while let Some(batch) = divide_stream.next().await {
685            let formatted =
686                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
687                    .unwrap()
688                    .to_string();
689            let expected = expectations.pop().unwrap();
690            assert_eq!(formatted, expected);
691        }
692    }
693
694    #[tokio::test]
695    async fn test_all_batches_same_combination() {
696        // Create a schema with host and path columns, same as prepare_test_data
697        let schema = Arc::new(Schema::new(vec![
698            Field::new("host", DataType::Utf8, true),
699            Field::new("path", DataType::Utf8, true),
700            Field::new(
701                "time_index",
702                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
703                false,
704            ),
705        ]));
706
707        // Create batches with three different combinations
708        // Each batch contains only one combination
709        // Batches with the same combination are adjacent
710
711        // First combination: "server1", "/var/log"
712        let batch1 = RecordBatch::try_new(
713            schema.clone(),
714            vec![
715                Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
716                Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
717                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
718                    vec![1000, 2000, 3000],
719                )) as _,
720            ],
721        )
722        .unwrap();
723
724        let batch2 = RecordBatch::try_new(
725            schema.clone(),
726            vec![
727                Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
728                Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
729                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
730                    vec![4000, 5000],
731                )) as _,
732            ],
733        )
734        .unwrap();
735
736        // Second combination: "server2", "/var/data"
737        let batch3 = RecordBatch::try_new(
738            schema.clone(),
739            vec![
740                Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
741                Arc::new(StringArray::from(vec![
742                    "/var/data",
743                    "/var/data",
744                    "/var/data",
745                ])) as _,
746                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
747                    vec![6000, 7000, 8000],
748                )) as _,
749            ],
750        )
751        .unwrap();
752
753        let batch4 = RecordBatch::try_new(
754            schema.clone(),
755            vec![
756                Arc::new(StringArray::from(vec!["server2"])) as _,
757                Arc::new(StringArray::from(vec!["/var/data"])) as _,
758                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
759                    vec![9000],
760                )) as _,
761            ],
762        )
763        .unwrap();
764
765        // Third combination: "server3", "/opt/logs"
766        let batch5 = RecordBatch::try_new(
767            schema.clone(),
768            vec![
769                Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
770                Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
771                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
772                    vec![10000, 11000],
773                )) as _,
774            ],
775        )
776        .unwrap();
777
778        let batch6 = RecordBatch::try_new(
779            schema.clone(),
780            vec![
781                Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
782                Arc::new(StringArray::from(vec![
783                    "/opt/logs",
784                    "/opt/logs",
785                    "/opt/logs",
786                ])) as _,
787                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
788                    vec![12000, 13000, 14000],
789                )) as _,
790            ],
791        )
792        .unwrap();
793
794        // Create MemoryExec with these batches, keeping same combinations adjacent
795        let memory_exec = Arc::new(
796            MemoryExec::try_new(
797                &[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
798                schema.clone(),
799                None,
800            )
801            .unwrap(),
802        );
803
804        // Create SeriesDivideExec
805        let divide_exec = Arc::new(SeriesDivideExec {
806            tag_columns: vec!["host".to_string(), "path".to_string()],
807            time_index_column: "time_index".to_string(),
808            input: memory_exec,
809            metric: ExecutionPlanMetricsSet::new(),
810        });
811
812        // Execute the division
813        let session_context = SessionContext::default();
814        let result =
815            datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
816                .await
817                .unwrap();
818
819        // Verify that we got 3 batches (one for each combination)
820        assert_eq!(result.len(), 3);
821
822        // First batch should have 5 rows (3 + 2 from the "server1" combination)
823        assert_eq!(result[0].num_rows(), 5);
824
825        // Second batch should have 4 rows (3 + 1 from the "server2" combination)
826        assert_eq!(result[1].num_rows(), 4);
827
828        // Third batch should have 5 rows (2 + 3 from the "server3" combination)
829        assert_eq!(result[2].num_rows(), 5);
830
831        // Verify values in first batch (server1, /var/log)
832        let host_array1 = result[0]
833            .column(0)
834            .as_any()
835            .downcast_ref::<StringArray>()
836            .unwrap();
837        let path_array1 = result[0]
838            .column(1)
839            .as_any()
840            .downcast_ref::<StringArray>()
841            .unwrap();
842        let time_index_array1 = result[0]
843            .column(2)
844            .as_any()
845            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
846            .unwrap();
847
848        for i in 0..5 {
849            assert_eq!(host_array1.value(i), "server1");
850            assert_eq!(path_array1.value(i), "/var/log");
851            assert_eq!(time_index_array1.value(i), 1000 + (i as i64) * 1000);
852        }
853
854        // Verify values in second batch (server2, /var/data)
855        let host_array2 = result[1]
856            .column(0)
857            .as_any()
858            .downcast_ref::<StringArray>()
859            .unwrap();
860        let path_array2 = result[1]
861            .column(1)
862            .as_any()
863            .downcast_ref::<StringArray>()
864            .unwrap();
865        let time_index_array2 = result[1]
866            .column(2)
867            .as_any()
868            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
869            .unwrap();
870
871        for i in 0..4 {
872            assert_eq!(host_array2.value(i), "server2");
873            assert_eq!(path_array2.value(i), "/var/data");
874            assert_eq!(time_index_array2.value(i), 6000 + (i as i64) * 1000);
875        }
876
877        // Verify values in third batch (server3, /opt/logs)
878        let host_array3 = result[2]
879            .column(0)
880            .as_any()
881            .downcast_ref::<StringArray>()
882            .unwrap();
883        let path_array3 = result[2]
884            .column(1)
885            .as_any()
886            .downcast_ref::<StringArray>()
887            .unwrap();
888        let time_index_array3 = result[2]
889            .column(2)
890            .as_any()
891            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
892            .unwrap();
893
894        for i in 0..5 {
895            assert_eq!(host_array3.value(i), "server3");
896            assert_eq!(path_array3.value(i), "/opt/logs");
897            assert_eq!(time_index_array3.value(i), 10000 + (i as i64) * 1000);
898        }
899
900        // Also verify streaming behavior
901        let mut divide_stream = divide_exec
902            .execute(0, SessionContext::default().task_ctx())
903            .unwrap();
904
905        // Should produce three batches, one for each combination
906        let batch1 = divide_stream.next().await.unwrap().unwrap();
907        assert_eq!(batch1.num_rows(), 5); // server1 combination
908
909        let batch2 = divide_stream.next().await.unwrap().unwrap();
910        assert_eq!(batch2.num_rows(), 4); // server2 combination
911
912        let batch3 = divide_stream.next().await.unwrap().unwrap();
913        assert_eq!(batch3.num_rows(), 5); // server3 combination
914
915        // No more batches should be produced
916        assert!(divide_stream.next().await.is_none());
917    }
918}