1use std::any::Any;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use datafusion::arrow::array::{
21 Array, LargeStringArray, StringArray, StringViewArray, UInt64Array,
22};
23use datafusion::arrow::datatypes::{DataType, SchemaRef};
24use datafusion::arrow::record_batch::RecordBatch;
25use datafusion::common::{DFSchema, DFSchemaRef};
26use datafusion::error::Result as DataFusionResult;
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_expr::{LexRequirement, OrderingRequirements, PhysicalSortRequirement};
30use datafusion::physical_plan::expressions::Column as ColumnExpr;
31use datafusion::physical_plan::metrics::{
32 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
33};
34use datafusion::physical_plan::{
35 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
36 SendableRecordBatchStream,
37};
38use datafusion_expr::col;
39use datatypes::arrow::compute;
40use datatypes::compute::SortOptions;
41use futures::{Stream, StreamExt, ready};
42use greptime_proto::substrait_extension as pb;
43use prost::Message;
44use snafu::ResultExt;
45
46use crate::error::{DeserializeSnafu, Result};
47use crate::extension_plan::{METRIC_NUM_SERIES, resolve_column_name, serialize_column_index};
48use crate::metrics::PROMQL_SERIES_COUNT;
49
/// Identifies the tag values of rows in a batch, used to detect series
/// boundaries.
///
/// Either a set of raw string-typed tag columns compared value-by-value, or a
/// single pre-computed `UInt64` id column (e.g. a tsid) compared by id.
enum TagIdentifier<'a> {
    /// Raw tag columns; two rows match when every column matches.
    Raw(Vec<RawTagColumn<'a>>),
    /// A single `UInt64` series-id column.
    Id(&'a UInt64Array),
}
56
57impl<'a> TagIdentifier<'a> {
58 fn try_new(batch: &'a RecordBatch, tag_indices: &[usize]) -> DataFusionResult<Self> {
59 match tag_indices {
60 [] => Ok(Self::Raw(Vec::new())),
61 [index] => {
62 let array = batch.column(*index);
63 if array.data_type() == &DataType::UInt64 {
64 let array = array
65 .as_any()
66 .downcast_ref::<UInt64Array>()
67 .ok_or_else(|| {
68 datafusion::error::DataFusionError::Internal(
69 "Failed to downcast tag column to UInt64Array".to_string(),
70 )
71 })?;
72 Ok(Self::Id(array))
73 } else {
74 Ok(Self::Raw(vec![RawTagColumn::try_new(array.as_ref())?]))
75 }
76 }
77 indices => Ok(Self::Raw(
78 indices
79 .iter()
80 .map(|index| RawTagColumn::try_new(batch.column(*index).as_ref()))
81 .collect::<DataFusionResult<Vec<_>>>()?,
82 )),
83 }
84 }
85
86 fn equal_at(&self, left_row: usize, other: &Self, right_row: usize) -> DataFusionResult<bool> {
87 match (self, other) {
88 (Self::Id(left), Self::Id(right)) => {
89 if left.is_null(left_row) || right.is_null(right_row) {
90 return Ok(left.is_null(left_row) && right.is_null(right_row));
91 }
92 Ok(left.value(left_row) == right.value(right_row))
93 }
94 (Self::Raw(left), Self::Raw(right)) => {
95 if left.len() != right.len() {
96 return Err(datafusion::error::DataFusionError::Internal(format!(
97 "Mismatched tag column count: left={}, right={}",
98 left.len(),
99 right.len()
100 )));
101 }
102
103 for (left_column, right_column) in left.iter().zip(right.iter()) {
104 if !left_column.equal_at(left_row, right_column, right_row)? {
105 return Ok(false);
106 }
107 }
108 Ok(true)
109 }
110 _ => Err(datafusion::error::DataFusionError::Internal(format!(
111 "Mismatched tag identifier types: left={:?}, right={:?}",
112 self.data_type(),
113 other.data_type()
114 ))),
115 }
116 }
117
118 fn data_type(&self) -> &'static str {
119 match self {
120 Self::Raw(_) => "Raw",
121 Self::Id(_) => "Id",
122 }
123 }
124}
125
/// A borrowed string-typed tag column in one of the three arrow string
/// encodings supported by this operator.
enum RawTagColumn<'a> {
    Utf8(&'a StringArray),
    LargeUtf8(&'a LargeStringArray),
    Utf8View(&'a StringViewArray),
}
131
132impl<'a> RawTagColumn<'a> {
133 fn try_new(array: &'a dyn Array) -> DataFusionResult<Self> {
134 match array.data_type() {
135 DataType::Utf8 => array
136 .as_any()
137 .downcast_ref::<StringArray>()
138 .map(Self::Utf8)
139 .ok_or_else(|| {
140 datafusion::error::DataFusionError::Internal(
141 "Failed to downcast tag column to StringArray".to_string(),
142 )
143 }),
144 DataType::LargeUtf8 => array
145 .as_any()
146 .downcast_ref::<LargeStringArray>()
147 .map(Self::LargeUtf8)
148 .ok_or_else(|| {
149 datafusion::error::DataFusionError::Internal(
150 "Failed to downcast tag column to LargeStringArray".to_string(),
151 )
152 }),
153 DataType::Utf8View => array
154 .as_any()
155 .downcast_ref::<StringViewArray>()
156 .map(Self::Utf8View)
157 .ok_or_else(|| {
158 datafusion::error::DataFusionError::Internal(
159 "Failed to downcast tag column to StringViewArray".to_string(),
160 )
161 }),
162 other => Err(datafusion::error::DataFusionError::Internal(format!(
163 "Unsupported tag column type: {other:?}"
164 ))),
165 }
166 }
167
168 fn is_null(&self, row: usize) -> bool {
169 match self {
170 Self::Utf8(array) => array.is_null(row),
171 Self::LargeUtf8(array) => array.is_null(row),
172 Self::Utf8View(array) => array.is_null(row),
173 }
174 }
175
176 fn value(&self, row: usize) -> &str {
177 match self {
178 Self::Utf8(array) => array.value(row),
179 Self::LargeUtf8(array) => array.value(row),
180 Self::Utf8View(array) => array.value(row),
181 }
182 }
183
184 fn equal_at(&self, left_row: usize, other: &Self, right_row: usize) -> DataFusionResult<bool> {
185 if self.is_null(left_row) || other.is_null(right_row) {
186 return Ok(self.is_null(left_row) && other.is_null(right_row));
187 }
188
189 Ok(self.value(left_row) == other.value(right_row))
190 }
191}
192
/// Logical plan node that divides a tag-sorted input into per-time-series
/// output batches (PromQL "series divide").
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesDivide {
    // Tag column names that together identify one time series.
    tag_columns: Vec<String>,
    // Name of the time index (timestamp) column.
    time_index_column: String,
    // Child plan producing the rows to divide.
    input: LogicalPlan,
    // Set only on deserialized nodes: column indices not yet resolved to
    // names against a real input schema (resolved in `with_exprs_and_inputs`).
    unfix: Option<UnfixIndices>,
}
204
/// Column indices carried by a deserialized [`SeriesDivide`] before the
/// column names have been resolved against an actual input schema.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    // Indices of the tag columns in the serialized plan's schema.
    pub tag_column_indices: Vec<u64>,
    // Index of the time index column in the serialized plan's schema.
    pub time_index_column_idx: u64,
}
210
211impl UserDefinedLogicalNodeCore for SeriesDivide {
212 fn name(&self) -> &str {
213 Self::name()
214 }
215
216 fn inputs(&self) -> Vec<&LogicalPlan> {
217 vec![&self.input]
218 }
219
220 fn schema(&self) -> &DFSchemaRef {
221 self.input.schema()
222 }
223
224 fn expressions(&self) -> Vec<Expr> {
225 if self.unfix.is_some() {
226 return vec![];
227 }
228
229 self.tag_columns
230 .iter()
231 .map(col)
232 .chain(std::iter::once(col(&self.time_index_column)))
233 .collect()
234 }
235
236 fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
237 if self.unfix.is_some() {
238 return None;
239 }
240
241 let input_schema = self.input.schema();
242 if output_columns.is_empty() {
243 let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
244 return Some(vec![indices]);
245 }
246
247 let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len());
248 required.extend_from_slice(output_columns);
249 for tag in &self.tag_columns {
250 required.push(input_schema.index_of_column_by_name(None, tag)?);
251 }
252 required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
253
254 required.sort_unstable();
255 required.dedup();
256 Some(vec![required])
257 }
258
259 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
260 write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
261 }
262
263 fn with_exprs_and_inputs(
264 &self,
265 _exprs: Vec<Expr>,
266 inputs: Vec<LogicalPlan>,
267 ) -> DataFusionResult<Self> {
268 if inputs.is_empty() {
269 return Err(datafusion::error::DataFusionError::Internal(
270 "SeriesDivide must have at least one input".to_string(),
271 ));
272 }
273
274 let input: LogicalPlan = inputs[0].clone();
275 let input_schema = input.schema();
276
277 if let Some(unfix) = &self.unfix {
278 let tag_columns = unfix
280 .tag_column_indices
281 .iter()
282 .map(|idx| resolve_column_name(*idx, input_schema, "SeriesDivide", "tag"))
283 .collect::<DataFusionResult<Vec<String>>>()?;
284
285 let time_index_column = resolve_column_name(
286 unfix.time_index_column_idx,
287 input_schema,
288 "SeriesDivide",
289 "time index",
290 )?;
291
292 Ok(Self {
293 tag_columns,
294 time_index_column,
295 input,
296 unfix: None,
297 })
298 } else {
299 Ok(Self {
300 tag_columns: self.tag_columns.clone(),
301 time_index_column: self.time_index_column.clone(),
302 input,
303 unfix: None,
304 })
305 }
306 }
307}
308
309impl SeriesDivide {
310 pub fn new(tag_columns: Vec<String>, time_index_column: String, input: LogicalPlan) -> Self {
311 Self {
312 tag_columns,
313 time_index_column,
314 input,
315 unfix: None,
316 }
317 }
318
319 pub const fn name() -> &'static str {
320 "SeriesDivide"
321 }
322
323 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
324 Arc::new(SeriesDivideExec {
325 tag_columns: self.tag_columns.clone(),
326 time_index_column: self.time_index_column.clone(),
327 input: exec_input,
328 metric: ExecutionPlanMetricsSet::new(),
329 })
330 }
331
332 pub fn tags(&self) -> &[String] {
333 &self.tag_columns
334 }
335
336 pub fn serialize(&self) -> Vec<u8> {
337 let tag_column_indices = self
338 .tag_columns
339 .iter()
340 .map(|name| serialize_column_index(self.input.schema(), name))
341 .collect::<Vec<u64>>();
342
343 let time_index_column_idx =
344 serialize_column_index(self.input.schema(), &self.time_index_column);
345
346 pb::SeriesDivide {
347 tag_column_indices,
348 time_index_column_idx,
349 ..Default::default()
350 }
351 .encode_to_vec()
352 }
353
354 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
355 let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
356 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
357 produce_one_row: false,
358 schema: Arc::new(DFSchema::empty()),
359 });
360
361 let unfix = UnfixIndices {
362 tag_column_indices: pb_series_divide.tag_column_indices.clone(),
363 time_index_column_idx: pb_series_divide.time_index_column_idx,
364 };
365
366 Ok(Self {
367 tag_columns: Vec::new(),
368 time_index_column: String::new(),
369 input: placeholder_plan,
370 unfix: Some(unfix),
371 })
372 }
373}
374
/// Physical operator for [`SeriesDivide`]: splits the tag-sorted input stream
/// into one output `RecordBatch` per time series.
#[derive(Debug)]
pub struct SeriesDivideExec {
    // Tag column names identifying a series.
    tag_columns: Vec<String>,
    // Time index column name; used for the required input ordering.
    time_index_column: String,
    // Child physical plan.
    input: Arc<dyn ExecutionPlan>,
    // Shared metrics set across all partitions.
    metric: ExecutionPlanMetricsSet,
}
382
383impl ExecutionPlan for SeriesDivideExec {
384 fn as_any(&self) -> &dyn Any {
385 self
386 }
387
388 fn schema(&self) -> SchemaRef {
389 self.input.schema()
390 }
391
392 fn properties(&self) -> &PlanProperties {
393 self.input.properties()
394 }
395
396 fn required_input_distribution(&self) -> Vec<Distribution> {
397 let schema = self.input.schema();
398 vec![Distribution::HashPartitioned(
399 self.tag_columns
400 .iter()
401 .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
403 .collect(),
404 )]
405 }
406
407 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
408 let input_schema = self.input.schema();
409 let mut exprs: Vec<PhysicalSortRequirement> = self
410 .tag_columns
411 .iter()
412 .map(|tag| PhysicalSortRequirement {
413 expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
415 options: Some(SortOptions {
416 descending: false,
417 nulls_first: true,
418 }),
419 })
420 .collect();
421
422 exprs.push(PhysicalSortRequirement {
423 expr: Arc::new(
424 ColumnExpr::new_with_schema(&self.time_index_column, &input_schema).unwrap(),
425 ),
426 options: Some(SortOptions {
427 descending: false,
428 nulls_first: true,
429 }),
430 });
431
432 let requirement = LexRequirement::new(exprs).unwrap();
434
435 vec![Some(OrderingRequirements::Hard(vec![requirement]))]
436 }
437
438 fn maintains_input_order(&self) -> Vec<bool> {
439 vec![true; self.children().len()]
440 }
441
442 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
443 vec![&self.input]
444 }
445
446 fn with_new_children(
447 self: Arc<Self>,
448 children: Vec<Arc<dyn ExecutionPlan>>,
449 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
450 assert!(!children.is_empty());
451 Ok(Arc::new(Self {
452 tag_columns: self.tag_columns.clone(),
453 time_index_column: self.time_index_column.clone(),
454 input: children[0].clone(),
455 metric: self.metric.clone(),
456 }))
457 }
458
459 fn execute(
460 &self,
461 partition: usize,
462 context: Arc<TaskContext>,
463 ) -> DataFusionResult<SendableRecordBatchStream> {
464 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
465 let metrics_builder = MetricBuilder::new(&self.metric);
466 let num_series = Count::new();
467 metrics_builder
468 .with_partition(partition)
469 .build(MetricValue::Count {
470 name: METRIC_NUM_SERIES.into(),
471 count: num_series.clone(),
472 });
473
474 let input = self.input.execute(partition, context)?;
475 let schema = input.schema();
476 let tag_indices = self
477 .tag_columns
478 .iter()
479 .map(|tag| {
480 schema
481 .column_with_name(tag)
482 .unwrap_or_else(|| panic!("tag column not found {tag}"))
483 .0
484 })
485 .collect();
486 Ok(Box::pin(SeriesDivideStream {
487 tag_indices,
488 buffer: vec![],
489 schema,
490 input,
491 metric: baseline_metric,
492 num_series,
493 inspect_start: 0,
494 }))
495 }
496
497 fn metrics(&self) -> Option<MetricsSet> {
498 Some(self.metric.clone_inner())
499 }
500
501 fn name(&self) -> &str {
502 "SeriesDivideExec"
503 }
504}
505
impl DisplayAs for SeriesDivideExec {
    /// Every display format renders the same one-line summary with the tags.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
            }
        }
    }
}
517
/// Stream that buffers input batches and emits one batch per time series.
///
/// Relies on the input being sorted by the tag columns, so each series is a
/// contiguous run of rows (see `SeriesDivideExec::required_input_ordering`).
pub struct SeriesDivideStream {
    // Indices of the tag columns in the input schema.
    tag_indices: Vec<usize>,
    // Batches buffered until a series boundary is found.
    buffer: Vec<RecordBatch>,
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    // Index of the first buffered batch not yet fully inspected by
    // `find_first_diff_row`; avoids re-scanning already verified batches.
    inspect_start: usize,
    // Number of series emitted so far (exported as a custom metric).
    num_series: Count,
}
530
531impl RecordBatchStream for SeriesDivideStream {
532 fn schema(&self) -> SchemaRef {
533 self.schema.clone()
534 }
535}
536
impl Stream for SeriesDivideStream {
    type Item = DataFusionResult<RecordBatch>;

