use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{LexRequirement, PhysicalSortRequirement};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datatypes::arrow::compute;
use datatypes::compute::SortOptions;
use futures::{ready, Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::METRIC_NUM_SERIES;
use crate::metrics::PROMQL_SERIES_COUNT;

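/// Logical plan node that divides the input into multiple time series, where a
/// "series" is a maximal run of consecutive rows sharing the same values in all
/// tag columns. The input is expected to be sorted by the tag columns (see
/// `SeriesDivideExec::required_input_ordering`).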
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesDivide {
    tag_columns: Vec<String>,
    time_index_column: String,
    input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for SeriesDivide {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(datafusion::error::DataFusionError::Internal(
                "SeriesDivide must have at least one input".to_string(),
            ));
        }

        Ok(Self {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: inputs[0].clone(),
        })
    }
}

impl SeriesDivide {
    pub fn new(tag_columns: Vec<String>, time_index_column: String, input: LogicalPlan) -> Self {
        Self {
            tag_columns,
            time_index_column,
            input,
        }
    }

    pub const fn name() -> &'static str {
        "SeriesDivide"
    }

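    /// Convert this logical node into a physical [`SeriesDivideExec`] over the
    /// given physical input.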
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesDivideExec {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: exec_input,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

    pub fn tags(&self) -> &[String] {
        &self.tag_columns
    }

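    /// Serialize this node into a substrait extension protobuf message. The input
    /// plan is not included.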
    pub fn serialize(&self) -> Vec<u8> {
        pb::SeriesDivide {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
        }
        .encode_to_vec()
    }

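    /// Deserialize a [`SeriesDivide`] from protobuf bytes. The returned node
    /// carries a placeholder `EmptyRelation` input that the caller is expected to
    /// replace with the real input plan.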
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        Ok(Self {
            tag_columns: pb_series_divide.tag_columns,
            time_index_column: pb_series_divide.time_index_column,
            input: placeholder_plan,
        })
    }
}

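/// Physical implementation of [`SeriesDivide`]. It requires its input to be
/// hash-partitioned and sorted by the tag columns, then splits the input stream
/// into one output batch per series.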
#[derive(Debug)]
pub struct SeriesDivideExec {
    tag_columns: Vec<String>,
    time_index_column: String,
    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesDivideExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

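    // Hash-partition the input by the tag columns so that all rows of a series
    // land in the same partition.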
    fn required_input_distribution(&self) -> Vec<Distribution> {
        let schema = self.input.schema();
        vec![Distribution::HashPartitioned(
            self.tag_columns
                .iter()
                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
                .collect(),
        )]
    }

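    // Require the input to be sorted by all tag columns first and the time index
    // last, so each series arrives as a contiguous, time-ordered run of rows.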
    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
        let input_schema = self.input.schema();
        let mut exprs: Vec<PhysicalSortRequirement> = self
            .tag_columns
            .iter()
            .map(|tag| PhysicalSortRequirement {
                expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
            })
            .collect();

        // The time index column is the last sort key.
        exprs.push(PhysicalSortRequirement {
            expr: Arc::new(
                ColumnExpr::new_with_schema(&self.time_index_column, &input_schema).unwrap(),
            ),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        });
        vec![Some(LexRequirement::new(exprs))]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        // Register the per-partition series-count metric.
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let tag_indices = self
            .tag_columns
            .iter()
            .map(|tag| {
                schema
                    .column_with_name(tag)
                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
                    .0
            })
            .collect();
        Ok(Box::pin(SeriesDivideStream {
            tag_indices,
            buffer: vec![],
            schema,
            input,
            metric: baseline_metric,
            num_series,
            inspect_start: 0,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "SeriesDivideExec"
    }
}

impl DisplayAs for SeriesDivideExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
            }
        }
    }
}

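/// Assumes the input stream is sorted by the tag columns. The stream buffers
/// input batches, scans the buffer for the first row whose tag values differ from
/// the previous row, and emits everything before that point as one series.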
pub struct SeriesDivideStream {
    /// Indices of the tag columns in the input schema.
    tag_indices: Vec<usize>,
    buffer: Vec<RecordBatch>,
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    /// Index of the buffered batch from which the next inspection should resume.
    /// Batches before this index have already been inspected and contain no
    /// series boundary.
    inspect_start: usize,
    /// Number of series emitted so far.
    num_series: Count,
}

impl RecordBatchStream for SeriesDivideStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesDivideStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if !self.buffer.is_empty() {
                let timer = std::time::Instant::now();
                let cut_at = match self.find_first_diff_row() {
                    Ok(cut_at) => cut_at,
                    Err(e) => return Poll::Ready(Some(Err(e))),
                };
                if let Some((batch_index, row_index)) = cut_at {
                    // The buffer holds more than one series. Split the batch at the
                    // cut point and emit everything up to (and including) that row.
                    let half_batch_of_first_series =
                        self.buffer[batch_index].slice(0, row_index + 1);
                    let half_batch_of_second_series = self.buffer[batch_index].slice(
                        row_index + 1,
                        self.buffer[batch_index].num_rows() - row_index - 1,
                    );
                    let result_batches = self
                        .buffer
                        .drain(0..batch_index)
                        .chain([half_batch_of_first_series])
                        .collect::<Vec<_>>();
                    if half_batch_of_second_series.num_rows() > 0 {
                        self.buffer[0] = half_batch_of_second_series;
                    } else {
                        self.buffer.remove(0);
                    }
                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;

                    self.inspect_start = 0;
                    self.num_series.add(1);
                    self.metric.elapsed_compute().add_elapsed(timer);
                    return Poll::Ready(Some(Ok(result_batch)));
                } else {
                    // The buffered data all belongs to one series so far; fetch more
                    // input to look for the next series boundary.
                    self.metric.elapsed_compute().add_elapsed(timer);
                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
                    let timer = std::time::Instant::now();
                    if let Some(next_batch) = next_batch {
                        if next_batch.num_rows() != 0 {
                            self.buffer.push(next_batch);
                        }
                        continue;
                    } else {
                        // The input is exhausted; emit the rest of the buffer as the
                        // last series.
                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
                        self.buffer.clear();
                        self.inspect_start = 0;
                        self.num_series.add(1);
                        self.metric.elapsed_compute().add_elapsed(timer);
                        return Poll::Ready(Some(Ok(result)));
                    }
                }
            } else {
                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                    Some(Ok(batch)) => batch,
                    None => {
                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                        return Poll::Ready(None);
                    }
                    error => return Poll::Ready(error),
                };
                self.buffer.push(batch);
                continue;
            }
        }
    }
}

impl SeriesDivideStream {
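    /// Poll one batch from the input stream and record the result in the
    /// baseline metrics.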
    fn fetch_next_batch(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
        let poll = self.input.poll_next_unpin(cx);
        self.metric.record_poll(poll)
    }

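    /// Find where the first series in the buffer ends. Returns
    /// `Some((batch_index, row_index))` pointing at the last row of that series,
    /// or `None` if all buffered rows from `inspect_start` onward belong to a
    /// single series.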
    fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
        // Fast path: no tag columns means all data belongs to the same series.
        if self.tag_indices.is_empty() {
            return Ok(None);
        }

        let mut resumed_batch_index = self.inspect_start;

        for batch in &self.buffer[resumed_batch_index..] {
            let num_rows = batch.num_rows();
            let mut result_index = num_rows;

            // Compare this batch's first row with the previous batch's last row:
            // the series may change exactly at a batch boundary.
            if resumed_batch_index > self.inspect_start.checked_sub(1).unwrap_or_default() {
                let last_batch = &self.buffer[resumed_batch_index - 1];
                let last_row = last_batch.num_rows() - 1;
                for index in &self.tag_indices {
                    let current_array = batch.column(*index);
                    let last_array = last_batch.column(*index);
                    let current_string_array = current_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let last_string_array = last_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let current_value = current_string_array.value(0);
                    let last_value = last_string_array.value(last_row);
                    if current_value != last_value {
                        return Ok(Some((resumed_batch_index - 1, last_batch.num_rows() - 1)));
                    }
                }
            }

            // Fast path: if the first and last rows of this batch carry the same
            // tag values, the whole batch belongs to one series.
            let mut all_same = true;
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        datafusion::error::DataFusionError::Internal(
                            "Failed to downcast tag column to StringArray".to_string(),
                        )
                    })?;
                if string_array.value(0) != string_array.value(num_rows - 1) {
                    all_same = false;
                    break;
                }
            }
            if all_same {
                resumed_batch_index += 1;
                continue;
            }

            // Slow path: scan each tag column for the first row that differs from
            // its successor, and take the smallest such index across all columns.
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        datafusion::error::DataFusionError::Internal(
                            "Failed to downcast tag column to StringArray".to_string(),
                        )
                    })?;
                // `same_until` ends as the index of the last row in this column's
                // leading run of equal values.
                let mut same_until = 0;
                while same_until < num_rows - 1 {
                    if string_array.value(same_until) != string_array.value(same_until + 1) {
                        break;
                    }
                    same_until += 1;
                }
                result_index = result_index.min(same_until);
            }

            if result_index + 1 >= num_rows {
                // The cut point falls at the end of this batch; keep scanning.
                resumed_batch_index += 1;
            } else {
                return Ok(Some((resumed_batch_index, result_index)));
            }
        }

        // No boundary found; remember where to resume the next inspection.
        self.inspect_start = resumed_batch_index;
        Ok(None)
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::prelude::SessionContext;

    use super::*;

    fn prepare_test_data() -> MemoryExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        // Note: the arrays below are passed in the order [path, host, time], so the
        // "host" column holds the `path_column_*` values and vice versa. The test
        // only relies on the data being consistent, not on the column names.
        let path_column_1 = Arc::new(StringArray::from(vec![
            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
        ])) as _;
        let host_column_1 = Arc::new(StringArray::from(vec![
            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
        ])) as _;
        let time_index_column_1 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![
                1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
            ]),
        ) as _;

        let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
        let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
        let time_index_column_2 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![13000, 14000, 15000]),
        ) as _;

        let path_column_3 = Arc::new(StringArray::from(vec![
            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
        ])) as _;
        let host_column_3 = Arc::new(StringArray::from(vec![
            "005", "001", "001", "001", "001", "001", "001", "001",
        ])) as _;
        let time_index_column_3 =
            Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                vec![16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000],
            )) as _;

        let data_1 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_1, host_column_1, time_index_column_1],
        )
        .unwrap();
        let data_2 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_2, host_column_2, time_index_column_2],
        )
        .unwrap();
        let data_3 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_3, host_column_3, time_index_column_3],
        )
        .unwrap();

        MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap()
    }

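    /// Collects the whole output and checks the concatenated result against the
    /// expected per-series ordering.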
    #[tokio::test]
    async fn overall_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+------+---------------------+\
            \n| host | path | time_index          |\
            \n+------+------+---------------------+\
            \n| foo  | 000  | 1970-01-01T00:00:01 |\
            \n| foo  | 000  | 1970-01-01T00:00:02 |\
            \n| foo  | 001  | 1970-01-01T00:00:03 |\
            \n| bar  | 002  | 1970-01-01T00:00:04 |\
            \n| bar  | 002  | 1970-01-01T00:00:05 |\
            \n| bar  | 002  | 1970-01-01T00:00:06 |\
            \n| bar  | 002  | 1970-01-01T00:00:07 |\
            \n| bar  | 002  | 1970-01-01T00:00:08 |\
            \n| bar  | 003  | 1970-01-01T00:00:09 |\
            \n| bla  | 005  | 1970-01-01T00:00:10 |\
            \n| bla  | 005  | 1970-01-01T00:00:11 |\
            \n| bla  | 005  | 1970-01-01T00:00:12 |\
            \n| bla  | 005  | 1970-01-01T00:00:13 |\
            \n| bla  | 005  | 1970-01-01T00:00:14 |\
            \n| bla  | 005  | 1970-01-01T00:00:15 |\
            \n| bla  | 005  | 1970-01-01T00:00:16 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
            \n+------+------+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }

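    /// Checks that every poll of the stream yields exactly one series.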
    #[tokio::test]
    async fn per_batch_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let mut expectations = vec![
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| foo  | 000  | 1970-01-01T00:00:01 |\
                \n| foo  | 000  | 1970-01-01T00:00:02 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| foo  | 001  | 1970-01-01T00:00:03 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bar  | 002  | 1970-01-01T00:00:04 |\
                \n| bar  | 002  | 1970-01-01T00:00:05 |\
                \n| bar  | 002  | 1970-01-01T00:00:06 |\
                \n| bar  | 002  | 1970-01-01T00:00:07 |\
                \n| bar  | 002  | 1970-01-01T00:00:08 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bar  | 003  | 1970-01-01T00:00:09 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bla  | 005  | 1970-01-01T00:00:10 |\
                \n| bla  | 005  | 1970-01-01T00:00:11 |\
                \n| bla  | 005  | 1970-01-01T00:00:12 |\
                \n| bla  | 005  | 1970-01-01T00:00:13 |\
                \n| bla  | 005  | 1970-01-01T00:00:14 |\
                \n| bla  | 005  | 1970-01-01T00:00:15 |\
                \n| bla  | 005  | 1970-01-01T00:00:16 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
                \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
                \n+------+------+---------------------+",
            ),
        ];
        expectations.reverse();

        while let Some(batch) = divide_stream.next().await {
            let formatted =
                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
                    .unwrap()
                    .to_string();
            let expected = expectations.pop().unwrap();
            assert_eq!(formatted, expected);
        }
    }

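    /// Consecutive batches sharing the same tag combination must be concatenated
    /// and emitted as a single batch per series.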
    #[tokio::test]
    async fn test_all_batches_same_combination() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        // Series 1 ("server1", "/var/log") is split across batch1 and batch2.
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![1000, 2000, 3000],
                )) as _,
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![4000, 5000],
                )) as _,
            ],
        )
        .unwrap();

        // Series 2 ("server2", "/var/data") is split across batch3 and batch4.
        let batch3 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
                Arc::new(StringArray::from(vec![
                    "/var/data",
                    "/var/data",
                    "/var/data",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![6000, 7000, 8000],
                )) as _,
            ],
        )
        .unwrap();

        let batch4 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2"])) as _,
                Arc::new(StringArray::from(vec!["/var/data"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![9000],
                )) as _,
            ],
        )
        .unwrap();

        // Series 3 ("server3", "/opt/logs") is split across batch5 and batch6.
        let batch5 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
                Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![10000, 11000],
                )) as _,
            ],
        )
        .unwrap();

        let batch6 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
                Arc::new(StringArray::from(vec![
                    "/opt/logs",
                    "/opt/logs",
                    "/opt/logs",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![12000, 13000, 14000],
                )) as _,
            ],
        )
        .unwrap();

        let memory_exec = Arc::new(
            MemoryExec::try_new(
                &[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
                schema.clone(),
                None,
            )
            .unwrap(),
        );

        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });

        let session_context = SessionContext::default();
        let result =
            datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
                .await
                .unwrap();

        // Each series comes out as exactly one concatenated batch.
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].num_rows(), 5);
        assert_eq!(result[1].num_rows(), 4);
        assert_eq!(result[2].num_rows(), 5);

        // Verify the contents of each output series.
        let host_array1 = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array1 = result[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array1 = result[0]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array1.value(i), "server1");
            assert_eq!(path_array1.value(i), "/var/log");
            assert_eq!(time_index_array1.value(i), 1000 + (i as i64) * 1000);
        }

        let host_array2 = result[1]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array2 = result[1]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array2 = result[1]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..4 {
            assert_eq!(host_array2.value(i), "server2");
            assert_eq!(path_array2.value(i), "/var/data");
            assert_eq!(time_index_array2.value(i), 6000 + (i as i64) * 1000);
        }

        let host_array3 = result[2]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array3 = result[2]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array3 = result[2]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array3.value(i), "server3");
            assert_eq!(path_array3.value(i), "/opt/logs");
            assert_eq!(time_index_array3.value(i), 10000 + (i as i64) * 1000);
        }

        // Consume the stream batch by batch and confirm the same division.
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let batch1 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 5);
        let batch2 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 4);
        let batch3 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 5);
        assert!(divide_stream.next().await.is_none());
    }
}