promql/extension_plan/
normalize.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::any::Any;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use datafusion::arrow::array::{BooleanArray, Float64Array};
21use datafusion::arrow::compute;
22use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
23use datafusion::error::DataFusionError;
24use datafusion::execution::context::TaskContext;
25use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
26use datafusion::physical_plan::expressions::Column as ColumnExpr;
27use datafusion::physical_plan::metrics::{
28    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
29};
30use datafusion::physical_plan::{
31    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
32    SendableRecordBatchStream,
33};
34use datafusion_expr::col;
35use datatypes::arrow::array::TimestampMillisecondArray;
36use datatypes::arrow::datatypes::SchemaRef;
37use datatypes::arrow::record_batch::RecordBatch;
38use futures::{Stream, StreamExt, ready};
39use greptime_proto::substrait_extension as pb;
40use prost::Message;
41use snafu::ResultExt;
42
43use crate::error::{DeserializeSnafu, Result};
44use crate::extension_plan::{
45    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
46};
47use crate::metrics::PROMQL_SERIES_COUNT;
48
/// Logical plan node that normalizes input record batches. Notice that for
/// simplicity, this plan assumes each input batch only contains sample points
/// from one time series.
///
/// Roughly speaking, normalization does these things:
/// - bias sample's timestamp by offset
/// - remove NaN values (optional)
///
/// NOTE(review): earlier docs also listed "sort the record batch based on
/// timestamp column", but `SeriesNormalizeStream::normalize` in this file
/// performs no sorting — confirm whether ordering is guaranteed upstream.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesNormalize {
    // Offset in milliseconds added to every timestamp of the time index column.
    offset: Millisecond,
    // Name of the time index column in the input schema.
    time_index_column_name: String,
    // When true, rows holding NaN in any Float64 column are dropped.
    need_filter_out_nan: bool,
    // Tag (label) column names; used by the physical plan to require hash
    // partitioning on these columns.
    tag_columns: Vec<String>,

    // Child logical plan.
    input: LogicalPlan,
    // Present only on freshly deserialized nodes: column indices that still
    // need to be resolved to names once the real input plan is attached
    // (see `with_exprs_and_inputs`).
    unfix: Option<UnfixIndices>,
}
66
/// Column indices carried by the serialized (protobuf) form of
/// [`SeriesNormalize`]. They are resolved into column names in
/// `with_exprs_and_inputs` once the actual input plan — and therefore its
/// schema — is known.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    // Index of the time index column in the input schema.
    pub time_index_idx: u64,
    // Indices of the tag columns in the input schema.
    pub tag_column_indices: Vec<u64>,
}
72
73impl UserDefinedLogicalNodeCore for SeriesNormalize {
74    fn name(&self) -> &str {
75        Self::name()
76    }
77
78    fn inputs(&self) -> Vec<&LogicalPlan> {
79        vec![&self.input]
80    }
81
82    fn schema(&self) -> &DFSchemaRef {
83        self.input.schema()
84    }
85
86    fn expressions(&self) -> Vec<datafusion::logical_expr::Expr> {
87        if self.unfix.is_some() {
88            return vec![];
89        }
90
91        self.tag_columns
92            .iter()
93            .map(col)
94            .chain(std::iter::once(col(&self.time_index_column_name)))
95            .collect()
96    }
97
98    fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
99        if self.unfix.is_some() {
100            return None;
101        }
102
103        let input_schema = self.input.schema();
104        if output_columns.is_empty() {
105            let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
106            return Some(vec![indices]);
107        }
108
109        let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len());
110        required.extend_from_slice(output_columns);
111        required.push(input_schema.index_of_column_by_name(None, &self.time_index_column_name)?);
112        for tag in &self.tag_columns {
113            required.push(input_schema.index_of_column_by_name(None, tag)?);
114        }
115
116        required.sort_unstable();
117        required.dedup();
118        Some(vec![required])
119    }
120
121    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
122        write!(
123            f,
124            "PromSeriesNormalize: offset=[{}], time index=[{}], filter NaN: [{}]",
125            self.offset, self.time_index_column_name, self.need_filter_out_nan
126        )
127    }
128
129    fn with_exprs_and_inputs(
130        &self,
131        _exprs: Vec<Expr>,
132        inputs: Vec<LogicalPlan>,
133    ) -> DataFusionResult<Self> {
134        if inputs.is_empty() {
135            return Err(DataFusionError::Internal(
136                "SeriesNormalize should have at least one input".to_string(),
137            ));
138        }
139
140        let input: LogicalPlan = inputs.into_iter().next().unwrap();
141        let input_schema = input.schema();
142
143        if let Some(unfix) = &self.unfix {
144            // transform indices to names
145            let time_index_column_name = resolve_column_name(
146                unfix.time_index_idx,
147                input_schema,
148                "SeriesNormalize",
149                "time index",
150            )?;
151
152            let tag_columns = unfix
153                .tag_column_indices
154                .iter()
155                .map(|idx| resolve_column_name(*idx, input_schema, "SeriesNormalize", "tag"))
156                .collect::<DataFusionResult<Vec<String>>>()?;
157
158            Ok(Self {
159                offset: self.offset,
160                time_index_column_name,
161                need_filter_out_nan: self.need_filter_out_nan,
162                tag_columns,
163                input,
164                unfix: None,
165            })
166        } else {
167            Ok(Self {
168                offset: self.offset,
169                time_index_column_name: self.time_index_column_name.clone(),
170                need_filter_out_nan: self.need_filter_out_nan,
171                tag_columns: self.tag_columns.clone(),
172                input,
173                unfix: None,
174            })
175        }
176    }
177}
178
179impl SeriesNormalize {
180    pub fn new<N: AsRef<str>>(
181        offset: Millisecond,
182        time_index_column_name: N,
183        need_filter_out_nan: bool,
184        tag_columns: Vec<String>,
185        input: LogicalPlan,
186    ) -> Self {
187        Self {
188            offset,
189            time_index_column_name: time_index_column_name.as_ref().to_string(),
190            need_filter_out_nan,
191            tag_columns,
192            input,
193            unfix: None,
194        }
195    }
196
197    pub const fn name() -> &'static str {
198        "SeriesNormalize"
199    }
200
201    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
202        Arc::new(SeriesNormalizeExec {
203            offset: self.offset,
204            time_index_column_name: self.time_index_column_name.clone(),
205            need_filter_out_nan: self.need_filter_out_nan,
206            input: exec_input,
207            tag_columns: self.tag_columns.clone(),
208            metric: ExecutionPlanMetricsSet::new(),
209        })
210    }
211
212    pub fn serialize(&self) -> Vec<u8> {
213        let time_index_idx =
214            serialize_column_index(self.input.schema(), &self.time_index_column_name);
215
216        let tag_column_indices = self
217            .tag_columns
218            .iter()
219            .map(|name| serialize_column_index(self.input.schema(), name))
220            .collect::<Vec<u64>>();
221
222        pb::SeriesNormalize {
223            offset: self.offset,
224            time_index_idx,
225            filter_nan: self.need_filter_out_nan,
226            tag_column_indices,
227            ..Default::default()
228        }
229        .encode_to_vec()
230    }
231
232    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
233        let pb_normalize = pb::SeriesNormalize::decode(bytes).context(DeserializeSnafu)?;
234        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
235            produce_one_row: false,
236            schema: Arc::new(DFSchema::empty()),
237        });
238
239        let unfix = UnfixIndices {
240            time_index_idx: pb_normalize.time_index_idx,
241            tag_column_indices: pb_normalize.tag_column_indices.clone(),
242        };
243
244        Ok(Self {
245            offset: pb_normalize.offset,
246            time_index_column_name: String::new(),
247            need_filter_out_nan: pb_normalize.filter_nan,
248            tag_columns: Vec::new(),
249            input: placeholder_plan,
250            unfix: Some(unfix),
251        })
252    }
253}
254
/// Physical counterpart of [`SeriesNormalize`]: biases timestamps by `offset`
/// and optionally filters NaN rows, one series per record batch.
#[derive(Debug)]
pub struct SeriesNormalizeExec {
    // Offset in milliseconds added to every timestamp.
    offset: Millisecond,
    // Name of the time index column, resolved to an index in `execute`.
    time_index_column_name: String,
    // Whether rows containing NaN in any Float64 column should be dropped.
    need_filter_out_nan: bool,
    // Tag column names used to require hash-partitioned input.
    tag_columns: Vec<String>,