    /// Drives the divide loop: buffer input batches until a series boundary
    /// is found, then emit everything up to the boundary as one batch.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if !self.buffer.is_empty() {
                let timer = std::time::Instant::now();
                let cut_at = match self.find_first_diff_row() {
                    Ok(cut_at) => cut_at,
                    Err(e) => return Poll::Ready(Some(Err(e))),
                };
                if let Some((batch_index, row_index)) = cut_at {
                    // Boundary found: rows up to and including
                    // (batch_index, row_index) belong to the current series.
                    let half_batch_of_first_series =
                        self.buffer[batch_index].slice(0, row_index + 1);
                    let half_batch_of_second_series = self.buffer[batch_index].slice(
                        row_index + 1,
                        self.buffer[batch_index].num_rows() - row_index - 1,
                    );
                    // Drain the fully-consumed batches plus the first half of
                    // the split batch; they form one output batch.
                    let result_batches = self
                        .buffer
                        .drain(0..batch_index)
                        .chain([half_batch_of_first_series])
                        .collect::<Vec<_>>();
                    // Keep the remainder (start of the next series) buffered.
                    if half_batch_of_second_series.num_rows() > 0 {
                        self.buffer[0] = half_batch_of_second_series;
                    } else {
                        self.buffer.remove(0);
                    }
                    let result_batch = compute::concat_batches(&self.schema, &result_batches)?;

                    self.inspect_start = 0;
                    self.num_series.add(1);
                    self.metric.elapsed_compute().add_elapsed(timer);
                    return Poll::Ready(Some(Ok(result_batch)));
                } else {
                    // No boundary inside the buffer yet: pull more input.
                    self.metric.elapsed_compute().add_elapsed(timer);
                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
                    let timer = std::time::Instant::now();
                    if let Some(next_batch) = next_batch {
                        // Skip empty batches; they carry no boundary info.
                        if next_batch.num_rows() != 0 {
                            self.buffer.push(next_batch);
                        }
                        continue;
                    } else {
                        // Input exhausted: the whole buffer is the last series.
                        let result = compute::concat_batches(&self.schema, &self.buffer)?;
                        self.buffer.clear();
                        self.inspect_start = 0;
                        self.num_series.add(1);
                        self.metric.elapsed_compute().add_elapsed(timer);
                        return Poll::Ready(Some(Ok(result)));
                    }
                }
            } else {
                // Empty buffer: fetch the first batch or finish the stream.
                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                    Some(Ok(batch)) => batch,
                    None => {
                        // End of stream: record the total series count.
                        PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                        return Poll::Ready(None);
                    }
                    error => return Poll::Ready(error),
                };
                self.buffer.push(batch);
                continue;
            }
        }
    }
}
607
impl SeriesDivideStream {
    /// Polls one batch from the wrapped input stream, recording poll metrics.
    fn fetch_next_batch(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<DataFusionResult<RecordBatch>>> {
        let poll = self.input.poll_next_unpin(cx);
        self.metric.record_poll(poll)
    }

    /// Scans the buffered batches for the first row where the tag values
    /// change, returning `(batch_index, row_index)` of the LAST row of the
    /// current series, or `None` if all buffered rows belong to one series.
    ///
    /// Scanning resumes from `self.inspect_start`; on a `None` result,
    /// `inspect_start` is advanced so already-verified batches are not
    /// re-inspected by the next call.
    fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> {
        // Fast path: with no tag columns everything is a single series.
        if self.tag_indices.is_empty() {
            return Ok(None);
        }

        let mut resumed_batch_index = self.inspect_start;

        for batch in &self.buffer[resumed_batch_index..] {
            let num_rows = batch.num_rows();
            let tags = TagIdentifier::try_new(batch, &self.tag_indices)?;

            // Compare this batch's first row with the previous batch's last
            // row to catch a boundary that falls exactly between batches.
            // NOTE: this guard is equivalent to `resumed_batch_index > 0`,
            // which also keeps the `- 1` below from underflowing.
            if resumed_batch_index > self.inspect_start.checked_sub(1).unwrap_or_default() {
                let last_batch = &self.buffer[resumed_batch_index - 1];
                let last_row = last_batch.num_rows() - 1;
                let last_tags = TagIdentifier::try_new(last_batch, &self.tag_indices)?;
                if !tags.equal_at(0, &last_tags, last_row)? {
                    // The series ends at the previous batch's last row.
                    return Ok(Some((resumed_batch_index - 1, last_row)));
                }
            }

            // Input is sorted by tags, so if first and last rows match, the
            // whole batch is one series — skip the per-row scan.
            if tags.equal_at(0, &tags, num_rows - 1)? {
                resumed_batch_index += 1;
                continue;
            }

            // Otherwise find the first adjacent pair of rows that differ.
            let mut same_until = 0;
            while same_until < num_rows - 1 {
                if !tags.equal_at(same_until, &tags, same_until + 1)? {
                    break;
                }
                same_until += 1;
            }

            if same_until + 1 >= num_rows {
                resumed_batch_index += 1;
            } else {
                return Ok(Some((resumed_batch_index, same_until)));
            }
        }

        // No boundary found; remember how far we verified for the next call.
        self.inspect_start = resumed_batch_index;
        Ok(None)
    }
}
667
#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::common::ToDFSchema;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
    use datafusion::prelude::SessionContext;

