1use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use common_catalog::format_full_table_name;
20use common_query::logical_plan::{rename_logical_plan_columns, SubstraitPlanDecoderRef};
21use datafusion::common::{ResolvedTableReference, TableReference};
22use datafusion::datasource::view::ViewTable;
23use datafusion::datasource::{provider_as_source, TableProvider};
24use datafusion::logical_expr::TableSource;
25use itertools::Itertools;
26use session::context::QueryContextRef;
27use snafu::{ensure, OptionExt, ResultExt};
28use table::metadata::TableType;
29use table::table::adapter::DfTableProviderAdapter;
30pub mod dummy_catalog;
31use dummy_catalog::DummyCatalogList;
32use table::TableRef;
33
34use crate::error::{
35 CastManagerSnafu, DatafusionSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
36 QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
37 ViewPlanColumnsChangedSnafu,
38};
39use crate::kvbackend::KvBackendCatalogManager;
40use crate::CatalogManagerRef;
41
42pub struct DfTableSourceProvider {
43 catalog_manager: CatalogManagerRef,
44 resolved_tables: HashMap<String, Arc<dyn TableSource>>,
45 disallow_cross_catalog_query: bool,
46 default_catalog: String,
47 default_schema: String,
48 query_ctx: QueryContextRef,
49 plan_decoder: SubstraitPlanDecoderRef,
50 enable_ident_normalization: bool,
51}
52
53impl DfTableSourceProvider {
54 pub fn new(
55 catalog_manager: CatalogManagerRef,
56 disallow_cross_catalog_query: bool,
57 query_ctx: QueryContextRef,
58 plan_decoder: SubstraitPlanDecoderRef,
59 enable_ident_normalization: bool,
60 ) -> Self {
61 Self {
62 catalog_manager,
63 disallow_cross_catalog_query,
64 resolved_tables: HashMap::new(),
65 default_catalog: query_ctx.current_catalog().to_owned(),
66 default_schema: query_ctx.current_schema(),
67 query_ctx,
68 plan_decoder,
69 enable_ident_normalization,
70 }
71 }
72
73 pub fn resolve_table_ref(&self, table_ref: TableReference) -> Result<ResolvedTableReference> {
74 if self.disallow_cross_catalog_query {
75 match &table_ref {
76 TableReference::Bare { .. } | TableReference::Partial { .. } => {}
77 TableReference::Full {
78 catalog, schema, ..
79 } => {
80 ensure!(
81 catalog.as_ref() == self.default_catalog,
82 QueryAccessDeniedSnafu {
83 catalog: catalog.as_ref(),
84 schema: schema.as_ref(),
85 }
86 );
87 }
88 };
89 }
90
91 Ok(table_ref.resolve(&self.default_catalog, &self.default_schema))
92 }
93
94 pub async fn resolve_table(
95 &mut self,
96 table_ref: TableReference,
97 ) -> Result<Arc<dyn TableSource>> {
98 let table_ref = self.resolve_table_ref(table_ref)?;
99
100 let resolved_name = table_ref.to_string();
101 if let Some(table) = self.resolved_tables.get(&resolved_name) {
102 return Ok(table.clone());
103 }
104
105 let catalog_name = table_ref.catalog.as_ref();
106 let schema_name = table_ref.schema.as_ref();
107 let table_name = table_ref.table.as_ref();
108
109 let table = self
110 .catalog_manager
111 .table(catalog_name, schema_name, table_name, Some(&self.query_ctx))
112 .await?
113 .with_context(|| TableNotExistSnafu {
114 table: format_full_table_name(catalog_name, schema_name, table_name),
115 })?;
116
117 let provider: Arc<dyn TableProvider> = if table.table_info().table_type == TableType::View {
118 self.create_view_provider(&table).await?
119 } else {
120 Arc::new(DfTableProviderAdapter::new(table))
121 };
122
123 let source = provider_as_source(provider);
124
125 let _ = self.resolved_tables.insert(resolved_name, source.clone());
126 Ok(source)
127 }
128
129 async fn create_view_provider(&self, table: &TableRef) -> Result<Arc<dyn TableProvider>> {
130 let catalog_manager = self
131 .catalog_manager
132 .as_any()
133 .downcast_ref::<KvBackendCatalogManager>()
134 .context(CastManagerSnafu)?;
135
136 let view_info = catalog_manager
137 .view_info_cache()?
138 .get(table.table_info().ident.table_id)
139 .await
140 .context(GetViewCacheSnafu)?
141 .context(ViewInfoNotFoundSnafu {
142 name: &table.table_info().name,
143 })?;
144
145 let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
147 let logical_plan = self
148 .plan_decoder
149 .decode(Bytes::from(view_info.view_info.clone()), catalog_list, true)
150 .await
151 .context(DecodePlanSnafu {
152 name: &table.table_info().name,
153 })?;
154
155 let columns: Vec<_> = view_info.columns.iter().map(|c| c.as_str()).collect();
156
157 let original_plan_columns: Vec<_> =
158 view_info.plan_columns.iter().map(|c| c.as_str()).collect();
159
160 let plan_columns: Vec<_> = logical_plan
161 .schema()
162 .columns()
163 .into_iter()
164 .map(|c| c.name)
165 .collect();
166
167 ensure!(
172 original_plan_columns.len() == plan_columns.len(),
173 ViewPlanColumnsChangedSnafu {
174 origin_names: original_plan_columns.iter().join(","),
175 actual_names: plan_columns.iter().join(","),
176 }
177 );
178
179 let logical_plan = if !columns.is_empty() {
183 rename_logical_plan_columns(
184 self.enable_ident_normalization,
185 logical_plan,
186 plan_columns
187 .iter()
188 .map(|c| c.as_str())
189 .zip(columns.into_iter())
190 .collect(),
191 )
192 .context(ProjectViewColumnsSnafu)?
193 } else {
194 logical_plan
195 };
196
197 Ok(Arc::new(
198 ViewTable::try_new(logical_plan, Some(view_info.definition.to_string()))
199 .context(DatafusionSnafu)?,
200 ))
201 }
202}
203
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
    use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_query::error::Result as QueryResult;
    use common_query::logical_plan::SubstraitPlanDecoder;
    use common_query::test_util::DummyDecoder;
    use datafusion::catalog::CatalogProviderList;
    use datafusion::logical_expr::builder::LogicalTableSource;
    use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
    use session::context::QueryContext;

    use super::*;
    use crate::information_schema::NoopInformationExtension;
    use crate::memory::MemoryCatalogManager;

    #[test]
    fn test_validate_table_ref() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));
        let provider = DfTableSourceProvider::new(
            MemoryCatalogManager::with_default_setup(),
            true,
            query_ctx,
            DummyDecoder::arc(),
            true,
        );

        // Each case pairs a reference with whether resolution should succeed. Cross-catalog
        // queries are disallowed, so only fully qualified references naming a catalog other
        // than "greptime" fail.
        let cases = [
            (TableReference::bare("table_name"), true),
            (TableReference::partial("public", "table_name"), true),
            (TableReference::partial("wrong_schema", "table_name"), true),
            (TableReference::full("greptime", "public", "table_name"), true),
            (TableReference::full("wrong_catalog", "public", "table_name"), false),
            (TableReference::partial("information_schema", "columns"), true),
            (TableReference::full("greptime", "information_schema", "columns"), true),
            (TableReference::full("dummy", "information_schema", "columns"), false),
            (TableReference::full("greptime", "greptime_private", "columns"), true),
        ];

        for (table_ref, should_resolve) in cases {
            let outcome = provider.resolve_table_ref(table_ref.clone());
            assert_eq!(
                outcome.is_ok(),
                should_resolve,
                "unexpected outcome for `{table_ref}`"
            );
        }
    }

    /// Plan decoder that ignores its input and always returns [`mock_plan`].
    struct MockDecoder;

    impl MockDecoder {
        pub fn arc() -> Arc<Self> {
            Arc::new(Self)
        }
    }

    #[async_trait::async_trait]
    impl SubstraitPlanDecoder for MockDecoder {
        async fn decode(
            &self,
            _message: bytes::Bytes,
            _catalog_list: Arc<dyn CatalogProviderList>,
            _optimize: bool,
        ) -> QueryResult<LogicalPlan> {
            Ok(mock_plan())
        }
    }

    /// Builds a scan of table `person` (`id: Int32`, `name: Utf8`) filtered by `id > 500`.
    fn mock_plan() -> LogicalPlan {
        let person_schema = SchemaRef::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let person = Arc::new(LogicalTableSource::new(person_schema));

        LogicalPlanBuilder::scan("person", person, None)
            .and_then(|builder| builder.filter(col("id").gt(lit(500))))
            .and_then(|builder| builder.build())
            .unwrap()
    }

    #[tokio::test]
    async fn test_resolve_view() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));
        let kv_backend = Arc::new(MemoryKvBackend::default());

        let cache_registry = {
            let registry_builder = LayeredCacheRegistryBuilder::default()
                .add_cache_registry(CacheRegistryBuilder::default().build())
                .add_cache_registry(build_fundamental_cache_registry(kv_backend.clone()));
            Arc::new(
                with_default_composite_cache_registry(registry_builder)
                    .unwrap()
                    .build(),
            )
        };

        let catalog_manager = KvBackendCatalogManager::new(
            Arc::new(NoopInformationExtension),
            kv_backend.clone(),
            cache_registry,
            None,
        );

        // Register a view. Its stored plan bytes are irrelevant because MockDecoder
        // ignores them. It declares columns `a`, `b` over plan columns `id`, `name`.
        let metadata_manager = TableMetadataManager::new(kv_backend);
        let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
        view_info.table_type = TableType::View;
        metadata_manager
            .create_view_metadata(
                view_info.clone().into(),
                vec![1, 2, 3],
                HashSet::new(),
                vec!["a".to_string(), "b".to_string()],
                vec!["id".to_string(), "name".to_string()],
                "definition".to_string(),
            )
            .await
            .unwrap();

        let mut provider = DfTableSourceProvider::new(
            catalog_manager,
            true,
            query_ctx,
            MockDecoder::arc(),
            true,
        );

        // An unknown name must not resolve.
        let missing = provider
            .resolve_table(TableReference::bare("not_exists_view"))
            .await;
        assert!(missing.is_err());

        // The view resolves to the mock plan with its columns renamed to `a` and `b`.
        let source = provider
            .resolve_table(TableReference::bare(view_info.name))
            .await
            .unwrap();
        let expected = r#"
Projection: person.id AS a, person.name AS b
  Filter: person.id > Int32(500)
    TableScan: person"#;
        assert_eq!(
            expected,
            format!("\n{}", source.get_logical_plan().unwrap())
        );
    }
}