    // Child physical plan.
    input: Arc<dyn ExecutionPlan>,
    // Shared metrics registry for this node's partitions.
    metric: ExecutionPlanMetricsSet,
}
265
266impl ExecutionPlan for SeriesNormalizeExec {
267    fn as_any(&self) -> &dyn Any {
268        self
269    }
270
271    fn schema(&self) -> SchemaRef {
272        self.input.schema()
273    }
274
275    fn required_input_distribution(&self) -> Vec<Distribution> {
276        if self.tag_columns.is_empty() {
277            return vec![Distribution::SinglePartition];
278        }
279
280        let schema = self.input.schema();
281        vec![Distribution::HashPartitioned(
282            self.tag_columns
283                .iter()
284                // Safety: the tag column names is verified in the planning phase
285                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
286                .collect(),
287        )]
288    }
289
290    fn properties(&self) -> &Arc<PlanProperties> {
291        self.input.properties()
292    }
293
294    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
295        vec![&self.input]
296    }
297
298    fn with_new_children(
299        self: Arc<Self>,
300        children: Vec<Arc<dyn ExecutionPlan>>,
301    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
302        assert!(!children.is_empty());
303        Ok(Arc::new(Self {
304            offset: self.offset,
305            time_index_column_name: self.time_index_column_name.clone(),
306            need_filter_out_nan: self.need_filter_out_nan,
307            input: children[0].clone(),
308            tag_columns: self.tag_columns.clone(),
309            metric: self.metric.clone(),
310        }))
311    }
312
313    fn execute(
314        &self,
315        partition: usize,
316        context: Arc<TaskContext>,
317    ) -> DataFusionResult<SendableRecordBatchStream> {
318        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
319        let metrics_builder = MetricBuilder::new(&self.metric);
320        let num_series = Count::new();
321        metrics_builder
322            .with_partition(partition)
323            .build(MetricValue::Count {
324                name: METRIC_NUM_SERIES.into(),
325                count: num_series.clone(),
326            });
327
328        let input = self.input.execute(partition, context)?;
329        let schema = input.schema();
330        let time_index = schema
331            .column_with_name(&self.time_index_column_name)
332            .expect("time index column not found")
333            .0;
334        Ok(Box::pin(SeriesNormalizeStream {
335            offset: self.offset,
336            time_index,
337            need_filter_out_nan: self.need_filter_out_nan,
338            schema,
339            input,
340            metric: baseline_metric,
341            num_series,
342        }))
343    }
344
345    fn metrics(&self) -> Option<MetricsSet> {
346        Some(self.metric.clone_inner())
347    }
348
349    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
350        self.input.partition_statistics(partition)
351    }
352
353    fn name(&self) -> &str {
354        "SeriesNormalizeExec"
355    }
356}
357
358impl DisplayAs for SeriesNormalizeExec {
359    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
360        match t {
361            DisplayFormatType::Default
362            | DisplayFormatType::Verbose
363            | DisplayFormatType::TreeRender => {
364                write!(
365                    f,
366                    "PromSeriesNormalizeExec: offset=[{}], time index=[{}], filter NaN: [{}]",
367                    self.offset, self.time_index_column_name, self.need_filter_out_nan
368                )
369            }
370        }
371    }
372}
373
/// Stream adapter that applies normalization batch-by-batch. Each incoming
/// record batch is assumed to contain samples from a single time series.
pub struct SeriesNormalizeStream {
    // Offset in milliseconds added to each timestamp.
    offset: Millisecond,
    // Column index of TIME INDEX column's position in schema
    time_index: usize,
    // Whether rows containing NaN in any Float64 column should be dropped.
    need_filter_out_nan: bool,