    use super::*;

    /// Builds a three-batch in-memory source with (host, path, time_index)
    /// columns whose series boundaries intentionally cross batch edges.
    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        // NOTE(review): the "path" arrays feed the "host" field and vice
        // versa (see the column order in `RecordBatch::try_new` below); the
        // naming is just historical in this fixture.
        let path_column_1 = Arc::new(StringArray::from(vec![
            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
        ])) as _;
        let host_column_1 = Arc::new(StringArray::from(vec![
            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
        ])) as _;
        let time_index_column_1 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![
                1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
            ]),
        ) as _;

        let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
        let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
        let time_index_column_2 = Arc::new(
            datafusion::arrow::array::TimestampMillisecondArray::from(vec![13000, 14000, 15000]),
        ) as _;

        let path_column_3 = Arc::new(StringArray::from(vec![
            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
        ])) as _;
        let host_column_3 = Arc::new(StringArray::from(vec![
            "005", "001", "001", "001", "001", "001", "001", "001",
        ])) as _;
        let time_index_column_3 =
            Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                vec![16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000],
            )) as _;

        let data_1 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_1, host_column_1, time_index_column_1],
        )
        .unwrap();
        let data_2 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_2, host_column_2, time_index_column_2],
        )
        .unwrap();
        let data_3 = RecordBatch::try_new(
            schema.clone(),
            vec![path_column_3, host_column_3, time_index_column_3],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap(),
        ))
    }

    /// Projection pushdown must always retain the tag and time index columns.
    #[test]
    fn pruning_should_keep_tags_and_time_index_columns_for_exec() {
        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
        let input = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: df_schema,
        });
        let plan = SeriesDivide::new(
            vec!["host".to_string(), "path".to_string()],
            "time_index".to_string(),
            input,
        );

        // Parent only needs column 0, but tags + time index must be kept too.
        let output_columns = [0usize];
        let required = plan.necessary_children_exprs(&output_columns).unwrap();
        let required = &required[0];
        assert_eq!(required.as_slice(), &[0, 1, 2]);
    }

    /// Collecting the whole divided stream must preserve all rows in order.
    #[tokio::test]
    async fn overall_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+------+---------------------+\
            \n| host | path | time_index |\
            \n+------+------+---------------------+\
            \n| foo | 000 | 1970-01-01T00:00:01 |\
            \n| foo | 000 | 1970-01-01T00:00:02 |\
            \n| foo | 001 | 1970-01-01T00:00:03 |\
            \n| bar | 002 | 1970-01-01T00:00:04 |\
            \n| bar | 002 | 1970-01-01T00:00:05 |\
            \n| bar | 002 | 1970-01-01T00:00:06 |\
            \n| bar | 002 | 1970-01-01T00:00:07 |\
            \n| bar | 002 | 1970-01-01T00:00:08 |\
            \n| bar | 003 | 1970-01-01T00:00:09 |\
            \n| bla | 005 | 1970-01-01T00:00:10 |\
            \n| bla | 005 | 1970-01-01T00:00:11 |\
            \n| bla | 005 | 1970-01-01T00:00:12 |\
            \n| bla | 005 | 1970-01-01T00:00:13 |\
            \n| bla | 005 | 1970-01-01T00:00:14 |\
            \n| bla | 005 | 1970-01-01T00:00:15 |\
            \n| bla | 005 | 1970-01-01T00:00:16 |\
            \n| 🥺 | 001 | 1970-01-01T00:00:17 |\
            \n| 🥺 | 001 | 1970-01-01T00:00:18 |\
            \n| 🥺 | 001 | 1970-01-01T00:00:19 |\
            \n| 🥺 | 001 | 1970-01-01T00:00:20 |\
            \n| 🥺 | 001 | 1970-01-01T00:00:21 |\
            \n| 🫠| 001 | 1970-01-01T00:00:22 |\
            \n| 🫠| 001 | 1970-01-01T00:00:23 |\
            \n+------+------+---------------------+",
        );
        assert_eq!(result_literal, expected);
    }

    /// The stream must emit exactly one batch per series, even when a series
    /// spans multiple input batches or a batch holds multiple series.
    #[tokio::test]
    async fn per_batch_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let mut expectations = vec![
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index |\
                \n+------+------+---------------------+\
                \n| foo | 000 | 1970-01-01T00:00:01 |\
                \n| foo | 000 | 1970-01-01T00:00:02 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index |\
                \n+------+------+---------------------+\
                \n| foo | 001 | 1970-01-01T00:00:03 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index |\
                \n+------+------+---------------------+\
                \n| bar | 002 | 1970-01-01T00:00:04 |\
                \n| bar | 002 | 1970-01-01T00:00:05 |\
                \n| bar | 002 | 1970-01-01T00:00:06 |\
                \n| bar | 002 | 1970-01-01T00:00:07 |\
                \n| bar | 002 | 1970-01-01T00:00:08 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index |\
                \n+------+------+---------------------+\
                \n| bar | 003 | 1970-01-01T00:00:09 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index |\
                \n+------+------+---------------------+\
                \n| bla | 005 | 1970-01-01T00:00:10 |\
                \n| bla | 005 | 1970-01-01T00:00:11 |\
                \n| bla | 005 | 1970-01-01T00:00:12 |\
                \n| bla | 005 | 1970-01-01T00:00:13 |\
                \n| bla | 005 | 1970-01-01T00:00:14 |\
                \n| bla | 005 | 1970-01-01T00:00:15 |\
                \n| bla | 005 | 1970-01-01T00:00:16 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index |\
                \n+------+------+---------------------+\
                \n| 🥺 | 001 | 1970-01-01T00:00:17 |\
                \n| 🥺 | 001 | 1970-01-01T00:00:18 |\
                \n| 🥺 | 001 | 1970-01-01T00:00:19 |\
                \n| 🥺 | 001 | 1970-01-01T00:00:20 |\
                \n| 🥺 | 001 | 1970-01-01T00:00:21 |\
                \n+------+------+---------------------+",
            ),
            String::from(
                "+------+------+---------------------+\
                \n| host | path | time_index |\
                \n+------+------+---------------------+\
                \n| 🫠| 001 | 1970-01-01T00:00:22 |\
                \n| 🫠| 001 | 1970-01-01T00:00:23 |\
                \n+------+------+---------------------+",
            ),
        ];
        // Reverse so `pop()` yields them in stream order.
        expectations.reverse();

        while let Some(batch) = divide_stream.next().await {
            let formatted =
                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
                    .unwrap()
                    .to_string();
            let expected = expectations.pop().unwrap();
            assert_eq!(formatted, expected);
        }
    }

    /// Batches belonging to the same series must be merged; boundaries that
    /// coincide with batch edges must still be detected.
    #[tokio::test]
    async fn test_all_batches_same_combination() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        // batch1 + batch2: series (server1, /var/log), 5 rows total.
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![1000, 2000, 3000],
                )) as _,
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
                Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![4000, 5000],
                )) as _,
            ],
        )
        .unwrap();

        // batch3 + batch4: series (server2, /var/data), 4 rows total.
        let batch3 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2", "server2", "server2"])) as _,
                Arc::new(StringArray::from(vec![
                    "/var/data",
                    "/var/data",
                    "/var/data",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![6000, 7000, 8000],
                )) as _,
            ],
        )
        .unwrap();

        let batch4 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server2"])) as _,
                Arc::new(StringArray::from(vec!["/var/data"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![9000],
                )) as _,
            ],
        )
        .unwrap();

        // batch5 + batch6: series (server3, /opt/logs), 5 rows total.
        let batch5 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
                Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![10000, 11000],
                )) as _,
            ],
        )
        .unwrap();

        let batch6 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["server3", "server3", "server3"])) as _,
                Arc::new(StringArray::from(vec![
                    "/opt/logs",
                    "/opt/logs",
                    "/opt/logs",
                ])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![12000, 13000, 14000],
                )) as _,
            ],
        )
        .unwrap();

        let memory_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(
                &[vec![batch1, batch2, batch3, batch4, batch5, batch6]],
                schema.clone(),
                None,
            )
            .unwrap(),
        );

        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });

        let session_context = SessionContext::default();
        let result =
            datafusion::physical_plan::collect(divide_exec.clone(), session_context.task_ctx())
                .await
                .unwrap();

        // One output batch per series.
        assert_eq!(result.len(), 3);

        assert_eq!(result[0].num_rows(), 5);

        assert_eq!(result[1].num_rows(), 4);

        assert_eq!(result[2].num_rows(), 5);

        // Verify the contents of each series batch.
        let host_array1 = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array1 = result[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array1 = result[0]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array1.value(i), "server1");
            assert_eq!(path_array1.value(i), "/var/log");
            assert_eq!(time_index_array1.value(i), 1000 + (i as i64) * 1000);
        }

        let host_array2 = result[1]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array2 = result[1]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array2 = result[1]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..4 {
            assert_eq!(host_array2.value(i), "server2");
            assert_eq!(path_array2.value(i), "/var/data");
            assert_eq!(time_index_array2.value(i), 6000 + (i as i64) * 1000);
        }

        let host_array3 = result[2]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let path_array3 = result[2]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let time_index_array3 = result[2]
            .column(2)
            .as_any()
            .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
            .unwrap();

        for i in 0..5 {
            assert_eq!(host_array3.value(i), "server3");
            assert_eq!(path_array3.value(i), "/opt/logs");
            assert_eq!(time_index_array3.value(i), 10000 + (i as i64) * 1000);
        }

        // Re-executing streams the same three batches in order.
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let batch1 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch1.num_rows(), 5);
        let batch2 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch2.num_rows(), 4);
        let batch3 = divide_stream.next().await.unwrap().unwrap();
        assert_eq!(batch3.num_rows(), 5);
        assert!(divide_stream.next().await.is_none());
    }

    /// `LargeUtf8` and `Utf8View` tag columns must divide correctly.
    #[tokio::test]
    async fn test_string_tag_column_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("tag_large", DataType::LargeUtf8, false),
            Field::new("tag_view", DataType::Utf8View, false),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(LargeStringArray::from(vec!["a", "a", "a", "a"])),
                Arc::new(StringViewArray::from(vec!["x", "x", "y", "y"])),
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![1000, 2000, 1000, 2000],
                )),
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(LargeStringArray::from(vec!["b", "b"])),
                Arc::new(StringViewArray::from(vec!["x", "x"])),
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![1000, 2000],
                )),
            ],
        )
        .unwrap();

        let memory_exec: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![batch1, batch2]], schema.clone(), None).unwrap(),
        )));

        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["tag_large".to_string(), "tag_view".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });

        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();

        // Three series: (a, x), (a, y), (b, x) — two rows each.
        assert_eq!(result.len(), 3);
        for ((expected_large, expected_view), batch) in [("a", "x"), ("a", "y"), ("b", "x")]
            .into_iter()
            .zip(result.iter())
        {
            assert_eq!(batch.num_rows(), 2);

            let tag_large_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<LargeStringArray>()
                .unwrap();
            let tag_view_array = batch
                .column(1)
                .as_any()
                .downcast_ref::<StringViewArray>()
                .unwrap();

            for row in 0..batch.num_rows() {
                assert_eq!(tag_large_array.value(row), expected_large);
                assert_eq!(tag_view_array.value(row), expected_view);
            }
        }
    }

    /// A single `UInt64` tag column is treated as a pre-computed series id
    /// (the `TagIdentifier::Id` fast path).
    #[tokio::test]
    async fn test_u64_tag_column() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("tsid", DataType::UInt64, false),
            Field::new(
                "time_index",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![1, 1, 2, 2])),
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![1000, 2000, 1000, 2000],
                )),
            ],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![3, 3])),
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
                    vec![1000, 2000],
                )),
            ],
        )
        .unwrap();

        let memory_exec: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![batch1, batch2]], schema.clone(), None).unwrap(),
        )));

        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["tsid".to_string()],
            time_index_column: "time_index".to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });

        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();

        // Three series, one per distinct tsid, two rows each.
        assert_eq!(result.len(), 3);
        for (expected_tsid, batch) in [1u64, 2u64, 3u64].into_iter().zip(result.iter()) {
            assert_eq!(batch.num_rows(), 2);
            let tsid_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();
            assert!(tsid_array.iter().all(|v| v == Some(expected_tsid)));
        }
    }
}