promql/extension_plan/normalize.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{BooleanArray, Float64Array};
use datafusion::arrow::compute;
use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::{
    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
};
use crate::metrics::PROMQL_SERIES_COUNT;
/// Normalize the input record batch. Notice that for simplicity, this plan assumes
/// the input batch only contains sample points from one time series.
///
/// Roughly speaking, this plan does two things:
/// - bias each sample's timestamp by the offset
/// - filter out NaN values (optional)
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesNormalize {
    offset: Millisecond,
    time_index_column_name: String,
    need_filter_out_nan: bool,
    tag_columns: Vec<String>,

    input: LogicalPlan,
    unfix: Option<UnfixIndices>,
}

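/// Column indices recorded at serialization time, to be resolved back into
/// column names against the real input schema in `with_exprs_and_inputs`.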
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    pub time_index_idx: u64,
    pub tag_column_indices: Vec<u64>,
}

impl UserDefinedLogicalNodeCore for SeriesNormalize {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<datafusion::logical_expr::Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "PromSeriesNormalize: offset=[{}], time index=[{}], filter NaN: [{}]",
            self.offset, self.time_index_column_name, self.need_filter_out_nan
        )
    }

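    /// Rebuild this node with a new input. When the node was just deserialized
    /// (`unfix` is set), the recorded column indices are resolved into column
    /// names against the new input's schema.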
    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(DataFusionError::Internal(
                "SeriesNormalize should have at least one input".to_string(),
            ));
        }

        let input: LogicalPlan = inputs.into_iter().next().unwrap();
        let input_schema = input.schema();

        if let Some(unfix) = &self.unfix {
            // transform indices to names
            let time_index_column_name = resolve_column_name(
                unfix.time_index_idx,
                input_schema,
                "SeriesNormalize",
                "time index",
            )?;

            let tag_columns = unfix
                .tag_column_indices
                .iter()
                .map(|idx| resolve_column_name(*idx, input_schema, "SeriesNormalize", "tag"))
                .collect::<DataFusionResult<Vec<String>>>()?;

            Ok(Self {
                offset: self.offset,
                time_index_column_name,
                need_filter_out_nan: self.need_filter_out_nan,
                tag_columns,
                input,
                unfix: None,
            })
        } else {
            Ok(Self {
                offset: self.offset,
                time_index_column_name: self.time_index_column_name.clone(),
                need_filter_out_nan: self.need_filter_out_nan,
                tag_columns: self.tag_columns.clone(),
                input,
                unfix: None,
            })
        }
    }
}

impl SeriesNormalize {
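    /// Create a new [`SeriesNormalize`] plan node. A minimal usage sketch,
    /// assuming `input` is a `LogicalPlan` with a millisecond time index
    /// column "ts" and a tag column "host" (names here are illustrative):
    ///
    /// ```ignore
    /// let plan = SeriesNormalize::new(
    ///     5_000,                    // offset: shift timestamps back by 5s
    ///     "ts",                     // time index column name
    ///     true,                     // filter out NaN samples
    ///     vec!["host".to_string()], // tag columns
    ///     input,
    /// );
    /// ```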
    pub fn new<N: AsRef<str>>(
        offset: Millisecond,
        time_index_column_name: N,
        need_filter_out_nan: bool,
        tag_columns: Vec<String>,
        input: LogicalPlan,
    ) -> Self {
        Self {
            offset,
            time_index_column_name: time_index_column_name.as_ref().to_string(),
            need_filter_out_nan,
            tag_columns,
            input,
            unfix: None,
        }
    }

    pub const fn name() -> &'static str {
        "SeriesNormalize"
    }

    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesNormalizeExec {
            offset: self.offset,
            time_index_column_name: self.time_index_column_name.clone(),
            need_filter_out_nan: self.need_filter_out_nan,
            input: exec_input,
            tag_columns: self.tag_columns.clone(),
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

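    /// Serialize this node into a protobuf message. Column names are encoded
    /// as indices into the input schema, so the plan can be decoded later
    /// without the original schema at hand (see [`Self::deserialize`]).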
    pub fn serialize(&self) -> Vec<u8> {
        let time_index_idx =
            serialize_column_index(self.input.schema(), &self.time_index_column_name);

        let tag_column_indices = self
            .tag_columns
            .iter()
            .map(|name| serialize_column_index(self.input.schema(), name))
            .collect::<Vec<u64>>();

        pb::SeriesNormalize {
            offset: self.offset,
            time_index_idx,
            filter_nan: self.need_filter_out_nan,
            tag_column_indices,
            ..Default::default()
        }
        .encode_to_vec()
    }

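    /// Deserialize a node from protobuf bytes. The result carries a
    /// placeholder input and unresolved column indices; the indices are
    /// resolved into names once `with_exprs_and_inputs` attaches the real
    /// input plan.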
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_normalize = pb::SeriesNormalize::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let unfix = UnfixIndices {
            time_index_idx: pb_normalize.time_index_idx,
            tag_column_indices: pb_normalize.tag_column_indices.clone(),
        };

        Ok(Self {
            offset: pb_normalize.offset,
            time_index_column_name: String::new(),
            need_filter_out_nan: pb_normalize.filter_nan,
            tag_columns: Vec::new(),
            input: placeholder_plan,
            unfix: Some(unfix),
        })
    }
}

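/// Physical plan counterpart of [`SeriesNormalize`].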
#[derive(Debug)]
pub struct SeriesNormalizeExec {
    offset: Millisecond,
    time_index_column_name: String,
    need_filter_out_nan: bool,
    tag_columns: Vec<String>,

    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesNormalizeExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

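    /// Require the input to be hash-partitioned by the tag columns, so that
    /// each partition only sees whole time series.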
    fn required_input_distribution(&self) -> Vec<Distribution> {
        let schema = self.input.schema();
        vec![Distribution::HashPartitioned(
            self.tag_columns
                .iter()
                // Safety: the tag column names are verified in the planning phase
                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
                .collect(),
        )]
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            offset: self.offset,
            time_index_column_name: self.time_index_column_name.clone(),
            need_filter_out_nan: self.need_filter_out_nan,
            input: children[0].clone(),
            tag_columns: self.tag_columns.clone(),
            metric: self.metric.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
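        // Register a dedicated counter for the number of series this
        // partition processes, alongside the baseline metrics.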
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let time_index = schema
            .column_with_name(&self.time_index_column_name)
            .expect("time index column not found")
            .0;
        Ok(Box::pin(SeriesNormalizeStream {
            offset: self.offset,
            time_index,
            need_filter_out_nan: self.need_filter_out_nan,
            schema,
            input,
            metric: baseline_metric,
            num_series,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
        self.input.partition_statistics(partition)
    }

    fn name(&self) -> &str {
        "SeriesNormalizeExec"
    }
}

impl DisplayAs for SeriesNormalizeExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "PromSeriesNormalizeExec: offset=[{}], time index=[{}], filter NaN: [{}]",
                    self.offset, self.time_index_column_name, self.need_filter_out_nan
                )
            }
        }
    }
}

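/// Stream that applies the normalization to each record batch as it arrives.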
pub struct SeriesNormalizeStream {
    offset: Millisecond,
    /// Index of the time index column in the schema.
    time_index: usize,
    need_filter_out_nan: bool,

    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    /// Number of series processed.
    num_series: Count,
}

impl SeriesNormalizeStream {
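    /// Normalize one batch: shift the time index column by `offset` and, when
    /// `need_filter_out_nan` is set, drop rows whose float columns contain NaN.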
    pub fn normalize(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
        let ts_column = input
            .column(self.time_index)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "Failed to downcast time index column to TimestampMillisecondArray".into(),
                )
            })?;

        // bias the timestamp column by offset
        let ts_column_biased = if self.offset == 0 {
            Arc::new(ts_column.clone()) as _
        } else {
            Arc::new(TimestampMillisecondArray::from_iter(
                ts_column.iter().map(|ts| ts.map(|ts| ts - self.offset)),
            ))
        };
        let mut columns = input.columns().to_vec();
        columns[self.time_index] = ts_column_biased;

        let result_batch = RecordBatch::try_new(input.schema(), columns)?;
        if !self.need_filter_out_nan {
            return Ok(result_batch);
        }

        // TODO(ruihang): consider the "special NaN"
        // filter out rows where any float column is NaN
        let mut filter = vec![true; input.num_rows()];
        for column in result_batch.columns() {
            if let Some(float_column) = column.as_any().downcast_ref::<Float64Array>() {
                for (i, flag) in filter.iter_mut().enumerate() {
                    if float_column.value(i).is_nan() {
                        *flag = false;
                    }
                }
            }
        }

        let result = compute::filter_record_batch(&result_batch, &BooleanArray::from(filter))
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
        Ok(result)
    }
}


impl RecordBatchStream for SeriesNormalizeStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesNormalizeStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll = match ready!(self.input.poll_next_unpin(cx)) {
            Some(Ok(batch)) => {
                self.num_series.add(1);
                let timer = std::time::Instant::now();
                let result = self.normalize(batch);
                self.metric.elapsed_compute().add_elapsed(timer);
                Poll::Ready(Some(result))
            }
            None => {
                // input exhausted: report how many series this partition saw
                PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                Poll::Ready(None)
            }
            Some(Err(e)) => Poll::Ready(Some(Err(e))),
        };
        self.metric.record_poll(poll)
    }
}


#[cfg(test)]
mod test {
    use datafusion::arrow::array::Float64Array;
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
    };
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;
    use datatypes::arrow_array::StringArray;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

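    // Build a single in-memory batch with deliberately unsorted timestamps
    // and a constant tag column "path".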
    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            60_000, 120_000, 0, 30_000, 90_000,
        ])) as _;
        let field_column = Arc::new(Float64Array::from(vec![0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo", "foo", "foo", "foo", "foo"])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![timestamp_column, field_column, path_column],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

    #[tokio::test]
    async fn test_sort_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 0,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            need_filter_out_nan: true,
            input: memory_exec,
            tag_columns: vec!["path".to_string()],
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+--------+------+\
            \n| timestamp           | value  | path |\
            \n+---------------------+--------+------+\
            \n| 1970-01-01T00:01:00 | 0.0    | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0    | foo  |\
            \n| 1970-01-01T00:00:00 | 10.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 100.0  | foo  |\
            \n| 1970-01-01T00:01:30 | 1000.0 | foo  |\
            \n+---------------------+--------+------+",
        );

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn test_offset_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 1_000,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            need_filter_out_nan: true,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            tag_columns: vec!["path".to_string()],
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+--------+------+\
            \n| timestamp           | value  | path |\
            \n+---------------------+--------+------+\
            \n| 1970-01-01T00:00:59 | 0.0    | foo  |\
            \n| 1970-01-01T00:01:59 | 1.0    | foo  |\
            \n| 1969-12-31T23:59:59 | 10.0   | foo  |\
            \n| 1970-01-01T00:00:29 | 100.0  | foo  |\
            \n| 1970-01-01T00:01:29 | 1000.0 | foo  |\
            \n+---------------------+--------+------+",
        );

        assert_eq!(result_literal, expected);
    }
}