1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_recordbatch::SendableRecordBatchStream;
19use datafusion::execution::FunctionRegistry;
20use datafusion::logical_expr::Cast;
21use datafusion::logical_expr::expr::ScalarFunction;
22use datafusion::prelude::SessionContext;
23use datafusion_expr::expr::Expr;
24use datatypes::data_type::DataType;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN, NOW_FN};
27use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaRef};
28use lazy_static::lazy_static;
29use snafu::ResultExt;
30use store_api::data_source::DataSourceRef;
31use store_api::storage::ScanRequest;
32
33use crate::error::{Result, TablesRecordBatchSnafu};
34use crate::metadata::{FilterPushDownType, TableInfoRef, TableType};
35
36pub mod adapter;
37mod metrics;
38pub mod numbers;
39pub mod scan;
40
41lazy_static! {
42 static ref NOW_EXPR: Expr = {
44 let ctx = SessionContext::new();
45
46 let now_udf = ctx.udf("now").expect("now UDF not found");
47
48 Expr::ScalarFunction(ScalarFunction {
49 func: now_udf,
50 args: vec![],
51 })
52 };
53}
54
/// Partitioning metadata attached to a [`Table`].
pub struct PartitionRules {
    /// Names of extra physical columns that do not appear in the logical
    /// table — presumably physical-table-only columns such as internal
    /// partition keys; TODO confirm against the code that populates this.
    pub extra_phy_cols_not_in_logical_table: Vec<String>,
}
64
/// Shared, reference-counted handle to a [`Table`].
pub type TableRef = Arc<Table>;

/// A table backed by a [`DataSourceRef`], carrying its metadata, filter
/// pushdown capability, pre-computed column defaults, and optional
/// partition rules.
pub struct Table {
    // Table metadata (schema, primary key indices, table type, ...).
    table_info: TableInfoRef,
    // Pushdown capability reported for every filter in
    // `supports_filters_pushdown`.
    filter_pushdown: FilterPushDownType,
    // Source of record batch streams for scans.
    data_source: DataSourceRef,
    // Column name -> default-value expression, derived from the schema's
    // default constraints at construction time.
    column_defaults: HashMap<String, Expr>,
    // Present only for partitioned tables.
    partition_rules: Option<PartitionRules>,
}
76
77impl Table {
78 pub fn new(
79 table_info: TableInfoRef,
80 filter_pushdown: FilterPushDownType,
81 data_source: DataSourceRef,
82 ) -> Self {
83 Self {
84 column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
85 table_info,
86 filter_pushdown,
87 data_source,
88 partition_rules: None,
89 }
90 }
91
92 pub fn new_partitioned(
93 table_info: TableInfoRef,
94 filter_pushdown: FilterPushDownType,
95 data_source: DataSourceRef,
96 partition_rules: Option<PartitionRules>,
97 ) -> Self {
98 Self {
99 column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
100 table_info,
101 filter_pushdown,
102 data_source,
103 partition_rules,
104 }
105 }
106
107 pub fn get_column_default(&self, column: &str) -> Option<&Expr> {
109 self.column_defaults.get(column)
110 }
111
112 pub fn data_source(&self) -> DataSourceRef {
113 self.data_source.clone()
114 }
115
116 pub fn schema(&self) -> SchemaRef {
118 self.table_info.meta.schema.clone()
119 }
120
121 pub fn schema_ref(&self) -> &SchemaRef {
122 &self.table_info.meta.schema
123 }
124
125 pub fn table_info(&self) -> TableInfoRef {
127 self.table_info.clone()
128 }
129
130 pub fn table_type(&self) -> TableType {
132 self.table_info.table_type
133 }
134
135 pub fn partition_rules(&self) -> Option<&PartitionRules> {
136 self.partition_rules.as_ref()
137 }
138
139 pub async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
140 self.data_source
141 .get_stream(request)
142 .context(TablesRecordBatchSnafu)
143 }
144
145 pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
148 Ok(vec![self.filter_pushdown; filters.len()])
149 }
150
151 pub fn primary_key_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
153 self.table_info
154 .meta
155 .primary_key_indices
156 .iter()
157 .map(|i| self.table_info.meta.schema.column_schemas()[*i].clone())
158 }
159
160 pub fn field_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
162 let primary_keys = self
164 .table_info
165 .meta
166 .primary_key_indices
167 .iter()
168 .copied()
169 .collect::<HashSet<_>>();
170
171 self.table_info
172 .meta
173 .schema
174 .column_schemas()
175 .iter()
176 .enumerate()
177 .filter(move |(i, c)| !primary_keys.contains(i) && !c.is_time_index())
178 .map(|(_, c)| c.clone())
179 }
180}
181
182fn collect_column_defaults(column_schemas: &[ColumnSchema]) -> HashMap<String, Expr> {
184 column_schemas
185 .iter()
186 .filter_map(|column_schema| {
187 default_constraint_to_expr(
188 column_schema.default_constraint()?,
189 &column_schema.data_type,
190 )
191 .map(|expr| (column_schema.name.clone(), expr))
192 })
193 .collect()
194}
195
196fn default_constraint_to_expr(
198 default_constraint: &ColumnDefaultConstraint,
199 target_type: &ConcreteDataType,
200) -> Option<Expr> {
201 match default_constraint {
202 ColumnDefaultConstraint::Value(v) => v
203 .try_to_scalar_value(target_type)
204 .ok()
205 .map(|x| Expr::Literal(x, None)),
206
207 ColumnDefaultConstraint::Function(name)
208 if matches!(
209 name.as_str(),
210 CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN
211 ) =>
212 {
213 Some(Expr::Cast(Cast {
214 expr: Box::new(NOW_EXPR.clone()),
215 data_type: target_type.as_arrow_type(),
216 }))
217 }
218
219 ColumnDefaultConstraint::Function(_) => None,
220 }
221}
222
#[cfg(test)]
mod tests {
    use datafusion_common::ScalarValue;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnDefaultConstraint;

    use super::*;

    #[test]
    fn test_collect_columns_defaults() {
        // Three columns: one without a default, one with a literal string
        // default, and a time index defaulting to `current_timestamp`.
        let column_schemas = [
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true)
                .with_default_constraint(Some(ColumnDefaultConstraint::Value("test".into())))
                .unwrap(),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true)
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(
                "current_timestamp".to_string(),
            )))
            .unwrap(),
        ];
        let column_defaults = collect_column_defaults(&column_schemas[..]);

        // Columns without a default constraint must be absent from the map.
        assert!(!column_defaults.contains_key("col1"));
        // Value constraints become literal expressions.
        assert!(matches!(column_defaults.get("col2").unwrap(),
            Expr::Literal(ScalarValue::Utf8(Some(s)), _) if s == "test"));
        // Function constraints become `CAST(now() AS <column type>)`.
        assert!(matches!(
            column_defaults.get("ts").unwrap(),
            Expr::Cast(Cast {
                expr,
                data_type
            }) if **expr == *NOW_EXPR && *data_type == ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type()
        ));
    }
}
262}