use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{LexRequirement, OrderingRequirements, PhysicalSortRequirement};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datatypes::arrow::compute;
use datatypes::compute::SortOptions;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::{METRIC_NUM_SERIES, resolve_column_name, serialize_column_index};
use crate::metrics::PROMQL_SERIES_COUNT;

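/// Logical plan node that divides the input into multiple record batches,
/// one per time series, according to the values of the tag columns.
///
/// For example, with `tag_columns = ["host"]` and an input sorted by `host`,
/// the rows `("a", t1), ("a", t2), ("b", t3)` are emitted as two batches:
/// one for `host = "a"` and one for `host = "b"`.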
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesDivide {
    tag_columns: Vec<String>,
    time_index_column: String,
    input: LogicalPlan,
    /// Column indices decoded from the serialized form but not yet resolved
    /// into column names. `None` once the node is fully resolved.
    unfix: Option<UnfixIndices>,
}

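/// Raw column indices decoded from the serialized plan. They are resolved
/// into column names by `with_exprs_and_inputs` once the real input plan,
/// and therefore its schema, is available.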
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    pub tag_column_indices: Vec<u64>,
    pub time_index_column_idx: u64,
}

impl UserDefinedLogicalNodeCore for SeriesDivide {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(datafusion::error::DataFusionError::Internal(
                "SeriesDivide must have at least one input".to_string(),
            ));
        }

        let input: LogicalPlan = inputs[0].clone();
        let input_schema = input.schema();

        if let Some(unfix) = &self.unfix {
            // Resolve the deserialized column indices into names against the
            // actual input schema.
            let tag_columns = unfix
                .tag_column_indices
                .iter()
                .map(|idx| resolve_column_name(*idx, input_schema, "SeriesDivide", "tag"))
                .collect::<DataFusionResult<Vec<String>>>()?;

            let time_index_column = resolve_column_name(
                unfix.time_index_column_idx,
                input_schema,
                "SeriesDivide",
                "time index",
            )?;

            Ok(Self {
                tag_columns,
                time_index_column,
                input,
                unfix: None,
            })
        } else {
            Ok(Self {
                tag_columns: self.tag_columns.clone(),
                time_index_column: self.time_index_column.clone(),
                input,
                unfix: None,
            })
        }
    }
}

impl SeriesDivide {
    pub fn new(tag_columns: Vec<String>, time_index_column: String, input: LogicalPlan) -> Self {
        Self {
            tag_columns,
            time_index_column,
            input,
            unfix: None,
        }
    }

    pub const fn name() -> &'static str {
        "SeriesDivide"
    }

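    /// Converts this logical node into its physical counterpart,
    /// [`SeriesDivideExec`], running over the given physical input.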
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesDivideExec {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: exec_input,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

    pub fn tags(&self) -> &[String] {
        &self.tag_columns
    }

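    /// Encodes this node into the substrait extension protobuf, identifying
    /// columns by schema index rather than by name.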
    pub fn serialize(&self) -> Vec<u8> {
        let tag_column_indices = self
            .tag_columns
            .iter()
            .map(|name| serialize_column_index(self.input.schema(), name))
            .collect::<Vec<u64>>();

        let time_index_column_idx =
            serialize_column_index(self.input.schema(), &self.time_index_column);

        pb::SeriesDivide {
            tag_column_indices,
            time_index_column_idx,
            ..Default::default()
        }
        .encode_to_vec()
    }

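    /// Decodes a node from its protobuf representation. The result carries a
    /// placeholder input plan and unresolved column indices; both are fixed
    /// up later by `with_exprs_and_inputs`.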
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
        // The real input plan is attached later; use an empty relation for now.
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let unfix = UnfixIndices {
            tag_column_indices: pb_series_divide.tag_column_indices.clone(),
            time_index_column_idx: pb_series_divide.time_index_column_idx,
        };

        Ok(Self {
            tag_columns: Vec::new(),
            time_index_column: String::new(),
            input: placeholder_plan,
            unfix: Some(unfix),
        })
    }
}

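/// Physical plan node that cuts an ordered input stream into one output batch
/// per time series.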
#[derive(Debug)]
pub struct SeriesDivideExec {
    tag_columns: Vec<String>,
    time_index_column: String,
    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesDivideExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        let schema = self.input.schema();
        vec![Distribution::HashPartitioned(
            self.tag_columns
                .iter()
                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
                .collect(),
        )]
    }

    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        // Require the input to be sorted by all tag columns first, then by
        // the time index column.
        let input_schema = self.input.schema();
        let mut exprs: Vec<PhysicalSortRequirement> = self
            .tag_columns
            .iter()
            .map(|tag| PhysicalSortRequirement {
                expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
                options: Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
            })
            .collect();

        exprs.push(PhysicalSortRequirement {
            expr: Arc::new(
                ColumnExpr::new_with_schema(&self.time_index_column, &input_schema).unwrap(),
            ),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        });

        let requirement = LexRequirement::new(exprs).unwrap();

        vec![Some(OrderingRequirements::Hard(vec![requirement]))]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            tag_columns: self.tag_columns.clone(),
            time_index_column: self.time_index_column.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        // Register a custom counter for the number of series this partition produces.
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let tag_indices = self
            .tag_columns
            .iter()
            .map(|tag| {
                schema
                    .column_with_name(tag)
                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
                    .0
            })
            .collect();
        Ok(Box::pin(SeriesDivideStream {
            tag_indices,
            buffer: vec![],
            schema,
            input,
            metric: baseline_metric,
            num_series,
            inspect_start: 0,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "SeriesDivideExec"
    }
}

impl DisplayAs for SeriesDivideExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
            }
        }
    }
}

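/// Stream that splits the input into per-series batches. It relies on the
/// input being sorted by the tag columns, as required by
/// `SeriesDivideExec::required_input_ordering`.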
pub struct SeriesDivideStream {
    tag_indices: Vec<usize>,
    buffer: Vec<RecordBatch>,
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    /// Index of the buffered batch to resume inspection from on the next poll.
    inspect_start: usize,
    /// Number of series emitted so far.
    num_series: Count,
}

impl RecordBatchStream for SeriesDivideStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesDivideStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if !self.buffer.is_empty() {
                let timer = std::time::Instant::now();
                let cut_at = match self.find_first_diff_row() {
                    Ok(cut_at) => cut_at,
                    Err(e) => return Poll::Ready(Some(Err(e))),
                };
                if let Some((batch_index, row_index)) = cut_at {
                    // Split the batch at the cut point: rows up to and
                    // including `row_index` complete the current series.
                    let half_batch_of_first_series =
                        self.buffer[batch_index].slice(0, row_index + 1);
                    let half_batch_of_second_series = self.buffer[batch_index].slice(
                        row_index + 1,
                        self.buffer[batch_index].num_rows() - row_index - 1,
                    );
                    let result_batches = self
                        .buffer
                        .drain(0..batch_index)
                        .chain([half_batch_of_first_series])
                        .collect::<Vec<_>>();
                    if half_batch_of_second_series.num_rows() > 0 {
                        self.buffer[0] = half_batch_of_second_series;
                    } else {
                        self.buffer.remove(0);
                    }
                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;

                    self.inspect_start = 0;
                    self.num_series.add(1);
                    self.metric.elapsed_compute().add_elapsed(timer);
                    return Poll::Ready(Some(Ok(result_batch)));
                } else {
                    // The buffer holds only one (possibly incomplete) series;
                    // fetch more input before deciding where to cut.
                    self.metric.elapsed_compute().add_elapsed(timer);
                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
                    let timer = std::time::Instant::now();
                    if let Some(next_batch) = next_batch {
                        // Skip empty batches.
                        if next_batch.num_rows() != 0 {
                            self.buffer.push(next_batch);
                        }
                        continue;
                    } else {
                        // The input is exhausted; flush the remaining buffer
                        // as the last series.
                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
                        self.buffer.clear();
                        self.inspect_start = 0;
                        self.num_series.add(1);
                        self.metric.elapsed_compute().add_elapsed(timer);
                        return Poll::Ready(Some(Ok(result)));
                    }
                }
            } else {
                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                    Some(Ok(batch)) => batch,
                    None => {
                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                        return Poll::Ready(None);
                    }
                    error => return Poll::Ready(error),
                };
                self.buffer.push(batch);
                continue;
            }
        }
    }
}

impl SeriesDivideStream {
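    /// Polls one batch from the input stream, recording poll timing in the
    /// baseline metrics.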
    fn fetch_next_batch(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
        let poll = self.input.poll_next_unpin(cx);
        self.metric.record_poll(poll)
    }

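    /// Returns the position `(batch_index, row_index)` of the last row of the
    /// first series currently in the buffer, or `None` if the buffered data
    /// all belongs to a single series.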
    fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
        // Fast path: no tag columns means all data belongs to the same series.
        if self.tag_indices.is_empty() {
            return Ok(None);
        }

        let mut resumed_batch_index = self.inspect_start;

        for batch in &self.buffer[resumed_batch_index..] {
            let num_rows = batch.num_rows();
            let mut result_index = num_rows;

            // Check whether this batch's first row differs from the previous
            // batch's last row, i.e. whether the series changes exactly at a
            // batch boundary.
            if resumed_batch_index > self.inspect_start.checked_sub(1).unwrap_or_default() {
                let last_batch = &self.buffer[resumed_batch_index - 1];
                let last_row = last_batch.num_rows() - 1;
                for index in &self.tag_indices {
                    let current_array = batch.column(*index);
                    let last_array = last_batch.column(*index);
                    let current_string_array = current_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let last_string_array = last_array
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .ok_or_else(|| {
                            datafusion::error::DataFusionError::Internal(
                                "Failed to downcast tag column to StringArray".to_string(),
                            )
                        })?;
                    let current_value = current_string_array.value(0);
                    let last_value = last_string_array.value(last_row);
                    if current_value != last_value {
                        return Ok(Some((resumed_batch_index - 1, last_batch.num_rows() - 1)));
                    }
                }
            }

            // Quick check: if the first and last rows agree on every tag
            // column, the whole batch belongs to one series (input is sorted).
            let mut all_same = true;
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        datafusion::error::DataFusionError::Internal(
                            "Failed to downcast tag column to StringArray".to_string(),
                        )
                    })?;
                if string_array.value(0) != string_array.value(num_rows - 1) {
                    all_same = false;
                    break;
                }
            }
            if all_same {
                resumed_batch_index += 1;
                continue;
            }

            // Otherwise scan column by column and take the earliest point at
            // which any tag value changes.
            for index in &self.tag_indices {
                let array = batch.column(*index);
                let string_array = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        datafusion::error::DataFusionError::Internal(
                            "Failed to downcast tag column to StringArray".to_string(),
                        )
                    })?;
                // The last row index of the run that starts at row 0.
                let mut same_until = 0;
                while same_until < num_rows - 1 {
                    if string_array.value(same_until) != string_array.value(same_until + 1) {
                        break;
                    }
                    same_until += 1;
                }
                result_index = result_index.min(same_until);
            }

            if result_index + 1 >= num_rows {
                // All rows are the same; inspect the next batch.
                resumed_batch_index += 1;
            } else {
                return Ok(Some((resumed_batch_index, result_index)));
            }
        }

        self.inspect_start = resumed_batch_index;
        Ok(None)
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::prelude::SessionContext;

    use super::*;

    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let path_column_1 = Arc::new(StringArray::from(vec![
            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
        ])) as _;
        let host_column_1 = Arc::new(StringArray::from(vec![
            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
        ])) as _;
        let time_index_column_1 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![
                1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
            ]),
        ) as _;

        let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
        let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
        let time_index_column_2 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![13000, 14000, 15000]),
        ) as _;

        let path_column_3 = Arc::new(StringArray::from(vec![
            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
        ])) as _;
        let host_column_3 = Arc::new(StringArray::from(vec![
            "005", "001", "001", "001", "001", "001", "001", "001",
        ])) as _;
        let time_index_column_3 =
            Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                vec![16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000],
            )) as _;

        let data_1 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_1, host_column_1, time_index_column_1],
        )
        .unwrap();
        let data_2 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_2, host_column_2, time_index_column_2],
        )
        .unwrap();
        let data_3 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_3, host_column_3, time_index_column_3],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap(),
        ))
    }

    #[tokio::test]
    async fn overall_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+------+---------------------+\
            \n| host | path | time_index          |\
            \n+------+------+---------------------+\
            \n| foo  | 000  | 1970-01-01T00:00:01 |\
            \n| foo  | 000  | 1970-01-01T00:00:02 |\
            \n| foo  | 001  | 1970-01-01T00:00:03 |\
            \n| bar  | 002  | 1970-01-01T00:00:04 |\
            \n| bar  | 002  | 1970-01-01T00:00:05 |\
            \n| bar  | 002  | 1970-01-01T00:00:06 |\
            \n| bar  | 002  | 1970-01-01T00:00:07 |\
            \n| bar  | 002  | 1970-01-01T00:00:08 |\
            \n| bar  | 003  | 1970-01-01T00:00:09 |\
            \n| bla  | 005  | 1970-01-01T00:00:10 |\
            \n| bla  | 005  | 1970-01-01T00:00:11 |\
            \n| bla  | 005  | 1970-01-01T00:00:12 |\
            \n| bla  | 005  | 1970-01-01T00:00:13 |\
            \n| bla  | 005  | 1970-01-01T00:00:14 |\
            \n| bla  | 005  | 1970-01-01T00:00:15 |\
            \n| bla  | 005  | 1970-01-01T00:00:16 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
            \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
            \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
            \n+------+------+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn per_batch_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let mut expectations = vec![
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| foo  | 000  | 1970-01-01T00:00:01 |\
                \n| foo  | 000  | 1970-01-01T00:00:02 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| foo  | 001  | 1970-01-01T00:00:03 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bar  | 002  | 1970-01-01T00:00:04 |\
                \n| bar  | 002  | 1970-01-01T00:00:05 |\
                \n| bar  | 002  | 1970-01-01T00:00:06 |\
                \n| bar  | 002  | 1970-01-01T00:00:07 |\
                \n| bar  | 002  | 1970-01-01T00:00:08 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bar  | 003  | 1970-01-01T00:00:09 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| bla  | 005  | 1970-01-01T00:00:10 |\
                \n| bla  | 005  | 1970-01-01T00:00:11 |\
                \n| bla  | 005  | 1970-01-01T00:00:12 |\
                \n| bla  | 005  | 1970-01-01T00:00:13 |\
                \n| bla  | 005  | 1970-01-01T00:00:14 |\
                \n| bla  | 005  | 1970-01-01T00:00:15 |\
                \n| bla  | 005  | 1970-01-01T00:00:16 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| 🥺   | 001  | 1970-01-01T00:00:17 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:18 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:19 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:20 |\
                \n| 🥺   | 001  | 1970-01-01T00:00:21 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index          |\
                \n+------+------+---------------------+\
                \n| 🫠   | 001  | 1970-01-01T00:00:22 |\
                \n| 🫠   | 001  | 1970-01-01T00:00:23 |\
                \n+------+------+---------------------+",
            ),
        ];
        expectations.reverse();

        while let Some(batch) = divide_stream.next().await {
            let formatted =
                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
                    .unwrap()
                    .to_string();
            let expected = expectations.pop().unwrap();
            assert_eq!(formatted, expected);
        }
    }

    #[tokio::test]
    async fn test_all_batches_same_combination() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        // Batches 1 and 2 belong to the first series: ("server1", "/var/log").
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![1000, 2000, 3000],
                )) as _,
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![4000, 5000],
                )) as _,
            ],
        )
        .unwrap();

        // Batches 3 and 4 belong to the second series: ("server2", "/var/data").
        let batch3 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
                Arc::new(StringArray::from(vec![
                    "/var/data",
                    "/var/data",
                    "/var/data",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![6000, 7000, 8000],
                )) as _,
            ],
        )
        .unwrap();

        let batch4 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2"])) as _,
                Arc::new(StringArray::from(vec!["/var/data"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![9000],
                )) as _,
            ],
        )
        .unwrap();

        // Batches 5 and 6 belong to the third series: ("server3", "/opt/logs").
        let batch5 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
                Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![10000, 11000],
                )) as _,
            ],
        )
        .unwrap();

        let batch6 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
                Arc::new(StringArray::from(vec![
                    "/opt/logs",
                    "/opt/logs",
                    "/opt/logs",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![12000, 13000, 14000],
                )) as _,
            ],
        )
        .unwrap();

        let memory_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(
                &[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
                schema.clone(),
                None,
            )
            .unwrap(),
        );

        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });

        let session_context = SessionContext::default();
        let result =
            datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
                .await
                .unwrap();

        // Three series, each emitted as exactly one batch.
        assert_eq!(result.len(), 3);

        // First series: 3 rows from batch1 + 2 rows from batch2.
        assert_eq!(result[0].num_rows(), 5);

        // Second series: 3 rows from batch3 + 1 row from batch4.
        assert_eq!(result[1].num_rows(), 4);

        // Third series: 2 rows from batch5 + 3 rows from batch6.
        assert_eq!(result[2].num_rows(), 5);

        // Verify the first series: host = "server1", path = "/var/log".
        let host_array1 = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array1 = result[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array1 = result[0]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array1.value(i), "server1");
            assert_eq!(path_array1.value(i), "/var/log");
            assert_eq!(time_index_array1.value(i), 1000 + (i as i64) * 1000);
        }

        // Verify the second series: host = "server2", path = "/var/data".
        let host_array2 = result[1]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array2 = result[1]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array2 = result[1]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..4 {
            assert_eq!(host_array2.value(i), "server2");
            assert_eq!(path_array2.value(i), "/var/data");
            assert_eq!(time_index_array2.value(i), 6000 + (i as i64) * 1000);
        }

        // Verify the third series: host = "server3", path = "/opt/logs".
        let host_array3 = result[2]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array3 = result[2]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array3 = result[2]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array3.value(i), "server3");
            assert_eq!(path_array3.value(i), "/opt/logs");
            assert_eq!(time_index_array3.value(i), 10000 + (i as i64) * 1000);
        }

        // Run again as a stream and check the batch boundaries one by one.
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let batch1 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 5);

        let batch2 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 4);

        let batch3 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 5);

        // The stream should now be exhausted.
        assert!(divide_stream.next().await.is_none());
    }
}