1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_catalog::format_full_table_name;
19use common_query::logical_plan::{SubstraitPlanDecoderRef, rename_logical_plan_columns};
20use datafusion::common::{ResolvedTableReference, TableReference};
21use datafusion::datasource::view::ViewTable;
22use datafusion::datasource::{TableProvider, provider_as_source};
23use datafusion::logical_expr::TableSource;
24use itertools::Itertools;
25use session::context::QueryContextRef;
26use snafu::{OptionExt, ResultExt, ensure};
27use table::metadata::TableType;
28use table::table::adapter::DfTableProviderAdapter;
29pub mod dummy_catalog;
30use dummy_catalog::DummyCatalogList;
31use table::TableRef;
32
33use crate::CatalogManagerRef;
34use crate::error::{
35 CastManagerSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
36 QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
37 ViewPlanColumnsChangedSnafu,
38};
39use crate::kvbackend::KvBackendCatalogManager;
40
41pub struct DfTableSourceProvider {
42 catalog_manager: CatalogManagerRef,
43 resolved_tables: HashMap<String, Arc<dyn TableSource>>,
44 disallow_cross_catalog_query: bool,
45 default_catalog: String,
46 default_schema: String,
47 query_ctx: QueryContextRef,
48 plan_decoder: SubstraitPlanDecoderRef,
49 enable_ident_normalization: bool,
50}
51
52impl DfTableSourceProvider {
53 pub fn new(
54 catalog_manager: CatalogManagerRef,
55 disallow_cross_catalog_query: bool,
56 query_ctx: QueryContextRef,
57 plan_decoder: SubstraitPlanDecoderRef,
58 enable_ident_normalization: bool,
59 ) -> Self {
60 Self {
61 catalog_manager,
62 disallow_cross_catalog_query,
63 resolved_tables: HashMap::new(),
64 default_catalog: query_ctx.current_catalog().to_owned(),
65 default_schema: query_ctx.current_schema(),
66 query_ctx,
67 plan_decoder,
68 enable_ident_normalization,
69 }
70 }
71
72 pub fn query_ctx(&self) -> &QueryContextRef {
74 &self.query_ctx
75 }
76
77 pub fn resolve_table_ref(&self, table_ref: TableReference) -> Result<ResolvedTableReference> {
78 if self.disallow_cross_catalog_query {
79 match &table_ref {
80 TableReference::Bare { .. } | TableReference::Partial { .. } => {}
81 TableReference::Full {
82 catalog, schema, ..
83 } => {
84 ensure!(
85 catalog.as_ref() == self.default_catalog,
86 QueryAccessDeniedSnafu {
87 catalog: catalog.as_ref(),
88 schema: schema.as_ref(),
89 }
90 );
91 }
92 };
93 }
94
95 Ok(table_ref.resolve(&self.default_catalog, &self.default_schema))
96 }
97
98 pub async fn resolve_table(
99 &mut self,
100 table_ref: TableReference,
101 ) -> Result<Arc<dyn TableSource>> {
102 let table_ref = self.resolve_table_ref(table_ref)?;
103
104 let resolved_name = table_ref.to_string();
105 if let Some(table) = self.resolved_tables.get(&resolved_name) {
106 return Ok(table.clone());
107 }
108
109 let catalog_name = table_ref.catalog.as_ref();
110 let schema_name = table_ref.schema.as_ref();
111 let table_name = table_ref.table.as_ref();
112
113 let table = self
114 .catalog_manager
115 .table(catalog_name, schema_name, table_name, Some(&self.query_ctx))
116 .await?
117 .with_context(|| TableNotExistSnafu {
118 table: format_full_table_name(catalog_name, schema_name, table_name),
119 })?;
120
121 let provider: Arc<dyn TableProvider> = if table.table_info().table_type == TableType::View {
122 self.create_view_provider(&table).await?
123 } else {
124 Arc::new(DfTableProviderAdapter::new(table))
125 };
126
127 let source = provider_as_source(provider);
128
129 let _ = self.resolved_tables.insert(resolved_name, source.clone());
130 Ok(source)
131 }
132
133 async fn create_view_provider(&self, table: &TableRef) -> Result<Arc<dyn TableProvider>> {
134 let catalog_manager = self
135 .catalog_manager
136 .as_any()
137 .downcast_ref::<KvBackendCatalogManager>()
138 .context(CastManagerSnafu)?;
139
140 let view_info = catalog_manager
141 .view_info_cache()?
142 .get(table.table_info().ident.table_id)
143 .await
144 .context(GetViewCacheSnafu)?
145 .context(ViewInfoNotFoundSnafu {
146 name: &table.table_info().name,
147 })?;
148
149 let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
151 let logical_plan = self
152 .plan_decoder
153 .decode(view_info.view_info.clone().into(), catalog_list, false)
154 .await
155 .context(DecodePlanSnafu {
156 name: &table.table_info().name,
157 })?;
158
159 let columns: Vec<_> = view_info.columns.iter().map(|c| c.as_str()).collect();
160
161 let original_plan_columns: Vec<_> =
162 view_info.plan_columns.iter().map(|c| c.as_str()).collect();
163
164 let plan_columns: Vec<_> = logical_plan
165 .schema()
166 .columns()
167 .into_iter()
168 .map(|c| c.name)
169 .collect();
170
171 ensure!(
176 original_plan_columns.len() == plan_columns.len(),
177 ViewPlanColumnsChangedSnafu {
178 origin_names: original_plan_columns.iter().join(","),
179 actual_names: plan_columns.iter().join(","),
180 }
181 );
182
183 let logical_plan = if !columns.is_empty() {
187 rename_logical_plan_columns(
188 self.enable_ident_normalization,
189 logical_plan,
190 plan_columns
191 .iter()
192 .map(|c| c.as_str())
193 .zip(columns)
194 .collect(),
195 )
196 .context(ProjectViewColumnsSnafu)?
197 } else {
198 logical_plan
199 };
200
201 Ok(Arc::new(ViewTable::new(
202 logical_plan,
203 Some(view_info.definition.clone()),
204 )))
205 }
206}
207
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
    use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_query::error::Result as QueryResult;
    use common_query::logical_plan::SubstraitPlanDecoder;
    use common_query::test_util::DummyDecoder;
    use datafusion::catalog::CatalogProviderList;
    use datafusion::logical_expr::builder::LogicalTableSource;
    use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, col, lit};
    use session::context::QueryContext;

    use super::*;
    use crate::information_schema::NoopInformationExtension;
    use crate::kvbackend::KvBackendCatalogManagerBuilder;
    use crate::memory::MemoryCatalogManager;

    /// Checks which table references are accepted when cross-catalog queries are disallowed.
    #[test]
    fn test_validate_table_ref() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));

        let provider = DfTableSourceProvider::new(
            MemoryCatalogManager::with_default_setup(),
            true,
            query_ctx,
            DummyDecoder::arc(),
            true,
        );

        // (reference, whether resolving it is expected to succeed)
        let cases = [
            (TableReference::bare("table_name"), true),
            (TableReference::partial("public", "table_name"), true),
            (TableReference::partial("wrong_schema", "table_name"), true),
            (
                TableReference::full("greptime", "public", "table_name"),
                true,
            ),
            (
                TableReference::full("wrong_catalog", "public", "table_name"),
                false,
            ),
            (TableReference::partial("information_schema", "columns"), true),
            (
                TableReference::full("greptime", "information_schema", "columns"),
                true,
            ),
            (
                TableReference::full("dummy", "information_schema", "columns"),
                false,
            ),
            (
                TableReference::full("greptime", "greptime_private", "columns"),
                true,
            ),
        ];

        for (table_ref, allowed) in cases {
            let label = table_ref.to_string();
            let outcome = provider.resolve_table_ref(table_ref);
            assert_eq!(
                allowed,
                outcome.is_ok(),
                "unexpected outcome for `{label}`"
            );
        }
    }

    /// Decoder stub that ignores its input and always yields [`mock_plan`].
    struct MockDecoder;

    impl MockDecoder {
        pub fn arc() -> Arc<Self> {
            Arc::new(Self)
        }
    }

    #[async_trait::async_trait]
    impl SubstraitPlanDecoder for MockDecoder {
        async fn decode(
            &self,
            _message: bytes::Bytes,
            _catalog_list: Arc<dyn CatalogProviderList>,
            _optimize: bool,
        ) -> QueryResult<LogicalPlan> {
            Ok(mock_plan())
        }
    }

    /// Builds `SELECT * FROM person WHERE id > 500` over a two-column `person` table.
    fn mock_plan() -> LogicalPlan {
        let fields = vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ];
        let source = LogicalTableSource::new(SchemaRef::new(Schema::new(fields)));

        LogicalPlanBuilder::scan("person", Arc::new(source), None)
            .unwrap()
            .filter(col("id").gt(lit(500)))
            .unwrap()
            .build()
            .unwrap()
    }

    /// Resolving a view yields the decoded plan with the view's column names projected on top.
    #[tokio::test]
    async fn test_resolve_view() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));
        let kv_backend = Arc::new(MemoryKvBackend::default());

        let registry_builder = LayeredCacheRegistryBuilder::default()
            .add_cache_registry(CacheRegistryBuilder::default().build())
            .add_cache_registry(build_fundamental_cache_registry(kv_backend.clone()));
        let cache_registry = Arc::new(
            with_default_composite_cache_registry(registry_builder)
                .unwrap()
                .build(),
        );

        let catalog_manager = KvBackendCatalogManagerBuilder::new(
            Arc::new(NoopInformationExtension),
            kv_backend.clone(),
            cache_registry,
        )
        .build();

        // Register a view whose declared columns (`a`, `b`) differ from the plan columns.
        let metadata_manager = TableMetadataManager::new(kv_backend);
        let mut view_info = common_meta::key::test_utils::new_test_table_info(1024);
        view_info.table_type = TableType::View;
        let encoded_plan = vec![1, 2, 3];
        let view_columns = vec!["a".to_string(), "b".to_string()];
        let plan_columns = vec!["id".to_string(), "name".to_string()];
        metadata_manager
            .create_view_metadata(
                view_info.clone(),
                encoded_plan,
                HashSet::new(),
                view_columns,
                plan_columns,
                "definition".to_string(),
            )
            .await
            .unwrap();

        let mut provider = DfTableSourceProvider::new(
            catalog_manager,
            true,
            query_ctx,
            MockDecoder::arc(),
            true,
        );

        let missing = TableReference::bare("not_exists_view");
        assert!(provider.resolve_table(missing).await.is_err());

        let existing = TableReference::bare(view_info.name);
        let source = provider.resolve_table(existing).await.unwrap();

        let expected = r#"
Projection: person.id AS a, person.name AS b
  Filter: person.id > Int32(500)
    TableScan: person"#;
        let rendered = format!("\n{}", source.get_logical_plan().unwrap());
        assert_eq!(expected, rendered);
    }
}