promql/extension_plan/
series_divide.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use datafusion::arrow::array::{Array, StringArray};
21use datafusion::arrow::datatypes::SchemaRef;
22use datafusion::arrow::record_batch::RecordBatch;
23use datafusion::common::{DFSchema, DFSchemaRef};
24use datafusion::error::Result as DataFusionResult;
25use datafusion::execution::context::TaskContext;
26use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
27use datafusion::physical_expr::{LexRequirement, PhysicalSortRequirement};
28use datafusion::physical_plan::expressions::Column as ColumnExpr;
29use datafusion::physical_plan::metrics::{
30    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34    SendableRecordBatchStream,
35};
36use datatypes::arrow::compute;
37use datatypes::compute::SortOptions;
38use futures::{ready, Stream, StreamExt};
39use greptime_proto::substrait_extension as pb;
40use prost::Message;
41use snafu::ResultExt;
42
43use crate::error::{DeserializeSnafu, Result};
44use crate::extension_plan::METRIC_NUM_SERIES;
45use crate::metrics::PROMQL_SERIES_COUNT;
46
47#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
48pub struct SeriesDivide {
49    tag_columns: Vec<String>,
50    input: LogicalPlan,
51}
52
53impl UserDefinedLogicalNodeCore for SeriesDivide {
54    fn name(&self) -> &str {
55        Self::name()
56    }
57
58    fn inputs(&self) -> Vec<&LogicalPlan> {
59        vec![&self.input]
60    }
61
62    fn schema(&self) -> &DFSchemaRef {
63        self.input.schema()
64    }
65
66    fn expressions(&self) -> Vec<Expr> {
67        vec![]
68    }
69
70    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
71        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
72    }
73
74    fn with_exprs_and_inputs(
75        &self,
76        _exprs: Vec<Expr>,
77        inputs: Vec<LogicalPlan>,
78    ) -> DataFusionResult<Self> {
79        if inputs.is_empty() {
80            return Err(datafusion::error::DataFusionError::Internal(
81                "SeriesDivide must have at least one input".to_string(),
82            ));
83        }
84
85        Ok(Self {
86            tag_columns: self.tag_columns.clone(),
87            input: inputs[0].clone(),
88        })
89    }
90}
91
92impl SeriesDivide {
93    pub fn new(tag_columns: Vec<String>, input: LogicalPlan) -> Self {
94        Self { tag_columns, input }
95    }
96
97    pub const fn name() -> &'static str {
98        "SeriesDivide"
99    }
100
101    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
102        Arc::new(SeriesDivideExec {
103            tag_columns: self.tag_columns.clone(),
104            input: exec_input,
105            metric: ExecutionPlanMetricsSet::new(),
106        })
107    }
108
109    pub fn tags(&self) -> &[String] {
110        &self.tag_columns
111    }
112
113    pub fn serialize(&self) -> Vec<u8> {
114        pb::SeriesDivide {
115            tag_columns: self.tag_columns.clone(),
116        }
117        .encode_to_vec()
118    }
119
120    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
121        let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
122        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
123            produce_one_row: false,
124            schema: Arc::new(DFSchema::empty()),
125        });
126        Ok(Self {
127            tag_columns: pb_series_divide.tag_columns,
128            input: placeholder_plan,
129        })
130    }
131}
132
133#[derive(Debug)]
134pub struct SeriesDivideExec {
135    tag_columns: Vec<String>,
136    input: Arc<dyn ExecutionPlan>,
137    metric: ExecutionPlanMetricsSet,
138}
139
140impl ExecutionPlan for SeriesDivideExec {
141    fn as_any(&self) -> &dyn Any {
142        self
143    }
144
145    fn schema(&self) -> SchemaRef {
146        self.input.schema()
147    }
148
149    fn properties(&self) -> &PlanProperties {
150        self.input.properties()
151    }
152
153    fn required_input_distribution(&self) -> Vec<Distribution> {
154        let schema = self.input.schema();
155        vec![Distribution::HashPartitioned(
156            self.tag_columns
157                .iter()
158                // Safety: the tag column names is verified in the planning phase
159                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
160                .collect(),
161        )]
162    }
163
164    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
165        let input_schema = self.input.schema();
166        let exprs: Vec<PhysicalSortRequirement> = self
167            .tag_columns
168            .iter()
169            .map(|tag| PhysicalSortRequirement {
170                // Safety: the tag column names is verified in the planning phase
171                expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
172                options: Some(SortOptions {
173                    descending: false,
174                    nulls_first: true,
175                }),
176            })
177            .collect();
178        if !exprs.is_empty() {
179            vec![Some(LexRequirement::new(exprs))]
180        } else {
181            vec![None]
182        }
183    }
184
185    fn maintains_input_order(&self) -> Vec<bool> {
186        vec![true; self.children().len()]
187    }
188
189    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
190        vec![&self.input]
191    }
192
193    fn with_new_children(
194        self: Arc<Self>,
195        children: Vec<Arc<dyn ExecutionPlan>>,
196    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
197        assert!(!children.is_empty());
198        Ok(Arc::new(Self {
199            tag_columns: self.tag_columns.clone(),
200            input: children[0].clone(),
201            metric: self.metric.clone(),
202        }))
203    }
204
205    fn execute(
206        &self,
207        partition: usize,
208        context: Arc<TaskContext>,
209    ) -> DataFusionResult<SendableRecordBatchStream> {
210        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
211        let metrics_builder = MetricBuilder::new(&self.metric);
212        let num_series = Count::new();
213        metrics_builder
214            .with_partition(partition)
215            .build(MetricValue::Count {
216                name: METRIC_NUM_SERIES.into(),
217                count: num_series.clone(),
218            });
219
220        let input = self.input.execute(partition, context)?;
221        let schema = input.schema();
222        let tag_indices = self
223            .tag_columns
224            .iter()
225            .map(|tag| {
226                schema
227                    .column_with_name(tag)
228                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
229                    .0
230            })
231            .collect();
232        Ok(Box::pin(SeriesDivideStream {
233            tag_indices,
234            buffer: vec![],
235            schema,
236            input,
237            metric: baseline_metric,
238            num_series,
239            inspect_start: 0,
240        }))
241    }
242
243    fn metrics(&self) -> Option<MetricsSet> {
244        Some(self.metric.clone_inner())
245    }
246
247    fn name(&self) -> &str {
248        "SeriesDivideExec"
249    }
250}
251
252impl DisplayAs for SeriesDivideExec {
253    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
254        match t {
255            DisplayFormatType::Default | DisplayFormatType::Verbose => {
256                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
257            }
258        }
259    }
260}
261
262/// Assume the input stream is ordered on the tag columns.
263pub struct SeriesDivideStream {
264    tag_indices: Vec<usize>,
265    buffer: Vec<RecordBatch>,
266    schema: SchemaRef,
267    input: SendableRecordBatchStream,
268    metric: BaselineMetrics,
269    /// Index of buffered batches to start inspect next time.
270    inspect_start: usize,
271    /// Number of series processed.
272    num_series: Count,
273}
274
275impl RecordBatchStream for SeriesDivideStream {
276    fn schema(&self) -> SchemaRef {
277        self.schema.clone()
278    }
279}
280
281impl Stream for SeriesDivideStream {
282    type Item = DataFusionResult<RecordBatch>;
283
284    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
285        loop {
286            if !self.buffer.is_empty() {
287                let timer = std::time::Instant::now();
288                let cut_at = match self.find_first_diff_row() {
289                    Ok(cut_at) => cut_at,
290                    Err(e) => return Poll::Ready(Some(Err(e))),
291                };
292                if let Some((batch_index, row_index)) = cut_at {
293                    // slice out the first time series and return it.
294                    let half_batch_of_first_series =
295                        self.buffer[batch_index].slice(0, row_index + 1);
296                    let half_batch_of_second_series = self.buffer[batch_index].slice(
297                        row_index + 1,
298                        self.buffer[batch_index].num_rows() - row_index - 1,
299                    );
300                    let result_batches = self
301                        .buffer
302                        .drain(0..batch_index)
303                        .chain([half_batch_of_first_series])
304                        .collect::<Vec<_>>();
305                    if half_batch_of_second_series.num_rows() > 0 {
306                        self.buffer[0] = half_batch_of_second_series;
307                    } else {
308                        self.buffer.remove(0);
309                    }
310                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;
311
312                    self.inspect_start = 0;
313                    self.num_series.add(1);
314                    self.metric.elapsed_compute().add_elapsed(timer);
315                    return Poll::Ready(Some(Ok(result_batch)));
316                } else {
317                    self.metric.elapsed_compute().add_elapsed(timer);
318                    // continue to fetch next batch as the current buffer only contains one time series.
319                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
320                    let timer = std::time::Instant::now();
321                    if let Some(next_batch) = next_batch {
322                        if next_batch.num_rows() != 0 {
323                            self.buffer.push(next_batch);
324                        }
325                        continue;
326                    } else {
327                        // input stream is ended
328                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
329                        self.buffer.clear();
330                        self.inspect_start = 0;
331                        self.num_series.add(1);
332                        self.metric.elapsed_compute().add_elapsed(timer);
333                        return Poll::Ready(Some(Ok(result)));
334                    }
335                }
336            } else {
337                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
338                    Some(Ok(batch)) => batch,
339                    None => {
340                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
341                        return Poll::Ready(None);
342                    }
343                    error => return Poll::Ready(error),
344                };
345                self.buffer.push(batch);
346                continue;
347            }
348        }
349    }
350}
351
352impl SeriesDivideStream {
353    fn fetch_next_batch(
354        mut self: Pin<&mut Self>,
355        cx: &mut Context<'_>,
356    ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
357        let poll = self.input.poll_next_unpin(cx);
358        self.metric.record_poll(poll)
359    }
360
361    /// Return the position to cut buffer.
362    /// None implies the current buffer only contains one time series.
363    fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
364        // fast path: no tag columns means all data belongs to the same series.
365        if self.tag_indices.is_empty() {
366            return Ok(None);
367        }
368
369        let mut resumed_batch_index = self.inspect_start;
370
371        for batch in &self.buffer[resumed_batch_index..] {
372            let num_rows = batch.num_rows();
373            let mut result_index = num_rows;
374
375            // check if the first row is the same with last batch's last row
376            if resumed_batch_index > self.inspect_start.checked_sub(1).unwrap_or_default() {
377                let last_batch = &self.buffer[resumed_batch_index - 1];
378                let last_row = last_batch.num_rows() - 1;
379                for index in &self.tag_indices {
380                    let current_array = batch.column(*index);
381                    let last_array = last_batch.column(*index);
382                    let current_string_array = current_array
383                        .as_any()
384                        .downcast_ref::<StringArray>()
385                        .ok_or_else(|| {
386                            datafusion::error::DataFusionError::Internal(
387                                "Failed to downcast tag column to StringArray".to_string(),
388                            )
389                        })?;
390                    let last_string_array = last_array
391                        .as_any()
392                        .downcast_ref::<StringArray>()
393                        .ok_or_else(|| {
394                            datafusion::error::DataFusionError::Internal(
395                                "Failed to downcast tag column to StringArray".to_string(),
396                            )
397                        })?;
398                    let current_value = current_string_array.value(0);
399                    let last_value = last_string_array.value(last_row);
400                    if current_value != last_value {
401                        return Ok(Some((resumed_batch_index - 1, last_batch.num_rows() - 1)));
402                    }
403                }
404            }
405
406            // quick check if all rows are the same by comparing the first and last row in this batch
407            let mut all_same = true;
408            for index in &self.tag_indices {
409                let array = batch.column(*index);
410                let string_array =
411                    array
412                        .as_any()
413                        .downcast_ref::<StringArray>()
414                        .ok_or_else(|| {
415                            datafusion::error::DataFusionError::Internal(
416                                "Failed to downcast tag column to StringArray".to_string(),
417                            )
418                        })?;
419                if string_array.value(0) != string_array.value(num_rows - 1) {
420                    all_same = false;
421                    break;
422                }
423            }
424            if all_same {
425                resumed_batch_index += 1;
426                continue;
427            }
428
429            // check column by column
430            for index in &self.tag_indices {
431                let array = batch.column(*index);
432                let string_array =
433                    array
434                        .as_any()
435                        .downcast_ref::<StringArray>()
436                        .ok_or_else(|| {
437                            datafusion::error::DataFusionError::Internal(
438                                "Failed to downcast tag column to StringArray".to_string(),
439                            )
440                        })?;
441                // the first row number that not equal to the next row.
442                let mut same_until = 0;
443                while same_until < num_rows - 1 {
444                    if string_array.value(same_until) != string_array.value(same_until + 1) {
445                        break;
446                    }
447                    same_until += 1;
448                }
449                result_index = result_index.min(same_until);
450            }
451
452            if result_index + 1 >= num_rows {
453                // all rows are the same, inspect next batch
454                resumed_batch_index += 1;
455            } else {
456                return Ok(Some((resumed_batch_index, result_index)));
457            }
458        }
459
460        self.inspect_start = resumed_batch_index;
461        Ok(None)
462    }
463}
464
#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::prelude::SessionContext;

    use super::*;

    /// Builds a single-partition `MemoryExec` holding three batches in which the rows of each
    /// series are contiguous. Series boundaries occur both inside batches and across batch
    /// edges; the `bla`/`005` series spans all three batches (3 + 3 + 1 rows).
    ///
    /// NOTE: the `path_column_*` arrays fill the schema's `host` column and the
    /// `host_column_*` arrays fill its `path` column, so the variable names are swapped
    /// relative to the schema. The expected outputs below use the schema's column names.
    fn prepare_test_data() -> MemoryExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
        ]));

        let path_column_1 = Arc::new(StringArray::from(vec![
            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
        ])) as _;
        let host_column_1 = Arc::new(StringArray::from(vec![
            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
        ])) as _;

        let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
        let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;

        let path_column_3 = Arc::new(StringArray::from(vec![
            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
        ])) as _;
        let host_column_3 = Arc::new(StringArray::from(vec![
            "005", "001", "001", "001", "001", "001", "001", "001",
        ])) as _;

        let data_1 =
            RecordBatch::try_new(schema.clone(), vec![path_column_1, host_column_1]).unwrap();
        let data_2 =
            RecordBatch::try_new(schema.clone(), vec![path_column_2, host_column_2]).unwrap();
        let data_3 =
            RecordBatch::try_new(schema.clone(), vec![path_column_3, host_column_3]).unwrap();

        MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap()
    }

    /// Collecting the whole output must reproduce every input row, unchanged and in order.
    #[tokio::test]
    async fn overall_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+------+\
            \n| host | path |\
            \n+------+------+\
            \n| foo  | 000  |\
            \n| foo  | 000  |\
            \n| foo  | 001  |\
            \n| bar  | 002  |\
            \n| bar  | 002  |\
            \n| bar  | 002  |\
            \n| bar  | 002  |\
            \n| bar  | 002  |\
            \n| bar  | 003  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| 🥺   | 001  |\
            \n| 🥺   | 001  |\
            \n| 🥺   | 001  |\
            \n| 🥺   | 001  |\
            \n| 🥺   | 001  |\
            \n| 🫠   | 001  |\
            \n| 🫠   | 001  |\
            \n+------+------+",
        );
        assert_eq!(result_literal, expected);
    }

    /// Each batch polled from the stream must contain exactly one series, in input order,
    /// including the series that spans several input batches.
    #[tokio::test]
    async fn per_batch_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let mut expectations = vec![
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| foo  | 000  |\
                \n| foo  | 000  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| foo  | 001  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| bar  | 002  |\
                \n| bar  | 002  |\
                \n| bar  | 002  |\
                \n| bar  | 002  |\
                \n| bar  | 002  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| bar  | 003  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| 🥺   | 001  |\
                \n| 🥺   | 001  |\
                \n| 🥺   | 001  |\
                \n| 🥺   | 001  |\
                \n| 🥺   | 001  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| 🫠   | 001  |\
                \n| 🫠   | 001  |\
                \n+------+------+",
            ),
        ];
        // Reverse so that `pop()` yields the expectations in the order listed above.
        expectations.reverse();

        while let Some(batch) = divide_stream.next().await {
            let formatted =
                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
                    .unwrap()
                    .to_string();
            let expected = expectations.pop().unwrap();
            assert_eq!(formatted, expected);
        }
    }

    /// Every input batch holds a single series and each series spans two adjacent batches.
    /// The operator must merge each pair into one output batch, both via `collect` and
    /// when polling the stream directly.
    #[tokio::test]
    async fn test_all_batches_same_combination() {
        // Create a schema with host and path columns, same as prepare_test_data
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
        ]));

        // Create batches with three different combinations
        // Each batch contains only one combination
        // Batches with the same combination are adjacent

        // First combination: "server1", "/var/log"
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
            ],
        )
        .unwrap();

        // Second combination: "server2", "/var/data"
        let batch3 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
                Arc::new(StringArray::from(vec![
                    "/var/data",
                    "/var/data",
                    "/var/data",
                ])) as _,
            ],
        )
        .unwrap();

        let batch4 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2"])) as _,
                Arc::new(StringArray::from(vec!["/var/data"])) as _,
            ],
        )
        .unwrap();

        // Third combination: "server3", "/opt/logs"
        let batch5 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
                Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
            ],
        )
        .unwrap();

        let batch6 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
                Arc::new(StringArray::from(vec![
                    "/opt/logs",
                    "/opt/logs",
                    "/opt/logs",
                ])) as _,
            ],
        )
        .unwrap();

        // Create MemoryExec with these batches, keeping same combinations adjacent
        let memory_exec = Arc::new(
            MemoryExec::try_new(
                &[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
                schema.clone(),
                None,
            )
            .unwrap(),
        );

        // Create SeriesDivideExec
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });

        // Execute the division
        let session_context = SessionContext::default();
        let result =
            datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
                .await
                .unwrap();

        // Verify that we got 3 batches (one for each combination)
        assert_eq!(result.len(), 3);

        // First batch should have 5 rows (3 + 2 from the "server1" combination)
        assert_eq!(result[0].num_rows(), 5);

        // Second batch should have 4 rows (3 + 1 from the "server2" combination)
        assert_eq!(result[1].num_rows(), 4);

        // Third batch should have 5 rows (2 + 3 from the "server3" combination)
        assert_eq!(result[2].num_rows(), 5);

        // Verify values in first batch (server1, /var/log)
        let host_array1 = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array1 = result[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array1.value(i), "server1");
            assert_eq!(path_array1.value(i), "/var/log");
        }

        // Verify values in second batch (server2, /var/data)
        let host_array2 = result[1]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array2 = result[1]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        for i in 0..4 {
            assert_eq!(host_array2.value(i), "server2");
            assert_eq!(path_array2.value(i), "/var/data");
        }

        // Verify values in third batch (server3, /opt/logs)
        let host_array3 = result[2]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array3 = result[2]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array3.value(i), "server3");
            assert_eq!(path_array3.value(i), "/opt/logs");
        }

        // Also verify streaming behavior
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        // Should produce three batches, one for each combination
        let batch1 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 5); // server1 combination

        let batch2 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 4); // server2 combination

        let batch3 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 5); // server3 combination

        // No more batches should be produced
        assert!(divide_stream.next().await.is_none());
    }
}