1use std::any::Any;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use datafusion::arrow::array::{BooleanArray, Float64Array};
21use datafusion::arrow::compute;
22use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
23use datafusion::error::DataFusionError;
24use datafusion::execution::context::TaskContext;
25use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
26use datafusion::physical_plan::expressions::Column as ColumnExpr;
27use datafusion::physical_plan::metrics::{
28 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
29};
30use datafusion::physical_plan::{
31 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
32 SendableRecordBatchStream,
33};
34use datafusion_expr::col;
35use datatypes::arrow::array::TimestampMillisecondArray;
36use datatypes::arrow::datatypes::SchemaRef;
37use datatypes::arrow::record_batch::RecordBatch;
38use futures::{Stream, StreamExt, ready};
39use greptime_proto::substrait_extension as pb;
40use prost::Message;
41use snafu::ResultExt;
42
43use crate::error::{DeserializeSnafu, Result};
44use crate::extension_plan::{
45 METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
46};
47use crate::metrics::PROMQL_SERIES_COUNT;
48
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesNormalize {
    /// Offset (in milliseconds) added to every value of the time index column.
    offset: Millisecond,
    /// Name of the time index column in the input schema.
    time_index_column_name: String,
    /// Whether rows containing NaN in any `Float64` column are filtered out.
    need_filter_out_nan: bool,
    /// Tag (label) column names used for hash-partitioned distribution.
    tag_columns: Vec<String>,

    /// Child logical plan this node normalizes.
    input: LogicalPlan,
    /// Present only on a freshly deserialized node: column indices that still
    /// need to be resolved into names against the real input schema
    /// (see `with_exprs_and_inputs`). `None` once resolved.
    unfix: Option<UnfixIndices>,
}
66
/// Column indices captured at serialization time, to be resolved back into
/// column names once the deserialized node is attached to a concrete input.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Index of the time index column in the serialized input schema.
    pub time_index_idx: u64,
    /// Indices of the tag columns in the serialized input schema.
    pub tag_column_indices: Vec<u64>,
}
72
73impl UserDefinedLogicalNodeCore for SeriesNormalize {
74 fn name(&self) -> &str {
75 Self::name()
76 }
77
78 fn inputs(&self) -> Vec<&LogicalPlan> {
79 vec![&self.input]
80 }
81
82 fn schema(&self) -> &DFSchemaRef {
83 self.input.schema()
84 }
85
86 fn expressions(&self) -> Vec<datafusion::logical_expr::Expr> {
87 if self.unfix.is_some() {
88 return vec![];
89 }
90
91 self.tag_columns
92 .iter()
93 .map(col)
94 .chain(std::iter::once(col(&self.time_index_column_name)))
95 .collect()
96 }
97
98 fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
99 if self.unfix.is_some() {
100 return None;
101 }
102
103 let input_schema = self.input.schema();
104 if output_columns.is_empty() {
105 let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
106 return Some(vec![indices]);
107 }
108
109 let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len());
110 required.extend_from_slice(output_columns);
111 required.push(input_schema.index_of_column_by_name(None, &self.time_index_column_name)?);
112 for tag in &self.tag_columns {
113 required.push(input_schema.index_of_column_by_name(None, tag)?);
114 }
115
116 required.sort_unstable();
117 required.dedup();
118 Some(vec![required])
119 }
120
121 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
122 write!(
123 f,
124 "PromSeriesNormalize: offset=[{}], time index=[{}], filter NaN: [{}]",
125 self.offset, self.time_index_column_name, self.need_filter_out_nan
126 )
127 }
128
129 fn with_exprs_and_inputs(
130 &self,
131 _exprs: Vec<Expr>,
132 inputs: Vec<LogicalPlan>,
133 ) -> DataFusionResult<Self> {
134 if inputs.is_empty() {
135 return Err(DataFusionError::Internal(
136 "SeriesNormalize should have at least one input".to_string(),
137 ));
138 }
139
140 let input: LogicalPlan = inputs.into_iter().next().unwrap();
141 let input_schema = input.schema();
142
143 if let Some(unfix) = &self.unfix {
144 let time_index_column_name = resolve_column_name(
146 unfix.time_index_idx,
147 input_schema,
148 "SeriesNormalize",
149 "time index",
150 )?;
151
152 let tag_columns = unfix
153 .tag_column_indices
154 .iter()
155 .map(|idx| resolve_column_name(*idx, input_schema, "SeriesNormalize", "tag"))
156 .collect::<DataFusionResult<Vec<String>>>()?;
157
158 Ok(Self {
159 offset: self.offset,
160 time_index_column_name,
161 need_filter_out_nan: self.need_filter_out_nan,
162 tag_columns,
163 input,
164 unfix: None,
165 })
166 } else {
167 Ok(Self {
168 offset: self.offset,
169 time_index_column_name: self.time_index_column_name.clone(),
170 need_filter_out_nan: self.need_filter_out_nan,
171 tag_columns: self.tag_columns.clone(),
172 input,
173 unfix: None,
174 })
175 }
176 }
177}
178
179impl SeriesNormalize {
180 pub fn new<N: AsRef<str>>(
181 offset: Millisecond,
182 time_index_column_name: N,
183 need_filter_out_nan: bool,
184 tag_columns: Vec<String>,
185 input: LogicalPlan,
186 ) -> Self {
187 Self {
188 offset,
189 time_index_column_name: time_index_column_name.as_ref().to_string(),
190 need_filter_out_nan,
191 tag_columns,
192 input,
193 unfix: None,
194 }
195 }
196
197 pub const fn name() -> &'static str {
198 "SeriesNormalize"
199 }
200
201 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
202 Arc::new(SeriesNormalizeExec {
203 offset: self.offset,
204 time_index_column_name: self.time_index_column_name.clone(),
205 need_filter_out_nan: self.need_filter_out_nan,
206 input: exec_input,
207 tag_columns: self.tag_columns.clone(),
208 metric: ExecutionPlanMetricsSet::new(),
209 })
210 }
211
212 pub fn serialize(&self) -> Vec<u8> {
213 let time_index_idx =
214 serialize_column_index(self.input.schema(), &self.time_index_column_name);
215
216 let tag_column_indices = self
217 .tag_columns
218 .iter()
219 .map(|name| serialize_column_index(self.input.schema(), name))
220 .collect::<Vec<u64>>();
221
222 pb::SeriesNormalize {
223 offset: self.offset,
224 time_index_idx,
225 filter_nan: self.need_filter_out_nan,
226 tag_column_indices,
227 ..Default::default()
228 }
229 .encode_to_vec()
230 }
231
232 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
233 let pb_normalize = pb::SeriesNormalize::decode(bytes).context(DeserializeSnafu)?;
234 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
235 produce_one_row: false,
236 schema: Arc::new(DFSchema::empty()),
237 });
238
239 let unfix = UnfixIndices {
240 time_index_idx: pb_normalize.time_index_idx,
241 tag_column_indices: pb_normalize.tag_column_indices.clone(),
242 };
243
244 Ok(Self {
245 offset: pb_normalize.offset,
246 time_index_column_name: String::new(),
247 need_filter_out_nan: pb_normalize.filter_nan,
248 tag_columns: Vec::new(),
249 input: placeholder_plan,
250 unfix: Some(unfix),
251 })
252 }
253}
254
/// Physical plan node for [`SeriesNormalize`]: shifts the time index by a
/// fixed offset and optionally filters NaN rows, per partition.
#[derive(Debug)]
pub struct SeriesNormalizeExec {
    /// Offset (milliseconds) added to every timestamp.
    offset: Millisecond,
    /// Name of the time index column in the input schema.
    time_index_column_name: String,
    /// Whether rows containing NaN in any `Float64` column are dropped.
    need_filter_out_nan: bool,
    /// Tag columns used to require hash-partitioned input distribution.
    tag_columns: Vec<String>,

