1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_recordbatch::SendableRecordBatchStream;
19use datafusion::execution::FunctionRegistry;
20use datafusion::logical_expr::expr::ScalarFunction;
21use datafusion::logical_expr::Cast;
22use datafusion::prelude::SessionContext;
23use datafusion_expr::expr::Expr;
24use datatypes::data_type::DataType;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN, NOW_FN};
27use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaRef};
28use lazy_static::lazy_static;
29use snafu::ResultExt;
30use store_api::data_source::DataSourceRef;
31use store_api::storage::ScanRequest;
32
33use crate::error::{Result, TablesRecordBatchSnafu};
34use crate::metadata::{FilterPushDownType, TableInfoRef, TableType};
35
36pub mod adapter;
37mod metrics;
38pub mod numbers;
39pub mod scan;
40
41lazy_static! {
42 static ref NOW_EXPR: Expr = {
44 let ctx = SessionContext::new();
45
46 let now_udf = ctx.udf("now").expect("now UDF not found");
47
48 Expr::ScalarFunction(ScalarFunction {
49 func: now_udf,
50 args: vec![],
51 })
52 };
53}
54
55pub type TableRef = Arc<Table>;
56
57pub struct Table {
59 table_info: TableInfoRef,
60 filter_pushdown: FilterPushDownType,
61 data_source: DataSourceRef,
62 column_defaults: HashMap<String, Expr>,
64}
65
66impl Table {
67 pub fn new(
68 table_info: TableInfoRef,
69 filter_pushdown: FilterPushDownType,
70 data_source: DataSourceRef,
71 ) -> Self {
72 Self {
73 column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
74 table_info,
75 filter_pushdown,
76 data_source,
77 }
78 }
79
80 pub fn get_column_default(&self, column: &str) -> Option<&Expr> {
82 self.column_defaults.get(column)
83 }
84
85 pub fn data_source(&self) -> DataSourceRef {
86 self.data_source.clone()
87 }
88
89 pub fn schema(&self) -> SchemaRef {
91 self.table_info.meta.schema.clone()
92 }
93
94 pub fn table_info(&self) -> TableInfoRef {
96 self.table_info.clone()
97 }
98
99 pub fn table_type(&self) -> TableType {
101 self.table_info.table_type
102 }
103
104 pub async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
105 self.data_source
106 .get_stream(request)
107 .context(TablesRecordBatchSnafu)
108 }
109
110 pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
113 Ok(vec![self.filter_pushdown; filters.len()])
114 }
115
116 pub fn primary_key_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
118 self.table_info
119 .meta
120 .primary_key_indices
121 .iter()
122 .map(|i| self.table_info.meta.schema.column_schemas()[*i].clone())
123 }
124
125 pub fn field_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
127 let primary_keys = self
129 .table_info
130 .meta
131 .primary_key_indices
132 .iter()
133 .copied()
134 .collect::<HashSet<_>>();
135
136 self.table_info
137 .meta
138 .schema
139 .column_schemas()
140 .iter()
141 .enumerate()
142 .filter(move |(i, c)| !primary_keys.contains(i) && !c.is_time_index())
143 .map(|(_, c)| c.clone())
144 }
145}
146
147fn collect_column_defaults(column_schemas: &[ColumnSchema]) -> HashMap<String, Expr> {
149 column_schemas
150 .iter()
151 .filter_map(|column_schema| {
152 default_constraint_to_expr(
153 column_schema.default_constraint()?,
154 &column_schema.data_type,
155 )
156 .map(|expr| (column_schema.name.to_string(), expr))
157 })
158 .collect()
159}
160
161fn default_constraint_to_expr(
163 default_constraint: &ColumnDefaultConstraint,
164 target_type: &ConcreteDataType,
165) -> Option<Expr> {
166 match default_constraint {
167 ColumnDefaultConstraint::Value(v) => {
168 v.try_to_scalar_value(target_type).ok().map(Expr::Literal)
169 }
170
171 ColumnDefaultConstraint::Function(name)
172 if matches!(
173 name.as_str(),
174 CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN
175 ) =>
176 {
177 Some(Expr::Cast(Cast {
178 expr: Box::new(NOW_EXPR.clone()),
179 data_type: target_type.as_arrow_type(),
180 }))
181 }
182
183 ColumnDefaultConstraint::Function(_) => None,
184 }
185}
186
#[cfg(test)]
mod tests {
    use datafusion_common::ScalarValue;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnDefaultConstraint;

    use super::*;

    /// `collect_column_defaults` must:
    /// - skip columns without a default;
    /// - turn value defaults into literals;
    /// - turn `current_timestamp` into a cast of `now()` to the column's type.
    #[test]
    fn test_collect_columns_defaults() {
        let ts_type = ConcreteDataType::timestamp_millisecond_datatype();

        let without_default = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false);
        let with_value_default =
            ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true)
                .with_default_constraint(Some(ColumnDefaultConstraint::Value("test".into())))
                .unwrap();
        let with_fn_default = ColumnSchema::new("ts", ts_type.clone(), false)
            .with_time_index(true)
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(
                "current_timestamp".to_string(),
            )))
            .unwrap();

        let defaults =
            collect_column_defaults(&[without_default, with_value_default, with_fn_default]);

        assert!(defaults.get("col1").is_none());

        let literal = defaults.get("col2").unwrap();
        assert!(matches!(literal, Expr::Literal(ScalarValue::Utf8(Some(s))) if s == "test"));

        match defaults.get("ts").unwrap() {
            Expr::Cast(Cast { expr, data_type }) => {
                assert!(**expr == *NOW_EXPR);
                assert!(*data_type == ts_type.as_arrow_type());
            }
            other => panic!("expected a cast default for `ts`, got {other:?}"),
        }
    }
}