use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::Cast;
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN, NOW_FN};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaRef};
use lazy_static::lazy_static;
use snafu::ResultExt;
use store_api::data_source::DataSourceRef;
use store_api::storage::ScanRequest;

use crate::error::{Result, TablesRecordBatchSnafu};
use crate::metadata::{FilterPushDownType, TableInfoRef, TableType};

pub mod adapter;
mod metrics;
pub mod numbers;
pub mod scan;

lazy_static! {
    /// Expression for the `now()` scalar function, resolved once from
    /// DataFusion's built-in UDF registry and reused for timestamp defaults.
    static ref NOW_EXPR: Expr = {
        let ctx = SessionContext::new();

        let now_udf = ctx.udf("now").expect("now UDF not found");

        Expr::ScalarFunction(ScalarFunction {
            func: now_udf,
            args: vec![],
        })
    };
}

/// Partition rules of a table.
pub struct PartitionRules {
    /// Extra physical columns that take part in partitioning but do not
    /// appear in the logical table.
    pub extra_phy_cols_not_in_logical_table: Vec<String>,
}

pub type TableRef = Arc<Table>;

/// A table abstraction over a data source.
pub struct Table {
    table_info: TableInfoRef,
    filter_pushdown: FilterPushDownType,
    data_source: DataSourceRef,
    /// Default value expressions of the columns, keyed by column name.
    column_defaults: HashMap<String, Expr>,
    /// Partition rules of the table, if any.
    partition_rules: Option<PartitionRules>,
}

impl Table {
    pub fn new(
        table_info: TableInfoRef,
        filter_pushdown: FilterPushDownType,
        data_source: DataSourceRef,
    ) -> Self {
        Self {
            column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
            table_info,
            filter_pushdown,
            data_source,
            partition_rules: None,
        }
    }

    pub fn new_partitioned(
        table_info: TableInfoRef,
        filter_pushdown: FilterPushDownType,
        data_source: DataSourceRef,
        partition_rules: Option<PartitionRules>,
    ) -> Self {
        Self {
            column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
            table_info,
            filter_pushdown,
            data_source,
            partition_rules,
        }
    }

    /// Returns the default value expression of the given column, if any.
    pub fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }

    pub fn data_source(&self) -> DataSourceRef {
        self.data_source.clone()
    }

    /// Returns the schema of the table.
    pub fn schema(&self) -> SchemaRef {
        self.table_info.meta.schema.clone()
    }

    /// Returns the metadata of the table.
    pub fn table_info(&self) -> TableInfoRef {
        self.table_info.clone()
    }

    /// Returns the type of the table.
    pub fn table_type(&self) -> TableType {
        self.table_info.table_type
    }

    pub fn partition_rules(&self) -> Option<&PartitionRules> {
        self.partition_rules.as_ref()
    }

    /// Scans the table with the given request and returns a stream of record batches.
    pub async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        self.data_source
            .get_stream(request)
            .context(TablesRecordBatchSnafu)
    }

    /// Tests whether the table can apply the given filters during scans;
    /// the same pushdown capability is reported for every filter.
    pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
        Ok(vec![self.filter_pushdown; filters.len()])
    }

    /// Returns the columns that constitute the primary key.
    pub fn primary_key_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
        self.table_info
            .meta
            .primary_key_indices
            .iter()
            .map(|i| self.table_info.meta.schema.column_schemas()[*i].clone())
    }

    /// Returns the field columns: those that are neither part of the
    /// primary key nor the time index.
    pub fn field_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
        let primary_keys = self
            .table_info
            .meta
            .primary_key_indices
            .iter()
            .copied()
            .collect::<HashSet<_>>();

        self.table_info
            .meta
            .schema
            .column_schemas()
            .iter()
            .enumerate()
            .filter(move |(i, c)| !primary_keys.contains(i) && !c.is_time_index())
            .map(|(_, c)| c.clone())
    }
}

/// Collects the default value expressions of the given columns into a map
/// keyed by column name; columns without a convertible default are skipped.
fn collect_column_defaults(column_schemas: &[ColumnSchema]) -> HashMap<String, Expr> {
    column_schemas
        .iter()
        .filter_map(|column_schema| {
            default_constraint_to_expr(
                column_schema.default_constraint()?,
                &column_schema.data_type,
            )
            .map(|expr| (column_schema.name.to_string(), expr))
        })
        .collect()
}

/// Converts a column's default constraint to a DataFusion [`Expr`], if
/// possible: a value constraint becomes a literal of the target type, and a
/// `current_timestamp`/`now` function constraint becomes `now()` cast to the
/// target type. Other function constraints are unsupported and yield `None`.
fn default_constraint_to_expr(
    default_constraint: &ColumnDefaultConstraint,
    target_type: &ConcreteDataType,
) -> Option<Expr> {
    match default_constraint {
        ColumnDefaultConstraint::Value(v) => v
            .try_to_scalar_value(target_type)
            .ok()
            .map(|x| Expr::Literal(x, None)),

        ColumnDefaultConstraint::Function(name)
            if matches!(
                name.as_str(),
                CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN
            ) =>
        {
            Some(Expr::Cast(Cast {
                expr: Box::new(NOW_EXPR.clone()),
                data_type: target_type.as_arrow_type(),
            }))
        }

        ColumnDefaultConstraint::Function(_) => None,
    }
}

#[cfg(test)]
mod tests {
    use datafusion_common::ScalarValue;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnDefaultConstraint;

    use super::*;

    #[test]
    fn test_collect_columns_defaults() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true)
                .with_default_constraint(Some(ColumnDefaultConstraint::Value("test".into())))
                .unwrap(),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true)
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(
                "current_timestamp".to_string(),
            )))
            .unwrap(),
        ];
        let column_defaults = collect_column_defaults(&column_schemas[..]);

        assert!(!column_defaults.contains_key("col1"));
        assert!(matches!(column_defaults.get("col2").unwrap(),
                         Expr::Literal(ScalarValue::Utf8(Some(s)), _) if s == "test"));
        assert!(matches!(
            column_defaults.get("ts").unwrap(),
            Expr::Cast(Cast {
                expr,
                data_type
            }) if **expr == *NOW_EXPR && *data_type == ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type()
        ));
    }
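
    // A hedged sketch exercising `default_constraint_to_expr` directly; it
    // assumes, as this crate does elsewhere, that `Value` converts from `i32`.
    // Value constraints become typed literals, while function defaults outside
    // the current-timestamp family are dropped as `None`.
    #[test]
    fn test_default_constraint_to_expr() {
        let expr = default_constraint_to_expr(
            &ColumnDefaultConstraint::Value(1i32.into()),
            &ConcreteDataType::int32_datatype(),
        )
        .unwrap();
        assert!(matches!(
            expr,
            Expr::Literal(ScalarValue::Int32(Some(1)), _)
        ));

        // `rand` stands in here for any function default the conversion
        // does not recognize.
        assert!(default_constraint_to_expr(
            &ColumnDefaultConstraint::Function("rand".to_string()),
            &ConcreteDataType::int32_datatype(),
        )
        .is_none());
    }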
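
    // Minimal sanity check that `NOW_EXPR` lazily resolves to a zero-argument
    // scalar function call on DataFusion's built-in `now` UDF.
    #[test]
    fn test_now_expr_shape() {
        match &*NOW_EXPR {
            Expr::ScalarFunction(func) => assert!(func.args.is_empty()),
            other => panic!("expected a scalar function, got {other:?}"),
        }
    }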
}