    /// Child physical plan.
    input: Arc<dyn ExecutionPlan>,
    /// Per-partition execution metrics (elapsed compute, series count).
    metric: ExecutionPlanMetricsSet,
}
265
266impl ExecutionPlan for SeriesNormalizeExec {
267 fn as_any(&self) -> &dyn Any {
268 self
269 }
270
271 fn schema(&self) -> SchemaRef {
272 self.input.schema()
273 }
274
275 fn required_input_distribution(&self) -> Vec<Distribution> {
276 if self.tag_columns.is_empty() {
277 return vec![Distribution::SinglePartition];
278 }
279
280 let schema = self.input.schema();
281 vec![Distribution::HashPartitioned(
282 self.tag_columns
283 .iter()
284 .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
286 .collect(),
287 )]
288 }
289
290 fn properties(&self) -> &Arc<PlanProperties> {
291 self.input.properties()
292 }
293
294 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
295 vec![&self.input]
296 }
297
298 fn with_new_children(
299 self: Arc<Self>,
300 children: Vec<Arc<dyn ExecutionPlan>>,
301 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
302 assert!(!children.is_empty());
303 Ok(Arc::new(Self {
304 offset: self.offset,
305 time_index_column_name: self.time_index_column_name.clone(),
306 need_filter_out_nan: self.need_filter_out_nan,
307 input: children[0].clone(),
308 tag_columns: self.tag_columns.clone(),
309 metric: self.metric.clone(),
310 }))
311 }
312
313 fn execute(
314 &self,
315 partition: usize,
316 context: Arc<TaskContext>,
317 ) -> DataFusionResult<SendableRecordBatchStream> {
318 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
319 let metrics_builder = MetricBuilder::new(&self.metric);
320 let num_series = Count::new();
321 metrics_builder
322 .with_partition(partition)
323 .build(MetricValue::Count {
324 name: METRIC_NUM_SERIES.into(),
325 count: num_series.clone(),
326 });
327
328 let input = self.input.execute(partition, context)?;
329 let schema = input.schema();
330 let time_index = schema
331 .column_with_name(&self.time_index_column_name)
332 .expect("time index column not found")
333 .0;
334 Ok(Box::pin(SeriesNormalizeStream {
335 offset: self.offset,
336 time_index,
337 need_filter_out_nan: self.need_filter_out_nan,
338 schema,
339 input,
340 metric: baseline_metric,
341 num_series,
342 }))
343 }
344
345 fn metrics(&self) -> Option<MetricsSet> {
346 Some(self.metric.clone_inner())
347 }
348
349 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
350 self.input.partition_statistics(partition)
351 }
352
353 fn name(&self) -> &str {
354 "SeriesNormalizeExec"
355 }
356}
357
358impl DisplayAs for SeriesNormalizeExec {
359 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
360 match t {
361 DisplayFormatType::Default
362 | DisplayFormatType::Verbose
363 | DisplayFormatType::TreeRender => {
364 write!(
365 f,
366 "PromSeriesNormalizeExec: offset=[{}], time index=[{}], filter NaN: [{}]",
367 self.offset, self.time_index_column_name, self.need_filter_out_nan
368 )
369 }
370 }
371 }
372}
373
/// Streaming executor behind [`SeriesNormalizeExec`]: normalizes each input
/// batch as it is polled.
pub struct SeriesNormalizeStream {
    /// Offset (milliseconds) added to each timestamp.
    offset: Millisecond,
    /// Index of the time index column in `schema`.
    time_index: usize,
    /// Whether rows containing NaN in any `Float64` column are dropped.
    need_filter_out_nan: bool,

