use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{LexRequirement, OrderingRequirements, PhysicalSortRequirement};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datatypes::arrow::compute;
use datatypes::compute::SortOptions;
use futures::{ready, Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::METRIC_NUM_SERIES;
use crate::metrics::PROMQL_SERIES_COUNT;

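/// Logical plan node that divides input data into multiple time series based on
/// the values of the tag columns.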
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesDivide {
    tag_columns: Vec<String>,
    time_index_column: String,
    input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for SeriesDivide {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(datafusion::error::DataFusionError::Internal(
                "SeriesDivide must have at least one input".to_string(),
            ));
        }

        Ok(Self {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: inputs[0].clone(),
        })
    }
}

impl SeriesDivide {
    pub fn new(tag_columns: Vec<String>, time_index_column: String, input: LogicalPlan) -> Self {
        Self {
            tag_columns,
            time_index_column,
            input,
        }
    }

    pub const fn name() -> &'static str {
        "SeriesDivide"
    }

    /// Converts this logical node into its physical counterpart [`SeriesDivideExec`].
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesDivideExec {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: exec_input,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

    pub fn tags(&self) -> &[String] {
        &self.tag_columns
    }

    /// Serializes this node into its protobuf (substrait extension) representation.
    pub fn serialize(&self) -> Vec<u8> {
        pb::SeriesDivide {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
        }
        .encode_to_vec()
    }

    /// Deserializes the node from bytes. The input plan is a placeholder
    /// `EmptyRelation` and is expected to be replaced later.
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        Ok(Self {
            tag_columns: pb_series_divide.tag_columns,
            time_index_column: pb_series_divide.time_index_column,
            input: placeholder_plan,
        })
    }
}

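/// Physical plan that splits its input into one output [`RecordBatch`] per time
/// series. It relies on the input being sorted by the tag columns (see
/// `required_input_ordering`) so that rows of one series are contiguous.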
#[derive(Debug)]
pub struct SeriesDivideExec {
    tag_columns: Vec<String>,
    time_index_column: String,
    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesDivideExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        // Hash-partition the input by the tag columns so that a single series
        // never spans multiple partitions.
        let schema = self.input.schema();
        vec![Distribution::HashPartitioned(
            self.tag_columns
                .iter()
                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
                .collect(),
        )]
    }

    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        let input_schema = self.input.schema();
        // Require the input to be sorted by all tag columns first, then by the
        // time index column, so that rows of one series are contiguous.
        let mut exprs: Vec<PhysicalSortRequirement> = self
            .tag_columns
            .iter()
            .map(|tag| PhysicalSortRequirement {
                expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
            })
            .collect();

        exprs.push(PhysicalSortRequirement {
            expr: Arc::new(
                ColumnExpr::new_with_schema(&self.time_index_column, &input_schema).unwrap(),
            ),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        });

        // `exprs` is never empty here: it always contains at least the time index requirement.
        let requirement = LexRequirement::new(exprs).unwrap();

        vec![Some(OrderingRequirements::Hard(vec![requirement]))]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let tag_indices = self
            .tag_columns
            .iter()
            .map(|tag| {
                schema
                    .column_with_name(tag)
                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
                    .0
            })
            .collect();
        Ok(Box::pin(SeriesDivideStream {
            tag_indices,
            buffer: vec![],
            schema,
            input,
            metric: baseline_metric,
            num_series,
            inspect_start: 0,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "SeriesDivideExec"
    }
}

impl DisplayAs for SeriesDivideExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
            }
        }
    }
}

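/// Stream that groups the sorted input rows into one [`RecordBatch`] per time
/// series at execution time.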
pub struct SeriesDivideStream {
    /// Indices of the tag columns in the input schema.
    tag_indices: Vec<usize>,
    /// Batches pulled from the input but not yet emitted.
    buffer: Vec<RecordBatch>,
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    /// Buffered batch index from which the next scan resumes, so batches that
    /// were already inspected are not re-checked on the next poll.
    inspect_start: usize,
    /// Number of series emitted so far.
    num_series: Count,
}

impl RecordBatchStream for SeriesDivideStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesDivideStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if !self.buffer.is_empty() {
                let timer = std::time::Instant::now();
                let cut_at = match self.find_first_diff_row() {
                    Ok(cut_at) => cut_at,
                    Err(e) => return Poll::Ready(Some(Err(e))),
                };
                if let Some((batch_index, row_index)) = cut_at {
                    // A series boundary exists in the buffered data: split the batch at
                    // `cut_at` and emit everything up to and including that row.
                    let half_batch_of_first_series =
                        self.buffer[batch_index].slice(0, row_index + 1);
                    let half_batch_of_second_series = self.buffer[batch_index].slice(
                        row_index + 1,
                        self.buffer[batch_index].num_rows() - row_index - 1,
                    );
                    let result_batches = self
                        .buffer
                        .drain(0..batch_index)
                        .chain([half_batch_of_first_series])
                        .collect::<Vec<_>>();
                    if half_batch_of_second_series.num_rows() > 0 {
                        self.buffer[0] = half_batch_of_second_series;
                    } else {
                        self.buffer.remove(0);
                    }
                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;

                    self.inspect_start = 0;
                    self.num_series.add(1);
                    self.metric.elapsed_compute().add_elapsed(timer);
                    return Poll::Ready(Some(Ok(result_batch)));
                } else {
                    self.metric.elapsed_compute().add_elapsed(timer);
                    // No boundary found in the buffered data yet: pull more input.
                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
                    let timer = std::time::Instant::now();
                    if let Some(next_batch) = next_batch {
                        if next_batch.num_rows() != 0 {
                            self.buffer.push(next_batch);
                        }
                        continue;
                    } else {
                        // Input is exhausted: flush the remaining buffer as the last series.
                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
                        self.buffer.clear();
                        self.inspect_start = 0;
                        self.num_series.add(1);
                        self.metric.elapsed_compute().add_elapsed(timer);
                        return Poll::Ready(Some(Ok(result)));
                    }
                }
            } else {
                // Buffer is empty: fetch the next batch, or finish the stream when the
                // input is done and report the observed number of series.
                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                    Some(Ok(batch)) => batch,
                    None => {
                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                        return Poll::Ready(None);
                    }
                    error => return Poll::Ready(error),
                };
                self.buffer.push(batch);
                continue;
            }
        }
    }
}

impl SeriesDivideStream {
    /// Polls the next batch from the input stream and records poll metrics.
    fn fetch_next_batch(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
        let poll = self.input.poll_next_unpin(cx);
        self.metric.record_poll(poll)
    }

    /// Scans the buffered batches for the first row whose tag values differ from the
    /// previous row. Returns the `(batch_index, row_index)` of the last row of the
    /// current series, or `None` if the whole buffer belongs to a single series.
    fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
        // Fast path: no tag columns means all data belongs to the same series.
        if self.tag_indices.is_empty() {
            return Ok(None);
        }

        let mut resumed_batch_index = self.inspect_start;

        for batch in &self.buffer[resumed_batch_index..] {
            let num_rows = batch.num_rows();
            let mut result_index = num_rows;

            // Check whether the first row of this batch starts a new series compared
            // with the last row of the previous buffered batch.
            if resumed_batch_index > self.inspect_start.checked_sub(1).unwrap_or_default() {
                let last_batch = &self.buffer[resumed_batch_index - 1];
                let last_row = last_batch.num_rows() - 1;
                for index in &self.tag_indices {
                    let current_array = batch.column(*index);
                    let last_array = last_batch.column(*index);
                    let current_string_array = current_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let last_string_array = last_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let current_value = current_string_array.value(0);
                    let last_value = last_string_array.value(last_row);
                    if current_value != last_value {
                        return Ok(Some((resumed_batch_index - 1, last_batch.num_rows() - 1)));
                    }
                }
            }

            // Quick check: if the first and last rows of this batch carry the same tag
            // values, the whole batch belongs to one series.
            let mut all_same = true;
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        datafusion::error::DataFusionError::Internal(
                            "Failed to downcast tag column to StringArray".to_string(),
                        )
                    })?;
                if string_array.value(0) != string_array.value(num_rows - 1) {
                    all_same = false;
                    break;
                }
            }
            if all_same {
                resumed_batch_index += 1;
                continue;
            }

            // Otherwise locate the exact split point inside this batch: the earliest
            // row after which any tag column changes its value.
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        datafusion::error::DataFusionError::Internal(
                            "Failed to downcast tag column to StringArray".to_string(),
                        )
                    })?;
                let mut same_until = 0;
                while same_until < num_rows - 1 {
                    if string_array.value(same_until) != string_array.value(same_until + 1) {
                        break;
                    }
                    same_until += 1;
                }
                result_index = result_index.min(same_until);
            }

            if result_index + 1 >= num_rows {
                resumed_batch_index += 1;
            } else {
                return Ok(Some((resumed_batch_index, result_index)));
            }
        }

        self.inspect_start = resumed_batch_index;
        Ok(None)
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::prelude::SessionContext;

    use super::*;

    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        // Note: `path_column_*` is placed in the first ("host") field position and
        // `host_column_*` in the second ("path") position; the expected outputs in
        // the tests below follow this layout.
        let path_column_1 = Arc::new(StringArray::from(vec![
            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
        ])) as _;
        let host_column_1 = Arc::new(StringArray::from(vec![
            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
        ])) as _;
        let time_index_column_1 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![
                1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
            ]),
        ) as _;

        let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
        let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
        let time_index_column_2 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![13000, 14000, 15000]),
        ) as _;

        let path_column_3 = Arc::new(StringArray::from(vec![
            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
        ])) as _;
        let host_column_3 = Arc::new(StringArray::from(vec![
            "005", "001", "001", "001", "001", "001", "001", "001",
        ])) as _;
        let time_index_column_3 =
            Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                vec![16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000],
            )) as _;

        let data_1 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_1, host_column_1, time_index_column_1],
        )
        .unwrap();
        let data_2 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_2, host_column_2, time_index_column_2],
        )
        .unwrap();
        let data_3 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_3, host_column_3, time_index_column_3],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap(),
        ))
    }

    #[tokio::test]
    async fn overall_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+------+---------------------+\
            \n| host | path | time_index          |\
            \n+------+------+---------------------+\
            \n| foo  | 000  | 1970-01-01T00:00:01 |\
            \n| foo  | 000  | 1970-01-01T00:00:02 |\
            \n| foo  | 001  | 1970-01-01T00:00:03 |\
            \n| bar  | 002  | 1970-01-01T00:00:04 |\
            \n| bar  | 002  | 1970-01-01T00:00:05 |\
            \n| bar  | 002  | 1970-01-01T00:00:06 |\
            \n| bar  | 002  | 1970-01-01T00:00:07 |\
            \n| bar  | 002  | 1970-01-01T00:00:08 |\
            \n| bar  | 003  | 1970-01-01T00:00:09 |\
            \n| bla  | 005  | 1970-01-01T00:00:10 |\
            \n| bla  | 005  | 1970-01-01T00:00:11 |\
            \n| bla  | 005  | 1970-01-01T00:00:12 |\
            \n| bla  | 005  | 1970-01-01T00:00:13 |\
            \n| bla  | 005  | 1970-01-01T00:00:14 |\
            \n| bla  | 005  | 1970-01-01T00:00:15 |\
            \n| bla  | 005  | 1970-01-01T00:00:16 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
            \n+------+------+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn per_batch_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let mut expectations = vec![
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| foo  | 000  | 1970-01-01T00:00:01 |\
                \n| foo  | 000  | 1970-01-01T00:00:02 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| foo  | 001  | 1970-01-01T00:00:03 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bar  | 002  | 1970-01-01T00:00:04 |\
                \n| bar  | 002  | 1970-01-01T00:00:05 |\
                \n| bar  | 002  | 1970-01-01T00:00:06 |\
                \n| bar  | 002  | 1970-01-01T00:00:07 |\
                \n| bar  | 002  | 1970-01-01T00:00:08 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bar  | 003  | 1970-01-01T00:00:09 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bla  | 005  | 1970-01-01T00:00:10 |\
                \n| bla  | 005  | 1970-01-01T00:00:11 |\
                \n| bla  | 005  | 1970-01-01T00:00:12 |\
                \n| bla  | 005  | 1970-01-01T00:00:13 |\
                \n| bla  | 005  | 1970-01-01T00:00:14 |\
                \n| bla  | 005  | 1970-01-01T00:00:15 |\
                \n| bla  | 005  | 1970-01-01T00:00:16 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
                \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
                \n+------+------+---------------------+",
            ),
        ];
        expectations.reverse();

        while let Some(batch) = divide_stream.next().await {
            let formatted =
                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
                    .unwrap()
                    .to_string();
            let expected = expectations.pop().unwrap();
            assert_eq!(formatted, expected);
        }
    }

    #[tokio::test]
    async fn test_all_batches_same_combination() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        // Batches 1 & 2: series ("server1", "/var/log").
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![1000, 2000, 3000],
                )) as _,
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![4000, 5000],
                )) as _,
            ],
        )
        .unwrap();

        // Batches 3 & 4: series ("server2", "/var/data").
        let batch3 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
                Arc::new(StringArray::from(vec![
                    "/var/data",
                    "/var/data",
                    "/var/data",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![6000, 7000, 8000],
                )) as _,
            ],
        )
        .unwrap();

        let batch4 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2"])) as _,
                Arc::new(StringArray::from(vec!["/var/data"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![9000],
                )) as _,
            ],
        )
        .unwrap();

        // Batches 5 & 6: series ("server3", "/opt/logs").
        let batch5 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
                Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![10000, 11000],
                )) as _,
            ],
        )
        .unwrap();

        let batch6 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
                Arc::new(StringArray::from(vec![
                    "/opt/logs",
                    "/opt/logs",
                    "/opt/logs",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![12000, 13000, 14000],
                )) as _,
            ],
        )
        .unwrap();

        let memory_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(
                &[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
                schema.clone(),
                None,
            )
            .unwrap(),
        );

        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });

        let session_context = SessionContext::default();
        let result =
            datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
                .await
                .unwrap();

        // Expect one output batch per distinct (host, path) combination.
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].num_rows(), 5);
        assert_eq!(result[1].num_rows(), 4);
        assert_eq!(result[2].num_rows(), 5);

        // First series: ("server1", "/var/log") with timestamps 1000..=5000.
        let host_array1 = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array1 = result[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array1 = result[0]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array1.value(i), "server1");
            assert_eq!(path_array1.value(i), "/var/log");
            assert_eq!(time_index_array1.value(i), 1000 + (i as i64) * 1000);
        }

        // Second series: ("server2", "/var/data") with timestamps 6000..=9000.
        let host_array2 = result[1]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array2 = result[1]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array2 = result[1]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..4 {
            assert_eq!(host_array2.value(i), "server2");
            assert_eq!(path_array2.value(i), "/var/data");
            assert_eq!(time_index_array2.value(i), 6000 + (i as i64) * 1000);
        }

        // Third series: ("server3", "/opt/logs") with timestamps 10000..=14000.
        let host_array3 = result[2]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array3 = result[2]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array3 = result[2]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array3.value(i), "server3");
            assert_eq!(path_array3.value(i), "/opt/logs");
            assert_eq!(time_index_array3.value(i), 10000 + (i as i64) * 1000);
        }

        // Re-execute as a stream and check the per-batch row counts again.
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let batch1 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 5);

        let batch2 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 4);

        let batch3 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 5);

        assert!(divide_stream.next().await.is_none());
    }
}