    // Output schema (identical to the input's).
    schema: SchemaRef,
    // Upstream stream of record batches.
    input: SendableRecordBatchStream,
    // Baseline metrics (elapsed compute, poll accounting).
    metric: BaselineMetrics,
    /// Number of series processed.
    num_series: Count,
}
386
387impl SeriesNormalizeStream {
388    pub fn normalize(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
389        let ts_column = input
390            .column(self.time_index)
391            .as_any()
392            .downcast_ref::<TimestampMillisecondArray>()
393            .ok_or_else(|| {
394                DataFusionError::Execution(
395                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
396                )
397            })?;
398
399        // bias the timestamp column by offset
400        let ts_column_biased = if self.offset == 0 {
401            Arc::new(ts_column.clone()) as _
402        } else {
403            Arc::new(TimestampMillisecondArray::from_iter(
404                ts_column.iter().map(|ts| ts.map(|ts| ts + self.offset)),
405            ))
406        };
407        let mut columns = input.columns().to_vec();
408        columns[self.time_index] = ts_column_biased;
409
410        let result_batch = RecordBatch::try_new(input.schema(), columns)?;
411        if !self.need_filter_out_nan {
412            return Ok(result_batch);
413        }
414
415        // TODO(ruihang): consider the "special NaN"
416        // filter out NaN
417        let mut filter = vec![true; input.num_rows()];
418        for column in result_batch.columns() {
419            if let Some(float_column) = column.as_any().downcast_ref::<Float64Array>() {
420                for (i, flag) in filter.iter_mut().enumerate() {
421                    if float_column.value(i).is_nan() {
422                        *flag = false;
423                    }
424                }
425            }
426        }
427
428        let result = compute::filter_record_batch(&result_batch, &BooleanArray::from(filter))
429            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
430        Ok(result)
431    }
432}
433
434impl RecordBatchStream for SeriesNormalizeStream {
435    fn schema(&self) -> SchemaRef {
436        self.schema.clone()
437    }
438}
439
440impl Stream for SeriesNormalizeStream {
441    type Item = DataFusionResult<RecordBatch>;
442
443    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
444        let poll = match ready!(self.input.poll_next_unpin(cx)) {
445            Some(Ok(batch)) => {
446                self.num_series.add(1);
447                let timer = std::time::Instant::now();
448                let result = Ok(batch).and_then(|batch| self.normalize(batch));
449                self.metric.elapsed_compute().add_elapsed(timer);
450                Poll::Ready(Some(result))
451            }
452            None => {
453                PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
454                Poll::Ready(None)
455            }
456            Some(Err(e)) => Poll::Ready(Some(Err(e))),
457        };
458        self.metric.record_poll(poll)
459    }
460}
461
#[cfg(test)]
mod test {
    use datafusion::arrow::array::Float64Array;
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
    };
    use datafusion::common::ToDFSchema;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;
    use datatypes::arrow_array::StringArray;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    /// Build an in-memory source with one batch of five rows:
    /// (timestamp, value, path) with deliberately unsorted timestamps.
    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            60_000, 120_000, 0, 30_000, 90_000,
        ])) as _;
        let field_column = Arc::new(Float64Array::from(vec![0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo", "foo", "foo", "foo", "foo"])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![timestamp_column, field_column, path_column],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

    /// Projection pushdown must retain the time index and tag columns even
    /// when the parent only requests the `value` column.
    #[test]
    fn pruning_should_keep_time_and_tag_columns_for_exec() {
        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
        let input = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: df_schema,
        });
        let plan =
            SeriesNormalize::new(0, TIME_INDEX_COLUMN, true, vec!["path".to_string()], input);

        // Simulate a parent projection requesting only the `value` column.
        let output_columns = [1usize];
        let required = plan.necessary_children_exprs(&output_columns).unwrap();
        let required = &required[0];
        assert_eq!(required.as_slice(), &[0, 1, 2]);
    }

    // NOTE(review): despite the name, the expected output below is NOT sorted
    // by timestamp — this test actually pins that input row order is
    // preserved through normalization with a zero offset.
    #[tokio::test]
    async fn test_sort_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 0,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            need_filter_out_nan: true,
            input: memory_exec,
            tag_columns: vec!["path".to_string()],
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+--------+------+\
            \n| timestamp           | value  | path |\
            \n+---------------------+--------+------+\
            \n| 1970-01-01T00:01:00 | 0.0    | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0    | foo  |\
            \n| 1970-01-01T00:00:00 | 10.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 100.0  | foo  |\
            \n| 1970-01-01T00:01:30 | 1000.0 | foo  |\
            \n+---------------------+--------+------+",
        );

        assert_eq!(result_literal, expected);
    }

    /// A 1000 ms offset should shift every timestamp forward by one second
    /// while leaving values and row order untouched.
    #[tokio::test]
    async fn test_offset_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 1_000,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            need_filter_out_nan: true,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            tag_columns: vec!["path".to_string()],
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+--------+------+\
            \n| timestamp           | value  | path |\
            \n+---------------------+--------+------+\
            \n| 1970-01-01T00:01:01 | 0.0    | foo  |\
            \n| 1970-01-01T00:02:01 | 1.0    | foo  |\
            \n| 1970-01-01T00:00:01 | 10.0   | foo  |\
            \n| 1970-01-01T00:00:31 | 100.0  | foo  |\
            \n| 1970-01-01T00:01:31 | 1000.0 | foo  |\
            \n+---------------------+--------+------+",
        );

        assert_eq!(result_literal, expected);
    }
}