    /// Schema of produced batches (same as the input schema).
    schema: SchemaRef,
    /// Upstream batch stream.
    input: SendableRecordBatchStream,
    /// Baseline metrics (elapsed compute, output rows, poll accounting).
    metric: BaselineMetrics,
    /// Counts processed batches — one batch per series upstream.
    num_series: Count,
}
386
387impl SeriesNormalizeStream {
388 pub fn normalize(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
389 let ts_column = input
390 .column(self.time_index)
391 .as_any()
392 .downcast_ref::<TimestampMillisecondArray>()
393 .ok_or_else(|| {
394 DataFusionError::Execution(
395 "Time index Column downcast to TimestampMillisecondArray failed".into(),
396 )
397 })?;
398
399 let ts_column_biased = if self.offset == 0 {
401 Arc::new(ts_column.clone()) as _
402 } else {
403 Arc::new(TimestampMillisecondArray::from_iter(
404 ts_column.iter().map(|ts| ts.map(|ts| ts + self.offset)),
405 ))
406 };
407 let mut columns = input.columns().to_vec();
408 columns[self.time_index] = ts_column_biased;
409
410 let result_batch = RecordBatch::try_new(input.schema(), columns)?;
411 if !self.need_filter_out_nan {
412 return Ok(result_batch);
413 }
414
415 let mut filter = vec![true; input.num_rows()];
418 for column in result_batch.columns() {
419 if let Some(float_column) = column.as_any().downcast_ref::<Float64Array>() {
420 for (i, flag) in filter.iter_mut().enumerate() {
421 if float_column.value(i).is_nan() {
422 *flag = false;
423 }
424 }
425 }
426 }
427
428 let result = compute::filter_record_batch(&result_batch, &BooleanArray::from(filter))
429 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
430 Ok(result)
431 }
432}
433
434impl RecordBatchStream for SeriesNormalizeStream {
435 fn schema(&self) -> SchemaRef {
436 self.schema.clone()
437 }
438}
439
440impl Stream for SeriesNormalizeStream {
441 type Item = DataFusionResult<RecordBatch>;
442
443 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
444 let poll = match ready!(self.input.poll_next_unpin(cx)) {
445 Some(Ok(batch)) => {
446 self.num_series.add(1);
447 let timer = std::time::Instant::now();
448 let result = Ok(batch).and_then(|batch| self.normalize(batch));
449 self.metric.elapsed_compute().add_elapsed(timer);
450 Poll::Ready(Some(result))
451 }
452 None => {
453 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
454 Poll::Ready(None)
455 }
456 Some(Err(e)) => Poll::Ready(Some(Err(e))),
457 };
458 self.metric.record_poll(poll)
459 }
460}
461
#[cfg(test)]
mod test {
    use datafusion::arrow::array::Float64Array;
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
    };
    use datafusion::common::ToDFSchema;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;
    use datatypes::arrow_array::StringArray;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    /// Builds an in-memory source with an unsorted millisecond timestamp
    /// column, one float value column and one constant tag column.
    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            60_000, 120_000, 0, 30_000, 90_000,
        ])) as _;
        let field_column = Arc::new(Float64Array::from(vec![0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo", "foo", "foo", "foo", "foo"])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![timestamp_column, field_column, path_column],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

    /// Projection pruning must retain the time index and tag columns even when
    /// the parent only asks for the value column (index 1).
    #[test]
    fn pruning_should_keep_time_and_tag_columns_for_exec() {
        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
        let input = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: df_schema,
        });
        let plan =
            SeriesNormalize::new(0, TIME_INDEX_COLUMN, true, vec!["path".to_string()], input);

        let output_columns = [1usize];
        let required = plan.necessary_children_exprs(&output_columns).unwrap();
        let required = &required[0];
        // Column 1 was requested; 0 (time index) and 2 (tag) are added by the plan.
        assert_eq!(required.as_slice(), &[0, 1, 2]);
    }

    /// With offset 0, normalization passes timestamps through unchanged
    /// (input row order is preserved — no sorting happens here).
    #[tokio::test]
    async fn test_sort_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 0,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            need_filter_out_nan: true,
            input: memory_exec,
            tag_columns: vec!["path".to_string()],
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+--------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+--------+------+\
            \n| 1970-01-01T00:01:00 | 0.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:00 | 10.0 | foo |\
            \n| 1970-01-01T00:00:30 | 100.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1000.0 | foo |\
            \n+---------------------+--------+------+",
        );

        assert_eq!(result_literal, expected);
    }

    /// With a 1 s offset, every timestamp in the output is shifted forward by
    /// exactly one second.
    #[tokio::test]
    async fn test_offset_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 1_000,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            need_filter_out_nan: true,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            tag_columns: vec!["path".to_string()],
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+--------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+--------+------+\
            \n| 1970-01-01T00:01:01 | 0.0 | foo |\
            \n| 1970-01-01T00:02:01 | 1.0 | foo |\
            \n| 1970-01-01T00:00:01 | 10.0 | foo |\
            \n| 1970-01-01T00:00:31 | 100.0 | foo |\
            \n| 1970-01-01T00:01:31 | 1000.0 | foo |\
            \n+---------------------+--------+------+",
        );

        assert_eq!(result_literal, expected);
    }
}