use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::common::stats::Precision;
use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
    RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::compute::{CastOptions, cast_with_options, concat_batches};
use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{ColumnNotFoundSnafu, DataFusionPlanningSnafu, DeserializeSnafu, Result};
use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};

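/// `ScalarCalculate` implements PromQL's
/// [`scalar()`](https://prometheus.io/docs/prometheus/latest/querying/functions/#scalar)
/// function: when the input contains exactly one time series, its values are
/// passed through as the scalar result; otherwise (zero or multiple series)
/// every time step evaluates to NaN.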
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ScalarCalculate {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,

    time_index: String,
    tag_columns: Vec<String>,
    field_column: String,
    input: LogicalPlan,
    output_schema: DFSchemaRef,
    unfix: Option<UnfixIndices>,
}

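/// Column indices recorded when a serialized plan is decoded. They are
/// resolved back to concrete column names against the real input schema in
/// `with_exprs_and_inputs`.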
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    pub time_index_idx: u64,
    pub tag_column_indices: Vec<u64>,
    pub field_column_idx: u64,
}

impl ScalarCalculate {
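    /// Create a new `ScalarCalculate` plan, deriving the two-column
    /// `(time index, scalar(field))` output schema from the input schema.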
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        input: LogicalPlan,
        time_index: &str,
        tag_columns: &[String],
        field_column: &str,
        table_name: Option<&str>,
    ) -> Result<Self> {
        let input_schema = input.schema();
        let Ok(ts_field) = input_schema
            .field_with_unqualified_name(time_index)
            .cloned()
        else {
            return ColumnNotFoundSnafu { col: time_index }.fail();
        };
        let val_field = Field::new(format!("scalar({})", field_column), DataType::Float64, true);
        let qualifier = table_name.map(TableReference::bare);
        let schema = DFSchema::new_with_metadata(
            vec![
                (qualifier.clone(), Arc::new(ts_field)),
                (qualifier, Arc::new(val_field)),
            ],
            input_schema.metadata().clone(),
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(Self {
            start,
            end,
            interval,
            time_index: time_index.to_string(),
            tag_columns: tag_columns.to_vec(),
            field_column: field_column.to_string(),
            input,
            output_schema: Arc::new(schema),
            unfix: None,
        })
    }

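    /// The name of this logical plan node.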
    pub const fn name() -> &'static str {
        "ScalarCalculate"
    }

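    /// Convert this logical node into the physical [`ScalarCalculateExec`],
    /// resolving the time-index and field columns to positional indices in
    /// the physical input schema.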
    pub fn to_execution_plan(
        &self,
        exec_input: Arc<dyn ExecutionPlan>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        let fields: Vec<_> = self
            .output_schema
            .fields()
            .iter()
            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
            .collect();
        let input_schema = exec_input.schema();
        let ts_index = input_schema
            .index_of(&self.time_index)
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
        let val_index = input_schema
            .index_of(&self.field_column)
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
        let schema = Arc::new(Schema::new(fields));
        let properties = exec_input.properties();
        let properties = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            properties.emission_type,
            properties.boundedness,
        );
        Ok(Arc::new(ScalarCalculateExec {
            start: self.start,
            end: self.end,
            interval: self.interval,
            schema,
            input: exec_input,
            project_index: (ts_index, val_index),
            tag_columns: self.tag_columns.clone(),
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        }))
    }

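    /// Serialize this plan into the substrait extension protobuf. Columns are
    /// encoded as indices into the input schema rather than as names.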
    pub fn serialize(&self) -> Vec<u8> {
        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index);

        let tag_column_indices = self
            .tag_columns
            .iter()
            .map(|name| serialize_column_index(self.input.schema(), name))
            .collect::<Vec<u64>>();

        let field_column_idx = serialize_column_index(self.input.schema(), &self.field_column);

        pb::ScalarCalculate {
            start: self.start,
            end: self.end,
            interval: self.interval,
            time_index_idx,
            tag_column_indices,
            field_column_idx,
            ..Default::default()
        }
        .encode_to_vec()
    }

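    /// Deserialize a plan from the substrait extension protobuf. The result
    /// holds placeholder names and an empty input; the indices kept in
    /// `unfix` are resolved to real column names once `with_exprs_and_inputs`
    /// attaches the actual input plan.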
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_scalar_calculate = pb::ScalarCalculate::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let unfix = UnfixIndices {
            time_index_idx: pb_scalar_calculate.time_index_idx,
            tag_column_indices: pb_scalar_calculate.tag_column_indices.clone(),
            field_column_idx: pb_scalar_calculate.field_column_idx,
        };

        let ts_field = Field::new(
            "placeholder_time_index",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        );
        let val_field = Field::new("placeholder_field", DataType::Float64, true);
        let schema = DFSchema::new_with_metadata(
            vec![(None, Arc::new(ts_field)), (None, Arc::new(val_field))],
            HashMap::new(),
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(Self {
            start: pb_scalar_calculate.start,
            end: pb_scalar_calculate.end,
            interval: pb_scalar_calculate.interval,
            time_index: String::new(),
            tag_columns: Vec::new(),
            field_column: String::new(),
            output_schema: Arc::new(schema),
            input: placeholder_plan,
            unfix: Some(unfix),
        })
    }
}

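// Manual `PartialOrd`: compares only the semantically meaningful fields,
// skipping `output_schema` (derived from the others) and `unfix`.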
impl PartialOrd for ScalarCalculate {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        match self.start.partial_cmp(&other.start) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.end.partial_cmp(&other.end) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.interval.partial_cmp(&other.interval) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.time_index.partial_cmp(&other.time_index) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.tag_columns.partial_cmp(&other.tag_columns) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.field_column.partial_cmp(&other.field_column) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        self.input.partial_cmp(&other.input)
    }
}

impl UserDefinedLogicalNodeCore for ScalarCalculate {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.output_schema
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "ScalarCalculate: tags={:?}", self.tag_columns)
    }

    fn with_exprs_and_inputs(
        &self,
        exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if !exprs.is_empty() {
            return Err(DataFusionError::Internal(
                "ScalarCalculate should not have any expressions".to_string(),
            ));
        }

        let input: LogicalPlan = inputs.into_iter().next().unwrap();
        let input_schema = input.schema();

        if let Some(unfix) = &self.unfix {
            let time_index = resolve_column_name(
                unfix.time_index_idx,
                input_schema,
                "ScalarCalculate",
                "time index",
            )?;

            let tag_columns = unfix
                .tag_column_indices
                .iter()
                .map(|idx| resolve_column_name(*idx, input_schema, "ScalarCalculate", "tag"))
                .collect::<DataFusionResult<Vec<String>>>()?;

            let field_column = resolve_column_name(
                unfix.field_column_idx,
                input_schema,
                "ScalarCalculate",
                "field",
            )?;

            let ts_field = Field::new(
                &time_index,
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            );
            let val_field =
                Field::new(format!("scalar({})", field_column), DataType::Float64, true);
            let schema = DFSchema::new_with_metadata(
                vec![(None, Arc::new(ts_field)), (None, Arc::new(val_field))],
                HashMap::new(),
            )
            .context(DataFusionPlanningSnafu)?;

            Ok(ScalarCalculate {
                start: self.start,
                end: self.end,
                interval: self.interval,
                time_index,
                tag_columns,
                field_column,
                input,
                output_schema: Arc::new(schema),
                unfix: None,
            })
        } else {
            Ok(ScalarCalculate {
                start: self.start,
                end: self.end,
                interval: self.interval,
                time_index: self.time_index.clone(),
                tag_columns: self.tag_columns.clone(),
                field_column: self.field_column.clone(),
                input,
                output_schema: self.output_schema.clone(),
                unfix: None,
            })
        }
    }
}

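/// Physical counterpart of [`ScalarCalculate`]. Runs on a single partition
/// and projects the input down to the `(time index, value)` pair recorded in
/// `project_index`.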
#[derive(Debug, Clone)]
struct ScalarCalculateExec {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    schema: SchemaRef,
    project_index: (usize, usize),
    input: Arc<dyn ExecutionPlan>,
    tag_columns: Vec<String>,
    metric: ExecutionPlanMetricsSet,
    properties: PlanProperties,
}

impl ExecutionPlan for ScalarCalculateExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::SinglePartition]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(ScalarCalculateExec {
            start: self.start,
            end: self.end,
            interval: self.interval,
            schema: self.schema.clone(),
            project_index: self.project_index,
            tag_columns: self.tag_columns.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
            properties: self.properties.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let tag_indices = self
            .tag_columns
            .iter()
            .map(|tag| {
                schema
                    .column_with_name(tag)
                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
                    .0
            })
            .collect();

        Ok(Box::pin(ScalarCalculateStream {
            start: self.start,
            end: self.end,
            interval: self.interval,
            schema: self.schema.clone(),
            project_index: self.project_index,
            metric: baseline_metric,
            tag_indices,
            input,
            have_multi_series: false,
            done: false,
            batch: None,
            tag_value: None,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;

        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
        let estimated_total_bytes = input_stats
            .total_byte_size
            .get_value()
            .zip(input_stats.num_rows.get_value())
            .map(|(size, rows)| {
                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
            })
            .unwrap_or_default();

        Ok(Statistics {
            num_rows: Precision::Inexact(estimated_row_num as _),
            total_byte_size: estimated_total_bytes,
            column_statistics: Statistics::unknown_column(&self.schema()),
        })
    }

    fn name(&self) -> &str {
        "ScalarCalculateExec"
    }
}

impl DisplayAs for ScalarCalculateExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "ScalarCalculateExec: tags={:?}", self.tag_columns)
            }
        }
    }
}

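/// Stream that accumulates all input batches while checking that at most one
/// distinct combination of tag values is present. On completion it emits
/// either the accumulated `(timestamp, value)` batch, or a NaN-filled batch
/// covering `start..=end` when zero or multiple series were observed.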
struct ScalarCalculateStream {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    tag_indices: Vec<usize>,
    project_index: (usize, usize),
    have_multi_series: bool,
    done: bool,
    batch: Option<RecordBatch>,
    tag_value: Option<Vec<String>>,
}

impl RecordBatchStream for ScalarCalculateStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ScalarCalculateStream {
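    /// Fold one input batch into the accumulated state. The batch is appended
    /// only while every tag column holds a single value matching what has
    /// been seen so far; any deviation sets `have_multi_series`, after which
    /// all further input is ignored.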
    fn update_batch(&mut self, batch: RecordBatch) -> DataFusionResult<()> {
        let _timer = self.metric.elapsed_compute();
        if self.have_multi_series || batch.num_rows() == 0 {
            return Ok(());
        }
        if self.tag_indices.is_empty() {
            self.append_batch(batch)?;
            return Ok(());
        }
        let all_same = |val: Option<&str>, array: &StringArray| -> bool {
            if let Some(v) = val {
                array.iter().all(|s| s == Some(v))
            } else {
                array.is_empty() || array.iter().skip(1).all(|s| s == Some(array.value(0)))
            }
        };
        let all_tag_columns_same = if let Some(tags) = &self.tag_value {
            tags.iter()
                .zip(self.tag_indices.iter())
                .all(|(value, index)| {
                    let array = batch.column(*index);
                    let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
                    all_same(Some(value), string_array)
                })
        } else {
            let mut tag_values = Vec::with_capacity(self.tag_indices.len());
            let is_same = self.tag_indices.iter().all(|index| {
                let array = batch.column(*index);
                let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
                tag_values.push(string_array.value(0).to_string());
                all_same(None, string_array)
            });
            self.tag_value = Some(tag_values);
            is_same
        };
        if all_tag_columns_same {
            self.append_batch(batch)?;
        } else {
            self.have_multi_series = true;
        }
        Ok(())
    }

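    /// Project the batch down to the time-index and value columns (casting
    /// the value column to `Float64`) and concatenate it onto the
    /// accumulated batch.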
    fn append_batch(&mut self, input_batch: RecordBatch) -> DataFusionResult<()> {
        let ts_column = input_batch.column(self.project_index.0).clone();
        let val_column = cast_with_options(
            input_batch.column(self.project_index.1),
            &DataType::Float64,
            &CastOptions::default(),
        )?;
        let input_batch = RecordBatch::try_new(self.schema.clone(), vec![ts_column, val_column])?;
        if let Some(batch) = &self.batch {
            self.batch = Some(concat_batches(&self.schema, vec![batch, &input_batch])?);
        } else {
            self.batch = Some(input_batch);
        }
        Ok(())
    }
}

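// The stream drains its entire input before producing a single output batch:
// the accumulated single series, or NaNs when zero or multiple series were
// observed (see `update_batch`).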
impl Stream for ScalarCalculateStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if self.done {
                return Poll::Ready(None);
            }
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    self.update_batch(batch)?;
                }
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => {
                    self.done = true;
                    return match self.batch.take() {
                        Some(batch) if !self.have_multi_series => {
                            self.metric.record_output(batch.num_rows());
                            Poll::Ready(Some(Ok(batch)))
                        }
                        _ => {
                            let time_array = (self.start..=self.end)
                                .step_by(self.interval as _)
                                .collect::<Vec<_>>();
                            let nums = time_array.len();
                            let nan_batch = RecordBatch::try_new(
                                self.schema.clone(),
                                vec![
                                    Arc::new(TimestampMillisecondArray::from(time_array)),
                                    Arc::new(Float64Array::from(vec![f64::NAN; nums])),
                                ],
                            )?;
                            self.metric.record_output(nan_batch.num_rows());
                            Poll::Ready(Some(Ok(nan_batch)))
                        }
                    };
                }
            };
        }
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::TimeUnit;

    use super::*;

    fn prepare_test_data(series: Vec<RecordBatch>) -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("tag1", DataType::Utf8, true),
            Field::new("tag2", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));
        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[series], schema, None).unwrap(),
        ))
    }

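    /// Run a `ScalarCalculateExec` (range 0..=15s, 5s step, tags
    /// `tag1`/`tag2`) over `series` and assert the pretty-printed output
    /// equals `expected`.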
    async fn run_test(series: Vec<RecordBatch>, expected: &str) {
        let memory_exec = Arc::new(prepare_test_data(series));
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("val", DataType::Float64, true),
        ]));
        let properties = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let scalar_exec = Arc::new(ScalarCalculateExec {
            start: 0,
            end: 15_000,
            interval: 5000,
            tag_columns: vec!["tag1".to_string(), "tag2".to_string()],
            input: memory_exec,
            schema,
            project_index: (0, 3),
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(scalar_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();
        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn same_series() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("tag1", DataType::Utf8, true),
            Field::new("tag2", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));
        run_test(
            vec![
                RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondArray::from(vec![0, 5_000])),
                        Arc::new(StringArray::from(vec!["foo", "foo"])),
                        Arc::new(StringArray::from(vec!["🥺", "🥺"])),
                        Arc::new(Float64Array::from(vec![1.0, 2.0])),
                    ],
                )
                .unwrap(),
                RecordBatch::try_new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondArray::from(vec![10_000, 15_000])),
                        Arc::new(StringArray::from(vec!["foo", "foo"])),
                        Arc::new(StringArray::from(vec!["🥺", "🥺"])),
                        Arc::new(Float64Array::from(vec![3.0, 4.0])),
                    ],
                )
                .unwrap(),
            ],
            "+---------------------+-----+\
            \n| ts                  | val |\
            \n+---------------------+-----+\
            \n| 1970-01-01T00:00:00 | 1.0 |\
            \n| 1970-01-01T00:00:05 | 2.0 |\
            \n| 1970-01-01T00:00:10 | 3.0 |\
            \n| 1970-01-01T00:00:15 | 4.0 |\
            \n+---------------------+-----+",
        )
        .await
    }

    #[tokio::test]
    async fn diff_series() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("tag1", DataType::Utf8, true),
            Field::new("tag2", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));
        run_test(
            vec![
                RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondArray::from(vec![0, 5_000])),
                        Arc::new(StringArray::from(vec!["foo", "foo"])),
                        Arc::new(StringArray::from(vec!["🥺", "🥺"])),
                        Arc::new(Float64Array::from(vec![1.0, 2.0])),
                    ],
                )
                .unwrap(),
                RecordBatch::try_new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondArray::from(vec![10_000, 15_000])),
                        Arc::new(StringArray::from(vec!["foo", "foo"])),
                        Arc::new(StringArray::from(vec!["🥺", "😝"])),
                        Arc::new(Float64Array::from(vec![3.0, 4.0])),
                    ],
                )
                .unwrap(),
            ],
            "+---------------------+-----+\
            \n| ts                  | val |\
            \n+---------------------+-----+\
            \n| 1970-01-01T00:00:00 | NaN |\
            \n| 1970-01-01T00:00:05 | NaN |\
            \n| 1970-01-01T00:00:10 | NaN |\
            \n| 1970-01-01T00:00:15 | NaN |\
            \n+---------------------+-----+",
        )
        .await
    }

    #[tokio::test]
    async fn empty_series() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("tag1", DataType::Utf8, true),
            Field::new("tag2", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));
        run_test(
            vec![
                RecordBatch::try_new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondArray::new_null(0)),
                        Arc::new(StringArray::new_null(0)),
                        Arc::new(StringArray::new_null(0)),
                        Arc::new(Float64Array::new_null(0)),
                    ],
                )
                .unwrap(),
            ],
            "+---------------------+-----+\
            \n| ts                  | val |\
            \n+---------------------+-----+\
            \n| 1970-01-01T00:00:00 | NaN |\
            \n| 1970-01-01T00:00:05 | NaN |\
            \n| 1970-01-01T00:00:10 | NaN |\
            \n| 1970-01-01T00:00:15 | NaN |\
            \n+---------------------+-----+",
        )
        .await
    }
}