// table/table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_recordbatch::SendableRecordBatchStream;
19use datafusion::execution::FunctionRegistry;
20use datafusion::logical_expr::Cast;
21use datafusion::logical_expr::expr::ScalarFunction;
22use datafusion::prelude::SessionContext;
23use datafusion_expr::expr::Expr;
24use datatypes::data_type::DataType;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN, NOW_FN};
27use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaRef};
28use lazy_static::lazy_static;
29use snafu::ResultExt;
30use store_api::data_source::DataSourceRef;
31use store_api::storage::ScanRequest;
32
33use crate::error::{Result, TablesRecordBatchSnafu};
34use crate::metadata::{FilterPushDownType, TableInfoRef, TableType};
35
36pub mod adapter;
37mod metrics;
38pub mod numbers;
39pub mod scan;
40
lazy_static! {
    /// The [`Expr`] to call UDF function `now()`.
    ///
    /// Built lazily on first access: the `now` UDF is looked up from a fresh
    /// [`SessionContext`]. Panics at first access if the UDF is missing,
    /// which would indicate an incompatible DataFusion build.
    static ref NOW_EXPR: Expr = {
        // A default session context is expected to register DataFusion's
        // built-in functions, including `now` — hence the `expect` below.
        let ctx = SessionContext::new();

        let now_udf = ctx.udf("now").expect("now UDF not found");

        // `now()` takes no arguments.
        Expr::ScalarFunction(ScalarFunction {
            func: now_udf,
            args: vec![],
        })
    };
}
54
/// Defines partition rules for a table.
/// TODO(discord9): add the entire partition exprs rules here later
pub struct PartitionRules {
    /// The physical partition columns that are not in the logical table.
    ///
    /// Only used by the kvbackend manager to store physical partition columns
    /// that are absent from the logical table's schema; recording them here
    /// prevents certain optimizations that would otherwise be applied to the
    /// logical table. If the table is not a logical table, this should be
    /// empty.
    pub extra_phy_cols_not_in_logical_table: Vec<String>,
}
64
/// Shared, reference-counted handle to a [`Table`].
pub type TableRef = Arc<Table>;
66
/// Table handle.
///
/// Bundles the table metadata, the backing data source, and precomputed
/// per-column default expressions.
pub struct Table {
    // Table metadata (schema, table type, primary key indices, ...).
    table_info: TableInfoRef,
    // The single pushdown capability reported for every filter in
    // `supports_filters_pushdown`.
    filter_pushdown: FilterPushDownType,
    // Source that produces record batch streams for scans.
    data_source: DataSourceRef,
    /// Columns default [`Expr`]
    column_defaults: HashMap<String, Expr>,
    // Optional partition rules; `None` unless built via `new_partitioned`.
    partition_rules: Option<PartitionRules>,
}
76
77impl Table {
78    pub fn new(
79        table_info: TableInfoRef,
80        filter_pushdown: FilterPushDownType,
81        data_source: DataSourceRef,
82    ) -> Self {
83        Self {
84            column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
85            table_info,
86            filter_pushdown,
87            data_source,
88            partition_rules: None,
89        }
90    }
91
92    pub fn new_partitioned(
93        table_info: TableInfoRef,
94        filter_pushdown: FilterPushDownType,
95        data_source: DataSourceRef,
96        partition_rules: Option<PartitionRules>,
97    ) -> Self {
98        Self {
99            column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
100            table_info,
101            filter_pushdown,
102            data_source,
103            partition_rules,
104        }
105    }
106
107    /// Get column default [`Expr`], if available.
108    pub fn get_column_default(&self, column: &str) -> Option<&Expr> {
109        self.column_defaults.get(column)
110    }
111
112    pub fn data_source(&self) -> DataSourceRef {
113        self.data_source.clone()
114    }
115
116    /// Get a reference to the schema for this table.
117    pub fn schema(&self) -> SchemaRef {
118        self.table_info.meta.schema.clone()
119    }
120
121    pub fn schema_ref(&self) -> &SchemaRef {
122        &self.table_info.meta.schema
123    }
124
125    /// Get a reference to the table info.
126    pub fn table_info(&self) -> TableInfoRef {
127        self.table_info.clone()
128    }
129
130    /// Get the type of this table for metadata/catalog purposes.
131    pub fn table_type(&self) -> TableType {
132        self.table_info.table_type
133    }
134
135    pub fn partition_rules(&self) -> Option<&PartitionRules> {
136        self.partition_rules.as_ref()
137    }
138
139    pub async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
140        self.data_source
141            .get_stream(request)
142            .context(TablesRecordBatchSnafu)
143    }
144
145    /// Tests whether the table provider can make use of any or all filter expressions
146    /// to optimise data retrieval.
147    pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
148        Ok(vec![self.filter_pushdown; filters.len()])
149    }
150
151    /// Get primary key columns in the definition order.
152    pub fn primary_key_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
153        self.table_info
154            .meta
155            .primary_key_indices
156            .iter()
157            .map(|i| self.table_info.meta.schema.column_schemas()[*i].clone())
158    }
159
160    /// Get field columns in the definition order.
161    pub fn field_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
162        // `value_indices` in TableMeta is not reliable. Do a filter here.
163        let primary_keys = self
164            .table_info
165            .meta
166            .primary_key_indices
167            .iter()
168            .copied()
169            .collect::<HashSet<_>>();
170
171        self.table_info
172            .meta
173            .schema
174            .column_schemas()
175            .iter()
176            .enumerate()
177            .filter(move |(i, c)| !primary_keys.contains(i) && !c.is_time_index())
178            .map(|(_, c)| c.clone())
179    }
180}
181
182/// Collects column default [`Expr`] from column schemas.
183fn collect_column_defaults(column_schemas: &[ColumnSchema]) -> HashMap<String, Expr> {
184    column_schemas
185        .iter()
186        .filter_map(|column_schema| {
187            default_constraint_to_expr(
188                column_schema.default_constraint()?,
189                &column_schema.data_type,
190            )
191            .map(|expr| (column_schema.name.clone(), expr))
192        })
193        .collect()
194}
195
196/// Try to cast the [`ColumnDefaultConstraint`] to [`Expr`] by the target data type.
197fn default_constraint_to_expr(
198    default_constraint: &ColumnDefaultConstraint,
199    target_type: &ConcreteDataType,
200) -> Option<Expr> {
201    match default_constraint {
202        ColumnDefaultConstraint::Value(v) => v
203            .try_to_scalar_value(target_type)
204            .ok()
205            .map(|x| Expr::Literal(x, None)),
206
207        ColumnDefaultConstraint::Function(name)
208            if matches!(
209                name.as_str(),
210                CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN
211            ) =>
212        {
213            Some(Expr::Cast(Cast {
214                expr: Box::new(NOW_EXPR.clone()),
215                data_type: target_type.as_arrow_type(),
216            }))
217        }
218
219        ColumnDefaultConstraint::Function(_) => None,
220    }
221}
222
223#[cfg(test)]
224mod tests {
225    use datafusion_common::ScalarValue;
226    use datatypes::prelude::ConcreteDataType;
227    use datatypes::schema::ColumnDefaultConstraint;
228
229    use super::*;
230
231    #[test]
232    fn test_collect_columns_defaults() {
233        let column_schemas = [
234            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
235            ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true)
236                .with_default_constraint(Some(ColumnDefaultConstraint::Value("test".into())))
237                .unwrap(),
238            ColumnSchema::new(
239                "ts",
240                ConcreteDataType::timestamp_millisecond_datatype(),
241                false,
242            )
243            .with_time_index(true)
244            .with_default_constraint(Some(ColumnDefaultConstraint::Function(
245                "current_timestamp".to_string(),
246            )))
247            .unwrap(),
248        ];
249        let column_defaults = collect_column_defaults(&column_schemas[..]);
250
251        assert!(!column_defaults.contains_key("col1"));
252        assert!(matches!(column_defaults.get("col2").unwrap(),
253                         Expr::Literal(ScalarValue::Utf8(Some(s)), _) if s == "test"));
254        assert!(matches!(
255            column_defaults.get("ts").unwrap(),
256            Expr::Cast(Cast {
257                expr,
258                data_type
259            }) if **expr == *NOW_EXPR && *data_type == ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type()
260        ));
261    }
262}