table/
table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_recordbatch::SendableRecordBatchStream;
19use datafusion::execution::FunctionRegistry;
20use datafusion::logical_expr::expr::ScalarFunction;
21use datafusion::logical_expr::Cast;
22use datafusion::prelude::SessionContext;
23use datafusion_expr::expr::Expr;
24use datatypes::data_type::DataType;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN, NOW_FN};
27use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaRef};
28use lazy_static::lazy_static;
29use snafu::ResultExt;
30use store_api::data_source::DataSourceRef;
31use store_api::storage::ScanRequest;
32
33use crate::error::{Result, TablesRecordBatchSnafu};
34use crate::metadata::{FilterPushDownType, TableInfoRef, TableType};
35
36pub mod adapter;
37mod metrics;
38pub mod numbers;
39pub mod scan;
40
lazy_static! {
    /// The [`Expr`] to call UDF function `now()`.
    ///
    /// Built once per process: a throwaway `SessionContext` is created only to
    /// look up the `now` UDF from its function registry. The resulting expr is
    /// cloned wherever a `now()` call is needed (see `default_constraint_to_expr`).
    static ref NOW_EXPR: Expr = {
        let ctx = SessionContext::new();

        // `now` is expected to be registered by default in DataFusion's
        // SessionContext; a missing registration is a programmer error,
        // hence the `expect`.
        let now_udf = ctx.udf("now").expect("now UDF not found");

        Expr::ScalarFunction(ScalarFunction {
            func: now_udf,
            args: vec![],
        })
    };
}
54
/// Defines partition rules for a table.
/// TODO(discord9): add the entire partition exprs rules here later
pub struct PartitionRules {
    /// The physical partition columns that are not in the logical table.
    ///
    /// Only used in the kvbackend manager to record physical partition columns
    /// that are absent from the logical table, so that their presence in the
    /// physical table does not wrongly enable certain optimizations. If the
    /// table is not a logical table, this should be empty.
    pub extra_phy_cols_not_in_logical_table: Vec<String>,
}
64
/// Shared, reference-counted handle to a [`Table`].
pub type TableRef = Arc<Table>;
66
/// Table handle.
///
/// Bundles the table metadata, the data source used to scan it, and
/// precomputed per-column default expressions.
pub struct Table {
    /// Metadata for this table (schema, table type, key indices, ...).
    table_info: TableInfoRef,
    /// Pushdown capability reported for every filter in `supports_filters_pushdown`.
    filter_pushdown: FilterPushDownType,
    /// Source that produces the record batch stream for scans.
    data_source: DataSourceRef,
    /// Columns default [`Expr`], keyed by column name; built from the schema's
    /// default constraints by `collect_column_defaults`.
    column_defaults: HashMap<String, Expr>,
    /// Optional partition rules; `None` unless constructed via `new_partitioned`.
    partition_rules: Option<PartitionRules>,
}
76
77impl Table {
78    pub fn new(
79        table_info: TableInfoRef,
80        filter_pushdown: FilterPushDownType,
81        data_source: DataSourceRef,
82    ) -> Self {
83        Self {
84            column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
85            table_info,
86            filter_pushdown,
87            data_source,
88            partition_rules: None,
89        }
90    }
91
92    pub fn new_partitioned(
93        table_info: TableInfoRef,
94        filter_pushdown: FilterPushDownType,
95        data_source: DataSourceRef,
96        partition_rules: Option<PartitionRules>,
97    ) -> Self {
98        Self {
99            column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
100            table_info,
101            filter_pushdown,
102            data_source,
103            partition_rules,
104        }
105    }
106
107    /// Get column default [`Expr`], if available.
108    pub fn get_column_default(&self, column: &str) -> Option<&Expr> {
109        self.column_defaults.get(column)
110    }
111
112    pub fn data_source(&self) -> DataSourceRef {
113        self.data_source.clone()
114    }
115
116    /// Get a reference to the schema for this table.
117    pub fn schema(&self) -> SchemaRef {
118        self.table_info.meta.schema.clone()
119    }
120
121    /// Get a reference to the table info.
122    pub fn table_info(&self) -> TableInfoRef {
123        self.table_info.clone()
124    }
125
126    /// Get the type of this table for metadata/catalog purposes.
127    pub fn table_type(&self) -> TableType {
128        self.table_info.table_type
129    }
130
131    pub fn partition_rules(&self) -> Option<&PartitionRules> {
132        self.partition_rules.as_ref()
133    }
134
135    pub async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
136        self.data_source
137            .get_stream(request)
138            .context(TablesRecordBatchSnafu)
139    }
140
141    /// Tests whether the table provider can make use of any or all filter expressions
142    /// to optimise data retrieval.
143    pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
144        Ok(vec![self.filter_pushdown; filters.len()])
145    }
146
147    /// Get primary key columns in the definition order.
148    pub fn primary_key_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
149        self.table_info
150            .meta
151            .primary_key_indices
152            .iter()
153            .map(|i| self.table_info.meta.schema.column_schemas()[*i].clone())
154    }
155
156    /// Get field columns in the definition order.
157    pub fn field_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
158        // `value_indices` in TableMeta is not reliable. Do a filter here.
159        let primary_keys = self
160            .table_info
161            .meta
162            .primary_key_indices
163            .iter()
164            .copied()
165            .collect::<HashSet<_>>();
166
167        self.table_info
168            .meta
169            .schema
170            .column_schemas()
171            .iter()
172            .enumerate()
173            .filter(move |(i, c)| !primary_keys.contains(i) && !c.is_time_index())
174            .map(|(_, c)| c.clone())
175    }
176}
177
178/// Collects column default [`Expr`] from column schemas.
179fn collect_column_defaults(column_schemas: &[ColumnSchema]) -> HashMap<String, Expr> {
180    column_schemas
181        .iter()
182        .filter_map(|column_schema| {
183            default_constraint_to_expr(
184                column_schema.default_constraint()?,
185                &column_schema.data_type,
186            )
187            .map(|expr| (column_schema.name.to_string(), expr))
188        })
189        .collect()
190}
191
192/// Try to cast the [`ColumnDefaultConstraint`] to [`Expr`] by the target data type.
193fn default_constraint_to_expr(
194    default_constraint: &ColumnDefaultConstraint,
195    target_type: &ConcreteDataType,
196) -> Option<Expr> {
197    match default_constraint {
198        ColumnDefaultConstraint::Value(v) => v
199            .try_to_scalar_value(target_type)
200            .ok()
201            .map(|x| Expr::Literal(x, None)),
202
203        ColumnDefaultConstraint::Function(name)
204            if matches!(
205                name.as_str(),
206                CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN
207            ) =>
208        {
209            Some(Expr::Cast(Cast {
210                expr: Box::new(NOW_EXPR.clone()),
211                data_type: target_type.as_arrow_type(),
212            }))
213        }
214
215        ColumnDefaultConstraint::Function(_) => None,
216    }
217}
218
219#[cfg(test)]
220mod tests {
221    use datafusion_common::ScalarValue;
222    use datatypes::prelude::ConcreteDataType;
223    use datatypes::schema::ColumnDefaultConstraint;
224
225    use super::*;
226
    #[test]
    fn test_collect_columns_defaults() {
        // Three columns: one without a default, one with a literal value
        // default, and a time index with a `current_timestamp` function default.
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true)
                .with_default_constraint(Some(ColumnDefaultConstraint::Value("test".into())))
                .unwrap(),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true)
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(
                "current_timestamp".to_string(),
            )))
            .unwrap(),
        ];
        let column_defaults = collect_column_defaults(&column_schemas[..]);

        // Columns without a default constraint must not appear in the map.
        assert!(!column_defaults.contains_key("col1"));
        // Value constraints become literal exprs.
        assert!(matches!(column_defaults.get("col2").unwrap(),
                         Expr::Literal(ScalarValue::Utf8(Some(s)), _) if s == "test"));
        // Function constraints become `now()` cast to the column's arrow type.
        assert!(matches!(
            column_defaults.get("ts").unwrap(),
            Expr::Cast(Cast {
                expr,
                data_type
            }) if **expr == *NOW_EXPR && *data_type == ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type()
        ));
    }
258}