// table/table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::Cast;
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN, NOW_FN};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaRef};
use lazy_static::lazy_static;
use snafu::ResultExt;
use store_api::data_source::DataSourceRef;
use store_api::storage::ScanRequest;

use crate::error::{Result, TablesRecordBatchSnafu};
use crate::metadata::{FilterPushDownType, TableInfoRef, TableType};

pub mod adapter;
mod metrics;
pub mod numbers;
pub mod scan;

41lazy_static! {
42    /// The [`Expr`] to call UDF function `now()`.
43    static ref NOW_EXPR: Expr = {
44        let ctx = SessionContext::new();
45
46        let now_udf = ctx.udf("now").expect("now UDF not found");
47
48        Expr::ScalarFunction(ScalarFunction {
49            func: now_udf,
50            args: vec![],
51        })
52    };
53}
54
55pub type TableRef = Arc<Table>;
56
57/// Table handle.
58pub struct Table {
59    table_info: TableInfoRef,
60    filter_pushdown: FilterPushDownType,
61    data_source: DataSourceRef,
62    /// Columns default [`Expr`]
63    column_defaults: HashMap<String, Expr>,
64}
65
66impl Table {
67    pub fn new(
68        table_info: TableInfoRef,
69        filter_pushdown: FilterPushDownType,
70        data_source: DataSourceRef,
71    ) -> Self {
72        Self {
73            column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
74            table_info,
75            filter_pushdown,
76            data_source,
77        }
78    }
79
80    /// Get column default [`Expr`], if available.
81    pub fn get_column_default(&self, column: &str) -> Option<&Expr> {
82        self.column_defaults.get(column)
83    }
84
85    pub fn data_source(&self) -> DataSourceRef {
86        self.data_source.clone()
87    }
88
89    /// Get a reference to the schema for this table.
90    pub fn schema(&self) -> SchemaRef {
91        self.table_info.meta.schema.clone()
92    }
93
94    /// Get a reference to the table info.
95    pub fn table_info(&self) -> TableInfoRef {
96        self.table_info.clone()
97    }
98
99    /// Get the type of this table for metadata/catalog purposes.
100    pub fn table_type(&self) -> TableType {
101        self.table_info.table_type
102    }
103
104    pub async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
105        self.data_source
106            .get_stream(request)
107            .context(TablesRecordBatchSnafu)
108    }
109
110    /// Tests whether the table provider can make use of any or all filter expressions
111    /// to optimise data retrieval.
112    pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
113        Ok(vec![self.filter_pushdown; filters.len()])
114    }
115
116    /// Get primary key columns in the definition order.
117    pub fn primary_key_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
118        self.table_info
119            .meta
120            .primary_key_indices
121            .iter()
122            .map(|i| self.table_info.meta.schema.column_schemas()[*i].clone())
123    }
124
125    /// Get field columns in the definition order.
126    pub fn field_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
127        // `value_indices` in TableMeta is not reliable. Do a filter here.
128        let primary_keys = self
129            .table_info
130            .meta
131            .primary_key_indices
132            .iter()
133            .copied()
134            .collect::<HashSet<_>>();
135
136        self.table_info
137            .meta
138            .schema
139            .column_schemas()
140            .iter()
141            .enumerate()
142            .filter(move |(i, c)| !primary_keys.contains(i) && !c.is_time_index())
143            .map(|(_, c)| c.clone())
144    }
145}
146
147/// Collects column default [`Expr`] from column schemas.
148fn collect_column_defaults(column_schemas: &[ColumnSchema]) -> HashMap<String, Expr> {
149    column_schemas
150        .iter()
151        .filter_map(|column_schema| {
152            default_constraint_to_expr(
153                column_schema.default_constraint()?,
154                &column_schema.data_type,
155            )
156            .map(|expr| (column_schema.name.to_string(), expr))
157        })
158        .collect()
159}
160
161/// Try to cast the [`ColumnDefaultConstraint`] to [`Expr`] by the target data type.
162fn default_constraint_to_expr(
163    default_constraint: &ColumnDefaultConstraint,
164    target_type: &ConcreteDataType,
165) -> Option<Expr> {
166    match default_constraint {
167        ColumnDefaultConstraint::Value(v) => {
168            v.try_to_scalar_value(target_type).ok().map(Expr::Literal)
169        }
170
171        ColumnDefaultConstraint::Function(name)
172            if matches!(
173                name.as_str(),
174                CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN
175            ) =>
176        {
177            Some(Expr::Cast(Cast {
178                expr: Box::new(NOW_EXPR.clone()),
179                data_type: target_type.as_arrow_type(),
180            }))
181        }
182
183        ColumnDefaultConstraint::Function(_) => None,
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use datafusion_common::ScalarValue;
190    use datatypes::prelude::ConcreteDataType;
191    use datatypes::schema::ColumnDefaultConstraint;
192
193    use super::*;
194
195    #[test]
196    fn test_collect_columns_defaults() {
197        let column_schemas = vec![
198            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
199            ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true)
200                .with_default_constraint(Some(ColumnDefaultConstraint::Value("test".into())))
201                .unwrap(),
202            ColumnSchema::new(
203                "ts",
204                ConcreteDataType::timestamp_millisecond_datatype(),
205                false,
206            )
207            .with_time_index(true)
208            .with_default_constraint(Some(ColumnDefaultConstraint::Function(
209                "current_timestamp".to_string(),
210            )))
211            .unwrap(),
212        ];
213        let column_defaults = collect_column_defaults(&column_schemas[..]);
214
215        assert!(!column_defaults.contains_key("col1"));
216        assert!(matches!(column_defaults.get("col2").unwrap(),
217                         Expr::Literal(ScalarValue::Utf8(Some(s))) if s == "test"));
218        assert!(matches!(
219            column_defaults.get("ts").unwrap(),
220            Expr::Cast(Cast {
221                expr,
222                data_type
223            }) if **expr == *NOW_EXPR && *data_type == ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type()
224        ));
225    }
226}