promql/extension_plan/series_divide.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{LexRequirement, OrderingRequirements, PhysicalSortRequirement};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datatypes::arrow::compute;
use datatypes::compute::SortOptions;
use futures::{ready, Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::METRIC_NUM_SERIES;
use crate::metrics::PROMQL_SERIES_COUNT;
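
/// Logical plan node that divides its input into multiple time series, one series per
/// unique combination of tag column values.
///
/// A minimal usage sketch (hypothetical `input_plan` and `input_exec`, not from the
/// original source):
///
/// ```ignore
/// let divide = SeriesDivide::new(
///     vec!["host".to_string(), "path".to_string()],
///     "time_index".to_string(),
///     input_plan,
/// );
/// let exec = divide.to_execution_plan(input_exec);
/// ```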
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesDivide {
    tag_columns: Vec<String>,
    /// `SeriesDivide` needs the `time_index` column's name to generate the ordering
    /// requirement for its input data. The plan itself doesn't depend on the ordering of
    /// the time index column; the requirement exists for follow-on plans like
    /// `RangeManipulate`, since requiring the ordering here avoids unnecessary sorts in
    /// those plans.
    time_index_column: String,
    input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for SeriesDivide {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(datafusion::error::DataFusionError::Internal(
                "SeriesDivide must have at least one input".to_string(),
            ));
        }

        Ok(Self {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: inputs[0].clone(),
        })
    }
}

impl SeriesDivide {
    pub fn new(tag_columns: Vec<String>, time_index_column: String, input: LogicalPlan) -> Self {
        Self {
            tag_columns,
            time_index_column,
            input,
        }
    }

    pub const fn name() -> &'static str {
        "SeriesDivide"
    }

    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesDivideExec {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: exec_input,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

    pub fn tags(&self) -> &[String] {
        &self.tag_columns
    }
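
    /// Encodes this node into the `pb::SeriesDivide` substrait extension message.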
    pub fn serialize(&self) -> Vec<u8> {
        pb::SeriesDivide {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
        }
        .encode_to_vec()
    }
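
    /// Decodes a `pb::SeriesDivide` message. The decoded node's input is a placeholder
    /// `EmptyRelation` that the caller is expected to replace.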
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        Ok(Self {
            tag_columns: pb_series_divide.tag_columns,
            time_index_column: pb_series_divide.time_index_column,
            input: placeholder_plan,
        })
    }
}
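
/// Physical plan of [`SeriesDivide`]. Assuming each partition's input is sorted on the
/// tag columns, it emits one output batch per time series.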
#[derive(Debug)]
pub struct SeriesDivideExec {
    tag_columns: Vec<String>,
    time_index_column: String,
    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesDivideExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        let schema = self.input.schema();
        vec![Distribution::HashPartitioned(
            self.tag_columns
                .iter()
                // Safety: the tag column names are verified in the planning phase
                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
                .collect(),
        )]
    }
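
    // The ordering required below is lexicographic on the tag columns followed by the time
    // index column, all ascending with nulls first; e.g. for tags ["host", "path"]:
    //   ORDER BY host ASC NULLS FIRST, path ASC NULLS FIRST, time_index ASC NULLS FIRST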
    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        let input_schema = self.input.schema();
        let mut exprs: Vec<PhysicalSortRequirement> = self
            .tag_columns
            .iter()
            .map(|tag| PhysicalSortRequirement {
                // Safety: the tag column names are verified in the planning phase
                expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
            })
            .collect();

        exprs.push(PhysicalSortRequirement {
            expr: Arc::new(
                ColumnExpr::new_with_schema(&self.time_index_column, &input_schema).unwrap(),
            ),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        });

        // Safety: `exprs` is not empty
        let requirement = LexRequirement::new(exprs).unwrap();

        vec![Some(OrderingRequirements::Hard(vec![requirement]))]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let tag_indices = self
            .tag_columns
            .iter()
            .map(|tag| {
                schema
                    .column_with_name(tag)
                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
                    .0
            })
            .collect();
        Ok(Box::pin(SeriesDivideStream {
            tag_indices,
            buffer: vec![],
            schema,
            input,
            metric: baseline_metric,
            num_series,
            inspect_start: 0,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "SeriesDivideExec"
    }
}

impl DisplayAs for SeriesDivideExec {
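    // Renders in EXPLAIN output as, for example: `PromSeriesDivideExec: tags=["host", "path"]`.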
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
            }
        }
    }
}

/// Assumes the input stream is ordered on the tag columns.
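///
/// For example (a sketch, not from the original source): with tag columns `["host"]` and
/// buffered rows whose `host` values are `["a", "a", "b", "b"]`, the stream first emits a
/// batch containing the two `"a"` rows, then a batch containing the two `"b"` rows.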
pub struct SeriesDivideStream {
    tag_indices: Vec<usize>,
    buffer: Vec<RecordBatch>,
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    /// Index of the buffered batch to start inspecting from next time.
    inspect_start: usize,
    /// Number of series processed.
    num_series: Count,
}

impl RecordBatchStream for SeriesDivideStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesDivideStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if !self.buffer.is_empty() {
                let timer = std::time::Instant::now();
                let cut_at = match self.find_first_diff_row() {
                    Ok(cut_at) => cut_at,
                    Err(e) => return Poll::Ready(Some(Err(e))),
                };
                if let Some((batch_index, row_index)) = cut_at {
                    // slice out the first time series and return it.
                    let half_batch_of_first_series =
                        self.buffer[batch_index].slice(0, row_index + 1);
                    let half_batch_of_second_series = self.buffer[batch_index].slice(
                        row_index + 1,
                        self.buffer[batch_index].num_rows() - row_index - 1,
                    );
                    let result_batches = self
                        .buffer
                        .drain(0..batch_index)
                        .chain([half_batch_of_first_series])
                        .collect::<Vec<_>>();
                    if half_batch_of_second_series.num_rows() > 0 {
                        self.buffer[0] = half_batch_of_second_series;
                    } else {
                        self.buffer.remove(0);
                    }
                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;

                    self.inspect_start = 0;
                    self.num_series.add(1);
                    self.metric.elapsed_compute().add_elapsed(timer);
                    return Poll::Ready(Some(Ok(result_batch)));
                } else {
                    self.metric.elapsed_compute().add_elapsed(timer);
                    // continue to fetch the next batch, as the current buffer only contains one time series.
                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
                    let timer = std::time::Instant::now();
                    if let Some(next_batch) = next_batch {
                        if next_batch.num_rows() != 0 {
                            self.buffer.push(next_batch);
                        }
                        continue;
                    } else {
                        // the input stream has ended
                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
                        self.buffer.clear();
                        self.inspect_start = 0;
                        self.num_series.add(1);
                        self.metric.elapsed_compute().add_elapsed(timer);
                        return Poll::Ready(Some(Ok(result)));
                    }
                }
            } else {
                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                    Some(Ok(batch)) => batch,
                    None => {
                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                        return Poll::Ready(None);
                    }
                    error => return Poll::Ready(error),
                };
                self.buffer.push(batch);
                continue;
            }
        }
    }
}

impl SeriesDivideStream {
    fn fetch_next_batch(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
        let poll = self.input.poll_next_unpin(cx);
        self.metric.record_poll(poll)
    }

    /// Returns the position at which to cut the buffer.
    /// `None` implies the current buffer contains only one time series.
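    ///
    /// The returned `(batch_index, row_index)` is the position of the last row that still
    /// belongs to the first series in the buffer. For example (a sketch): if the second
    /// buffered batch switches to a new series between its rows 1 and 2, this returns
    /// `Some((1, 1))`.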
    fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
        // fast path: no tag columns means all data belongs to the same series.
        if self.tag_indices.is_empty() {
            return Ok(None);
        }

        let mut resumed_batch_index = self.inspect_start;

        for batch in &self.buffer[resumed_batch_index..] {
            let num_rows = batch.num_rows();
            let mut result_index = num_rows;

            // check if the first row is the same as the last batch's last row
            if resumed_batch_index > self.inspect_start.checked_sub(1).unwrap_or_default() {
                let last_batch = &self.buffer[resumed_batch_index - 1];
                let last_row = last_batch.num_rows() - 1;
                for index in &self.tag_indices {
                    let current_array = batch.column(*index);
                    let last_array = last_batch.column(*index);
                    let current_string_array = current_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let last_string_array = last_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let current_value = current_string_array.value(0);
                    let last_value = last_string_array.value(last_row);
                    if current_value != last_value {
                        return Ok(Some((resumed_batch_index - 1, last_batch.num_rows() - 1)));
                    }
                }
            }

            // quick check if all rows are the same by comparing the first and last row in this batch
            let mut all_same = true;
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array =
                    array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                if string_array.value(0) != string_array.value(num_rows - 1) {
                    all_same = false;
                    break;
                }
            }
            if all_same {
                resumed_batch_index += 1;
                continue;
            }

            // check column by column
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array =
                    array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                // the first row number that is not equal to the next row.
                let mut same_until = 0;
                while same_until < num_rows - 1 {
                    if string_array.value(same_until) != string_array.value(same_until + 1) {
                        break;
                    }
                    same_until += 1;
                }
                result_index = result_index.min(same_until);
            }

            if result_index + 1 >= num_rows {
                // all rows are the same, inspect the next batch
                resumed_batch_index += 1;
            } else {
                return Ok(Some((resumed_batch_index, result_index)));
            }
        }

        self.inspect_start = resumed_batch_index;
        Ok(None)
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::prelude::SessionContext;

    use super::*;

    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let path_column_1 = Arc::new(StringArray::from(vec![
            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
        ])) as _;
        let host_column_1 = Arc::new(StringArray::from(vec![
            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
        ])) as _;
        let time_index_column_1 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![
                1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
            ]),
        ) as _;

        let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
        let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
        let time_index_column_2 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![13000, 14000, 15000]),
        ) as _;

        let path_column_3 = Arc::new(StringArray::from(vec![
            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
        ])) as _;
        let host_column_3 = Arc::new(StringArray::from(vec![
            "005", "001", "001", "001", "001", "001", "001", "001",
        ])) as _;
        let time_index_column_3 =
            Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                vec![16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000],
            )) as _;

        let data_1 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_1, host_column_1, time_index_column_1],
        )
        .unwrap();
        let data_2 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_2, host_column_2, time_index_column_2],
        )
        .unwrap();
        let data_3 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_3, host_column_3, time_index_column_3],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap(),
        ))
    }

    #[tokio::test]
    async fn overall_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+------+---------------------+\
            \n| host | path | time_index          |\
            \n+------+------+---------------------+\
            \n| foo  | 000  | 1970-01-01T00:00:01 |\
            \n| foo  | 000  | 1970-01-01T00:00:02 |\
            \n| foo  | 001  | 1970-01-01T00:00:03 |\
            \n| bar  | 002  | 1970-01-01T00:00:04 |\
            \n| bar  | 002  | 1970-01-01T00:00:05 |\
            \n| bar  | 002  | 1970-01-01T00:00:06 |\
            \n| bar  | 002  | 1970-01-01T00:00:07 |\
            \n| bar  | 002  | 1970-01-01T00:00:08 |\
            \n| bar  | 003  | 1970-01-01T00:00:09 |\
            \n| bla  | 005  | 1970-01-01T00:00:10 |\
            \n| bla  | 005  | 1970-01-01T00:00:11 |\
            \n| bla  | 005  | 1970-01-01T00:00:12 |\
            \n| bla  | 005  | 1970-01-01T00:00:13 |\
            \n| bla  | 005  | 1970-01-01T00:00:14 |\
            \n| bla  | 005  | 1970-01-01T00:00:15 |\
            \n| bla  | 005  | 1970-01-01T00:00:16 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
            \n+------+------+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn per_batch_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let mut expectations = vec![
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| foo  | 000  | 1970-01-01T00:00:01 |\
                \n| foo  | 000  | 1970-01-01T00:00:02 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| foo  | 001  | 1970-01-01T00:00:03 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bar  | 002  | 1970-01-01T00:00:04 |\
                \n| bar  | 002  | 1970-01-01T00:00:05 |\
                \n| bar  | 002  | 1970-01-01T00:00:06 |\
                \n| bar  | 002  | 1970-01-01T00:00:07 |\
                \n| bar  | 002  | 1970-01-01T00:00:08 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bar  | 003  | 1970-01-01T00:00:09 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bla  | 005  | 1970-01-01T00:00:10 |\
                \n| bla  | 005  | 1970-01-01T00:00:11 |\
                \n| bla  | 005  | 1970-01-01T00:00:12 |\
                \n| bla  | 005  | 1970-01-01T00:00:13 |\
                \n| bla  | 005  | 1970-01-01T00:00:14 |\
                \n| bla  | 005  | 1970-01-01T00:00:15 |\
                \n| bla  | 005  | 1970-01-01T00:00:16 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
                \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
                \n+------+------+---------------------+",
            ),
        ];
        expectations.reverse();

        while let Some(batch) = divide_stream.next().await {
            let formatted =
                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
                    .unwrap()
                    .to_string();
            let expected = expectations.pop().unwrap();
            assert_eq!(formatted, expected);
        }
    }
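
    // A minimal sketch, not part of the original tests: with no tag columns, the fast path in
    // `find_first_diff_row` treats all input as a single series, so the whole input is returned
    // as one concatenated batch.
    #[tokio::test]
    async fn no_tag_columns_single_series() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec![],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        // 12 + 3 + 8 rows from `prepare_test_data`, concatenated into a single batch.
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].num_rows(), 23);
    }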

    #[tokio::test]
    async fn test_all_batches_same_combination() {
        // Create a schema with host and path columns, same as prepare_test_data
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        // Create batches with three different combinations
        // Each batch contains only one combination
        // Batches with the same combination are adjacent

        // First combination: "server1", "/var/log"
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![1000, 2000, 3000],
                )) as _,
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![4000, 5000],
                )) as _,
            ],
        )
        .unwrap();

        // Second combination: "server2", "/var/data"
        let batch3 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
                Arc::new(StringArray::from(vec![
                    "/var/data",
                    "/var/data",
                    "/var/data",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![6000, 7000, 8000],
                )) as _,
            ],
        )
        .unwrap();

        let batch4 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2"])) as _,
                Arc::new(StringArray::from(vec!["/var/data"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![9000],
                )) as _,
            ],
        )
        .unwrap();

        // Third combination: "server3", "/opt/logs"
        let batch5 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
                Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![10000, 11000],
                )) as _,
            ],
        )
        .unwrap();

        let batch6 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
                Arc::new(StringArray::from(vec![
                    "/opt/logs",
                    "/opt/logs",
                    "/opt/logs",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![12000, 13000, 14000],
                )) as _,
            ],
        )
        .unwrap();

        // Create MemoryExec with these batches, keeping same combinations adjacent
        let memory_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(
                &[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
                schema.clone(),
                None,
            )
            .unwrap(),
        );

        // Create SeriesDivideExec
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });

        // Execute the division
        let session_context = SessionContext::default();
        let result =
            datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
                .await
                .unwrap();

        // Verify that we got 3 batches (one for each combination)
        assert_eq!(result.len(), 3);

        // First batch should have 5 rows (3 + 2 from the "server1" combination)
        assert_eq!(result[0].num_rows(), 5);

        // Second batch should have 4 rows (3 + 1 from the "server2" combination)
        assert_eq!(result[1].num_rows(), 4);

        // Third batch should have 5 rows (2 + 3 from the "server3" combination)
        assert_eq!(result[2].num_rows(), 5);

        // Verify values in first batch (server1, /var/log)
        let host_array1 = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array1 = result[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array1 = result[0]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array1.value(i), "server1");
            assert_eq!(path_array1.value(i), "/var/log");
            assert_eq!(time_index_array1.value(i), 1000 + (i as i64) * 1000);
        }

        // Verify values in second batch (server2, /var/data)
        let host_array2 = result[1]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array2 = result[1]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array2 = result[1]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..4 {
            assert_eq!(host_array2.value(i), "server2");
            assert_eq!(path_array2.value(i), "/var/data");
            assert_eq!(time_index_array2.value(i), 6000 + (i as i64) * 1000);
        }

        // Verify values in third batch (server3, /opt/logs)
        let host_array3 = result[2]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array3 = result[2]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array3 = result[2]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array3.value(i), "server3");
            assert_eq!(path_array3.value(i), "/opt/logs");
            assert_eq!(time_index_array3.value(i), 10000 + (i as i64) * 1000);
        }

        // Also verify streaming behavior
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        // Should produce three batches, one for each combination
        let batch1 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 5); // server1 combination

        let batch2 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 4); // server2 combination

        let batch3 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 5); // server3 combination

        // No more batches should be produced
        assert!(divide_stream.next().await.is_none());
    }
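
    // A minimal sketch, not part of the original tests: round-trips the logical node through
    // the protobuf encoding used by `serialize`/`deserialize`. The deserialized node's input
    // is the placeholder `EmptyRelation`.
    #[test]
    fn logical_node_serialization_roundtrip() {
        let plan = SeriesDivide::new(
            vec!["host".to_string(), "path".to_string()],
            "time_index".to_string(),
            LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: Arc::new(DFSchema::empty()),
            }),
        );
        let bytes = plan.serialize();
        let restored = SeriesDivide::deserialize(&bytes).unwrap();
        assert_eq!(restored.tags(), plan.tags());
        assert_eq!(restored.time_index_column, plan.time_index_column);
    }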
}