1use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use common_catalog::format_full_table_name;
20use common_query::logical_plan::{rename_logical_plan_columns, SubstraitPlanDecoderRef};
21use datafusion::common::{ResolvedTableReference, TableReference};
22use datafusion::datasource::view::ViewTable;
23use datafusion::datasource::{provider_as_source, TableProvider};
24use datafusion::logical_expr::TableSource;
25use itertools::Itertools;
26use session::context::QueryContextRef;
27use snafu::{ensure, OptionExt, ResultExt};
28use table::metadata::TableType;
29use table::table::adapter::DfTableProviderAdapter;
30pub mod dummy_catalog;
31use dummy_catalog::DummyCatalogList;
32use table::TableRef;
33
34use crate::error::{
35 CastManagerSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
36 QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
37 ViewPlanColumnsChangedSnafu,
38};
39use crate::kvbackend::KvBackendCatalogManager;
40use crate::CatalogManagerRef;
41
/// Resolves DataFusion [`TableReference`]s into [`TableSource`]s by looking the
/// tables up through a catalog manager, caching every source it has resolved.
pub struct DfTableSourceProvider {
    /// Catalog manager used to look tables up; also handed to the catalog list
    /// given to the plan decoder when a view is resolved.
    catalog_manager: CatalogManagerRef,
    /// Sources resolved so far, keyed by the string form of the resolved
    /// table reference.
    resolved_tables: HashMap<String, Arc<dyn TableSource>>,
    /// When `true`, a fully qualified reference whose catalog differs from
    /// `default_catalog` is rejected by `resolve_table_ref`.
    disallow_cross_catalog_query: bool,
    /// Catalog used to qualify references; taken from the query context's
    /// current catalog at construction.
    default_catalog: String,
    /// Schema used to qualify references; taken from the query context's
    /// current schema at construction.
    default_schema: String,
    /// Query context passed along with every catalog manager table lookup.
    query_ctx: QueryContextRef,
    /// Decoder that turns a view's stored plan bytes back into a logical plan.
    plan_decoder: SubstraitPlanDecoderRef,
    /// Forwarded to `rename_logical_plan_columns` when the columns of a view
    /// plan are renamed.
    enable_ident_normalization: bool,
}
52
53impl DfTableSourceProvider {
54 pub fn new(
55 catalog_manager: CatalogManagerRef,
56 disallow_cross_catalog_query: bool,
57 query_ctx: QueryContextRef,
58 plan_decoder: SubstraitPlanDecoderRef,
59 enable_ident_normalization: bool,
60 ) -> Self {
61 Self {
62 catalog_manager,
63 disallow_cross_catalog_query,
64 resolved_tables: HashMap::new(),
65 default_catalog: query_ctx.current_catalog().to_owned(),
66 default_schema: query_ctx.current_schema(),
67 query_ctx,
68 plan_decoder,
69 enable_ident_normalization,
70 }
71 }
72
73 pub fn query_ctx(&self) -> &QueryContextRef {
75 &self.query_ctx
76 }
77
78 pub fn resolve_table_ref(&self, table_ref: TableReference) -> Result<ResolvedTableReference> {
79 if self.disallow_cross_catalog_query {
80 match &table_ref {
81 TableReference::Bare { .. } | TableReference::Partial { .. } => {}
82 TableReference::Full {
83 catalog, schema, ..
84 } => {
85 ensure!(
86 catalog.as_ref() == self.default_catalog,
87 QueryAccessDeniedSnafu {
88 catalog: catalog.as_ref(),
89 schema: schema.as_ref(),
90 }
91 );
92 }
93 };
94 }
95
96 Ok(table_ref.resolve(&self.default_catalog, &self.default_schema))
97 }
98
99 pub async fn resolve_table(
100 &mut self,
101 table_ref: TableReference,
102 ) -> Result<Arc<dyn TableSource>> {
103 let table_ref = self.resolve_table_ref(table_ref)?;
104
105 let resolved_name = table_ref.to_string();
106 if let Some(table) = self.resolved_tables.get(&resolved_name) {
107 return Ok(table.clone());
108 }
109
110 let catalog_name = table_ref.catalog.as_ref();
111 let schema_name = table_ref.schema.as_ref();
112 let table_name = table_ref.table.as_ref();
113
114 let table = self
115 .catalog_manager
116 .table(catalog_name, schema_name, table_name, Some(&self.query_ctx))
117 .await?
118 .with_context(|| TableNotExistSnafu {
119 table: format_full_table_name(catalog_name, schema_name, table_name),
120 })?;
121
122 let provider: Arc<dyn TableProvider> = if table.table_info().table_type == TableType::View {
123 self.create_view_provider(&table).await?
124 } else {
125 Arc::new(DfTableProviderAdapter::new(table))
126 };
127
128 let source = provider_as_source(provider);
129
130 let _ = self.resolved_tables.insert(resolved_name, source.clone());
131 Ok(source)
132 }
133
134 async fn create_view_provider(&self, table: &TableRef) -> Result<Arc<dyn TableProvider>> {
135 let catalog_manager = self
136 .catalog_manager
137 .as_any()
138 .downcast_ref::<KvBackendCatalogManager>()
139 .context(CastManagerSnafu)?;
140
141 let view_info = catalog_manager
142 .view_info_cache()?
143 .get(table.table_info().ident.table_id)
144 .await
145 .context(GetViewCacheSnafu)?
146 .context(ViewInfoNotFoundSnafu {
147 name: &table.table_info().name,
148 })?;
149
150 let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
152 let logical_plan = self
153 .plan_decoder
154 .decode(Bytes::from(view_info.view_info.clone()), catalog_list, true)
155 .await
156 .context(DecodePlanSnafu {
157 name: &table.table_info().name,
158 })?;
159
160 let columns: Vec<_> = view_info.columns.iter().map(|c| c.as_str()).collect();
161
162 let original_plan_columns: Vec<_> =
163 view_info.plan_columns.iter().map(|c| c.as_str()).collect();
164
165 let plan_columns: Vec<_> = logical_plan
166 .schema()
167 .columns()
168 .into_iter()
169 .map(|c| c.name)
170 .collect();
171
172 ensure!(
177 original_plan_columns.len() == plan_columns.len(),
178 ViewPlanColumnsChangedSnafu {
179 origin_names: original_plan_columns.iter().join(","),
180 actual_names: plan_columns.iter().join(","),
181 }
182 );
183
184 let logical_plan = if !columns.is_empty() {
188 rename_logical_plan_columns(
189 self.enable_ident_normalization,
190 logical_plan,
191 plan_columns
192 .iter()
193 .map(|c| c.as_str())
194 .zip(columns.into_iter())
195 .collect(),
196 )
197 .context(ProjectViewColumnsSnafu)?
198 } else {
199 logical_plan
200 };
201
202 Ok(Arc::new(ViewTable::new(
203 logical_plan,
204 Some(view_info.definition.to_string()),
205 )))
206 }
207}
208
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
    use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_query::error::Result as QueryResult;
    use common_query::logical_plan::SubstraitPlanDecoder;
    use common_query::test_util::DummyDecoder;
    use datafusion::catalog::CatalogProviderList;
    use datafusion::logical_expr::builder::LogicalTableSource;
    use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
    use session::context::QueryContext;

    use super::*;
    use crate::information_schema::NoopInformationExtension;
    use crate::kvbackend::KvBackendCatalogManagerBuilder;
    use crate::memory::MemoryCatalogManager;

    /// With cross-catalog queries disallowed, only fully qualified references
    /// naming a catalog other than the current one are rejected.
    #[test]
    fn test_validate_table_ref() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));

        let table_provider = DfTableSourceProvider::new(
            MemoryCatalogManager::with_default_setup(),
            true,
            query_ctx.clone(),
            DummyDecoder::arc(),
            true,
        );

        // (reference, whether resolving it is expected to succeed)
        let cases = [
            (TableReference::bare("table_name"), true),
            (TableReference::partial("public", "table_name"), true),
            (TableReference::partial("wrong_schema", "table_name"), true),
            (
                TableReference::full("greptime", "public", "table_name"),
                true,
            ),
            (
                TableReference::full("wrong_catalog", "public", "table_name"),
                false,
            ),
            (TableReference::partial("information_schema", "columns"), true),
            (
                TableReference::full("greptime", "information_schema", "columns"),
                true,
            ),
            (
                TableReference::full("dummy", "information_schema", "columns"),
                false,
            ),
            (
                TableReference::full("greptime", "greptime_private", "columns"),
                true,
            ),
        ];

        for (table_ref, expect_ok) in cases {
            let shown = table_ref.to_string();
            let outcome = table_provider.resolve_table_ref(table_ref);
            assert_eq!(
                expect_ok,
                outcome.is_ok(),
                "unexpected resolution outcome for {shown}"
            );
        }
    }

    /// Decoder that ignores its input and always yields [`mock_plan`].
    struct MockDecoder;

    impl MockDecoder {
        pub fn arc() -> Arc<Self> {
            Arc::new(Self)
        }
    }

    #[async_trait::async_trait]
    impl SubstraitPlanDecoder for MockDecoder {
        async fn decode(
            &self,
            _message: bytes::Bytes,
            _catalog_list: Arc<dyn CatalogProviderList>,
            _optimize: bool,
        ) -> QueryResult<LogicalPlan> {
            Ok(mock_plan())
        }
    }

    /// Builds a filter (`id > 500`) over a scan of a two-column `person` table.
    fn mock_plan() -> LogicalPlan {
        let fields = vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ];
        let person = LogicalTableSource::new(SchemaRef::new(Schema::new(fields)));

        LogicalPlanBuilder::scan("person", Arc::new(person), None)
            .unwrap()
            .filter(col("id").gt(lit(500)))
            .unwrap()
            .build()
            .unwrap()
    }

    /// Resolving a view yields the decoded plan with its columns renamed to the
    /// view's declared column names.
    #[tokio::test]
    async fn test_resolve_view() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));
        let backend = Arc::new(MemoryKvBackend::default());

        let registry_builder = LayeredCacheRegistryBuilder::default()
            .add_cache_registry(CacheRegistryBuilder::default().build())
            .add_cache_registry(build_fundamental_cache_registry(backend.clone()));
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(registry_builder)
                .unwrap()
                .build(),
        );

        let catalog_manager = KvBackendCatalogManagerBuilder::new(
            Arc::new(NoopInformationExtension),
            backend.clone(),
            layered_cache_registry,
        )
        .build();

        // Register a view whose declared columns are `a`, `b` over plan columns
        // `id`, `name`; the stored plan bytes are never read by `MockDecoder`.
        let table_metadata_manager = TableMetadataManager::new(backend);
        let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
        view_info.table_type = TableType::View;
        let encoded_plan = vec![1, 2, 3];
        table_metadata_manager
            .create_view_metadata(
                view_info.clone().into(),
                encoded_plan,
                HashSet::new(),
                vec!["a".to_string(), "b".to_string()],
                vec!["id".to_string(), "name".to_string()],
                "definition".to_string(),
            )
            .await
            .unwrap();

        let mut table_provider = DfTableSourceProvider::new(
            catalog_manager,
            true,
            query_ctx.clone(),
            MockDecoder::arc(),
            true,
        );

        // A name that was never registered must not resolve.
        let missing = TableReference::bare("not_exists_view");
        assert!(table_provider.resolve_table(missing).await.is_err());

        let view_ref = TableReference::bare(view_info.name);
        let source = table_provider.resolve_table(view_ref).await.unwrap();
        let rendered = format!("\n{}", source.get_logical_plan().unwrap());
        // NOTE(review): nesting is written with two spaces per plan level, as
        // DataFusion renders indented plans; confirm against the checked-in file.
        assert_eq!(
            r#"
Projection: person.id AS a, person.name AS b
  Filter: person.id > Int32(500)
    TableScan: person"#,
            rendered
        );
    }
}