use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{LexRequirement, PhysicalSortRequirement};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datatypes::arrow::compute;
use datatypes::compute::SortOptions;
use futures::{ready, Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::METRIC_NUM_SERIES;
use crate::metrics::PROMQL_SERIES_COUNT;

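/// Logical plan node that divides the input into chunks, one chunk per time
/// series, based on the values of the tag columns.
///
/// Rows of the same series must be adjacent in the input; the physical
/// operator declares a sort requirement on the tag columns to ensure this.
/// A minimal construction sketch (assuming `scan` is some existing
/// [`LogicalPlan`]):
///
/// ```ignore
/// let divide = SeriesDivide::new(vec!["host".to_string()], scan);
/// ```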
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesDivide {
    tag_columns: Vec<String>,
    input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for SeriesDivide {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(datafusion::error::DataFusionError::Internal(
                "SeriesDivide must have at least one input".to_string(),
            ));
        }

        Ok(Self {
            tag_columns: self.tag_columns.clone(),
            input: inputs[0].clone(),
        })
    }
}

impl SeriesDivide {
    pub fn new(tag_columns: Vec<String>, input: LogicalPlan) -> Self {
        Self { tag_columns, input }
    }

    pub const fn name() -> &'static str {
        "SeriesDivide"
    }

    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesDivideExec {
            tag_columns: self.tag_columns.clone(),
            input: exec_input,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

    pub fn tags(&self) -> &[String] {
        &self.tag_columns
    }

    pub fn serialize(&self) -> Vec<u8> {
        pb::SeriesDivide {
            tag_columns: self.tag_columns.clone(),
        }
        .encode_to_vec()
    }

    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
        // The input plan is unknown at deserialization time; use an empty
        // relation as a placeholder until the real input is substituted in.
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        Ok(Self {
            tag_columns: pb_series_divide.tag_columns,
            input: placeholder_plan,
        })
    }
}

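/// Physical plan node corresponding to [`SeriesDivide`]. It re-chunks the
/// input stream so that each output [`RecordBatch`] contains exactly one
/// time series.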
#[derive(Debug)]
pub struct SeriesDivideExec {
    tag_columns: Vec<String>,
    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesDivideExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        let schema = self.input.schema();
        vec![Distribution::HashPartitioned(
            self.tag_columns
                .iter()
                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
                .collect(),
        )]
    }

    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
        let input_schema = self.input.schema();
        let exprs: Vec<PhysicalSortRequirement> = self
            .tag_columns
            .iter()
            .map(|tag| PhysicalSortRequirement {
                expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
            })
            .collect();
        if !exprs.is_empty() {
            vec![Some(LexRequirement::new(exprs))]
        } else {
            vec![None]
        }
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            tag_columns: self.tag_columns.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let tag_indices = self
            .tag_columns
            .iter()
            .map(|tag| {
                schema
                    .column_with_name(tag)
                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
                    .0
            })
            .collect();
        Ok(Box::pin(SeriesDivideStream {
            tag_indices,
            buffer: vec![],
            schema,
            input,
            metric: baseline_metric,
            num_series,
            inspect_start: 0,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "SeriesDivideExec"
    }
}

impl DisplayAs for SeriesDivideExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
            }
        }
    }
}

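/// Stream that divides the input into per-series [`RecordBatch`]es.
///
/// It buffers input batches until it sees a row whose tag values differ from
/// the previous row's, then emits everything before that boundary as one
/// series. This relies on the input being sorted on the tag columns.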
pub struct SeriesDivideStream {
    tag_indices: Vec<usize>,
    buffer: Vec<RecordBatch>,
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    /// Index of the first buffered batch that has not been inspected yet;
    /// all earlier buffered batches are known to belong to one series.
    inspect_start: usize,
    /// Number of series emitted so far.
    num_series: Count,
}

impl RecordBatchStream for SeriesDivideStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesDivideStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if !self.buffer.is_empty() {
                let timer = std::time::Instant::now();
                let cut_at = match self.find_first_diff_row() {
                    Ok(cut_at) => cut_at,
                    Err(e) => return Poll::Ready(Some(Err(e))),
                };
                if let Some((batch_index, row_index)) = cut_at {
                    // The series boundary lies inside `buffer[batch_index]`:
                    // rows up to and including `row_index` close the current
                    // series, the rest starts the next one.
                    let half_batch_of_first_series =
                        self.buffer[batch_index].slice(0, row_index + 1);
                    let half_batch_of_second_series = self.buffer[batch_index].slice(
                        row_index + 1,
                        self.buffer[batch_index].num_rows() - row_index - 1,
                    );
                    let result_batches = self
                        .buffer
                        .drain(0..batch_index)
                        .chain([half_batch_of_first_series])
                        .collect::<Vec<_>>();
                    if half_batch_of_second_series.num_rows() > 0 {
                        self.buffer[0] = half_batch_of_second_series;
                    } else {
                        self.buffer.remove(0);
                    }
                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;

                    self.inspect_start = 0;
                    self.num_series.add(1);
                    self.metric.elapsed_compute().add_elapsed(timer);
                    return Poll::Ready(Some(Ok(result_batch)));
                } else {
                    self.metric.elapsed_compute().add_elapsed(timer);
                    // The buffered data all belongs to one series; pull more
                    // input to find the series boundary.
                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
                    let timer = std::time::Instant::now();
                    if let Some(next_batch) = next_batch {
                        if next_batch.num_rows() != 0 {
                            self.buffer.push(next_batch);
                        }
                        continue;
                    } else {
                        // Input is exhausted; flush the buffer as the last series.
                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
                        self.buffer.clear();
                        self.inspect_start = 0;
                        self.num_series.add(1);
                        self.metric.elapsed_compute().add_elapsed(timer);
                        return Poll::Ready(Some(Ok(result)));
                    }
                }
            } else {
                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                    Some(Ok(batch)) => batch,
                    None => {
                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                        return Poll::Ready(None);
                    }
                    error => return Poll::Ready(error),
                };
                // Skip empty batches so `find_first_diff_row` never has to
                // index into a zero-row batch.
                if batch.num_rows() != 0 {
                    self.buffer.push(batch);
                }
                continue;
            }
        }
    }
}

impl SeriesDivideStream {
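    /// Pull the next batch from the input stream, recording the poll result
    /// in the baseline metrics.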
    fn fetch_next_batch(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
        let poll = self.input.poll_next_unpin(cx);
        self.metric.record_poll(poll)
    }

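    /// Scan the buffered batches, starting from `inspect_start`, for the
    /// first row whose tag values differ from the previous row's. Returns
    /// `Some((batch_index, row_index))` pointing at the last row of the
    /// current series, or `None` if all buffered rows belong to one series.
    /// In the latter case `inspect_start` is advanced so the next call can
    /// skip the already-inspected batches.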
    fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
        // fast path: no tag columns means all data belongs to the same series.
        if self.tag_indices.is_empty() {
            return Ok(None);
        }

        let mut resumed_batch_index = self.inspect_start;

        for batch in &self.buffer[resumed_batch_index..] {
            let num_rows = batch.num_rows();
            let mut result_index = num_rows;

            // check whether this batch's first row continues the series of
            // the previous batch's last row
            if resumed_batch_index > 0 {
                let last_batch = &self.buffer[resumed_batch_index - 1];
                let last_row = last_batch.num_rows() - 1;
                for index in &self.tag_indices {
                    let current_array = batch.column(*index);
                    let last_array = last_batch.column(*index);
                    let current_string_array = current_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let last_string_array = last_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let current_value = current_string_array.value(0);
                    let last_value = last_string_array.value(last_row);
                    if current_value != last_value {
                        return Ok(Some((resumed_batch_index - 1, last_batch.num_rows() - 1)));
                    }
                }
            }

            // quick check: if the first and last rows of this batch agree on
            // every tag, the whole batch belongs to one series.
            let mut all_same = true;
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array =
                    array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                if string_array.value(0) != string_array.value(num_rows - 1) {
                    all_same = false;
                    break;
                }
            }
            if all_same {
                resumed_batch_index += 1;
                continue;
            }

            // slow path: find the first adjacent pair of rows that differ in
            // any tag column.
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array =
                    array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                // rows 0..=same_until share the same value in this column
                let mut same_until = 0;
                while same_until < num_rows - 1 {
                    if string_array.value(same_until) != string_array.value(same_until + 1) {
                        break;
                    }
                    same_until += 1;
                }
                result_index = result_index.min(same_until);
            }

            if result_index + 1 >= num_rows {
                resumed_batch_index += 1;
            } else {
                return Ok(Some((resumed_batch_index, result_index)));
            }
        }

        self.inspect_start = resumed_batch_index;
        Ok(None)
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::prelude::SessionContext;

    use super::*;

    fn prepare_test_data() -> MemoryExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
        ]));

        let path_column_1 = Arc::new(StringArray::from(vec![
            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
        ])) as _;
        let host_column_1 = Arc::new(StringArray::from(vec![
            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
        ])) as _;

        let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
        let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;

        let path_column_3 = Arc::new(StringArray::from(vec![
            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
        ])) as _;
        let host_column_3 = Arc::new(StringArray::from(vec![
            "005", "001", "001", "001", "001", "001", "001", "001",
        ])) as _;

        let data_1 =
            RecordBatch::try_new(schema.clone(), vec![path_column_1, host_column_1]).unwrap();
        let data_2 =
            RecordBatch::try_new(schema.clone(), vec![path_column_2, host_column_2]).unwrap();
        let data_3 =
            RecordBatch::try_new(schema.clone(), vec![path_column_3, host_column_3]).unwrap();

        MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap()
    }

    #[tokio::test]
    async fn overall_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+------+\
            \n| host | path |\
            \n+------+------+\
            \n| foo  | 000  |\
            \n| foo  | 000  |\
            \n| foo  | 001  |\
            \n| bar  | 002  |\
            \n| bar  | 002  |\
            \n| bar  | 002  |\
            \n| bar  | 002  |\
            \n| bar  | 002  |\
            \n| bar  | 003  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| bla  | 005  |\
            \n| 🥺   | 001  |\
            \n| 🥺   | 001  |\
            \n| 🥺   | 001  |\
            \n| 🥺   | 001  |\
            \n| 🥺   | 001  |\
            \n| 🫠   | 001  |\
            \n| 🫠   | 001  |\
            \n+------+------+",
        );
        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn per_batch_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let mut expectations = vec![
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| foo  | 000  |\
                \n| foo  | 000  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| foo  | 001  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| bar  | 002  |\
                \n| bar  | 002  |\
                \n| bar  | 002  |\
                \n| bar  | 002  |\
                \n| bar  | 002  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| bar  | 003  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n| bla  | 005  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| 🥺   | 001  |\
                \n| 🥺   | 001  |\
                \n| 🥺   | 001  |\
                \n| 🥺   | 001  |\
                \n| 🥺   | 001  |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| 🫠   | 001  |\
                \n| 🫠   | 001  |\
                \n+------+------+",
            ),
        ];
        expectations.reverse();

        while let Some(batch) = divide_stream.next().await {
            let formatted =
                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
                    .unwrap()
                    .to_string();
            let expected = expectations.pop().unwrap();
            assert_eq!(formatted, expected);
        }
    }

    #[tokio::test]
    async fn test_all_batches_same_combination() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
        ]));

        // series 1: "server1" / "/var/log", split across two batches
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
            ],
        )
        .unwrap();

        // series 2: "server2" / "/var/data", split across two batches
        let batch3 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
                Arc::new(StringArray::from(vec![
                    "/var/data",
                    "/var/data",
                    "/var/data",
                ])) as _,
            ],
        )
        .unwrap();

        let batch4 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2"])) as _,
                Arc::new(StringArray::from(vec!["/var/data"])) as _,
            ],
        )
        .unwrap();

        // series 3: "server3" / "/opt/logs", split across two batches
        let batch5 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
                Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
            ],
        )
        .unwrap();

        let batch6 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
                Arc::new(StringArray::from(vec![
                    "/opt/logs",
                    "/opt/logs",
                    "/opt/logs",
                ])) as _,
            ],
        )
        .unwrap();

        let memory_exec = Arc::new(
            MemoryExec::try_new(
                &[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
                schema.clone(),
                None,
            )
            .unwrap(),
        );

        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });

        let session_context = SessionContext::default();
        let result =
            datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
                .await
                .unwrap();

        // one output batch per tag combination
        assert_eq!(result.len(), 3);
        // series 1: 3 + 2 rows
        assert_eq!(result[0].num_rows(), 5);
        // series 2: 3 + 1 rows
        assert_eq!(result[1].num_rows(), 4);
        // series 3: 2 + 3 rows
        assert_eq!(result[2].num_rows(), 5);

        let host_array1 = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array1 = result[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array1.value(i), "server1");
            assert_eq!(path_array1.value(i), "/var/log");
        }

        let host_array2 = result[1]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array2 = result[1]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        for i in 0..4 {
            assert_eq!(host_array2.value(i), "server2");
            assert_eq!(path_array2.value(i), "/var/data");
        }

        let host_array3 = result[2]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array3 = result[2]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array3.value(i), "server3");
            assert_eq!(path_array3.value(i), "/opt/logs");
        }

        // streaming execution should yield the same per-series batches
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let batch1 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 5);

        let batch2 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 4);

        let batch3 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 5);

        assert!(divide_stream.next().await.is_none());
    }
}
827}