promql/
extension_plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod absent;
16mod empty_metric;
17mod histogram_fold;
18mod instant_manipulate;
19mod normalize;
20mod planner;
21mod range_manipulate;
22mod scalar_calculate;
23mod series_divide;
24#[cfg(test)]
25mod test_util;
26mod union_distinct_on;
27
28pub use absent::{Absent, AbsentExec, AbsentStream};
29use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
30use datafusion::common::DFSchemaRef;
31use datafusion::error::{DataFusionError, Result as DataFusionResult};
32pub use empty_metric::{EmptyMetric, EmptyMetricExec, EmptyMetricStream, build_special_time_expr};
33pub use histogram_fold::{HistogramFold, HistogramFoldExec, HistogramFoldStream};
34pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
35pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
36pub use planner::PromExtensionPlanner;
37pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream};
38pub use scalar_calculate::ScalarCalculate;
39pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream};
40pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream};
41
42pub type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
43
44const METRIC_NUM_SERIES: &str = "num_series";
45
46/// Utilities for handling unfix logic in extension plans
47/// Convert column name to index for serialization
48pub fn serialize_column_index(schema: &DFSchemaRef, column_name: &str) -> u64 {
49    schema
50        .index_of_column_by_name(None, column_name)
51        .map(|idx| idx as u64)
52        .unwrap_or(u64::MAX) // make sure if not found, it will report error in deserialization
53}
54
55/// Convert index back to column name for deserialization
56pub fn resolve_column_name(
57    index: u64,
58    schema: &DFSchemaRef,
59    context: &str,
60    column_type: &str,
61) -> DataFusionResult<String> {
62    let columns = schema.columns();
63    columns
64        .get(index as usize)
65        .ok_or_else(|| {
66            DataFusionError::Internal(format!(
67                "Failed to get {} column at idx {} during unfixing {} with columns:{:?}",
68                column_type, index, context, columns
69            ))
70        })
71        .map(|field| field.name().to_string())
72}
73
74/// Batch process multiple column indices
75pub fn resolve_column_names(
76    indices: &[u64],
77    schema: &DFSchemaRef,
78    context: &str,
79    column_type: &str,
80) -> DataFusionResult<Vec<String>> {
81    indices
82        .iter()
83        .map(|idx| resolve_column_name(*idx, schema, context, column_type))
84        .collect()
85}