use std::any::Any;
use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{
    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
    PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
    LayeredCacheRegistryRef, TableRoute, TableRouteCacheRef, ViewInfoCacheRef,
};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;

use crate::error::{
    CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
    ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;

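/// A [`CatalogManager`] implementation backed by a kv backend: catalog, schema and
/// table metadata are read through the [`TableMetadataManager`] and the layered caches,
/// while the built-in system schemas are served by an embedded [`SystemCatalog`].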
#[derive(Clone)]
pub struct KvBackendCatalogManager {
    /// Extension that supplies additional information to the `information_schema` tables.
    information_extension: InformationExtensionRef,
    /// Resolves table partition rules from the kv backend and the table route cache.
    partition_manager: PartitionRuleManagerRef,
    /// Manages table metadata stored in the kv backend.
    table_metadata_manager: TableMetadataManagerRef,
    /// Serves the built-in system schemas (`information_schema`, `pg_catalog`) and tables.
    system_catalog: SystemCatalog,
    /// The layered cache registry holding the table, table route and view info caches.
    cache_registry: LayeredCacheRegistryRef,
    /// The procedure manager, if one is available.
    procedure_manager: Option<ProcedureManagerRef>,
}

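/// The maximum number of catalog-keyed system schema providers kept in the
/// [`SystemCatalog`] caches.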
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;

impl KvBackendCatalogManager {
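    /// Creates a new manager on top of the given kv `backend`. The cache registry must
    /// already contain the table route cache; `procedure_manager` and `process_manager`
    /// are optional.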
    pub fn new(
        information_extension: InformationExtensionRef,
        backend: KvBackendRef,
        cache_registry: LayeredCacheRegistryRef,
        procedure_manager: Option<ProcedureManagerRef>,
        process_manager: Option<ProcessManagerRef>,
    ) -> Arc<Self> {
        Arc::new_cyclic(|me| Self {
            information_extension,
            partition_manager: Arc::new(PartitionRuleManager::new(
                backend.clone(),
                cache_registry
                    .get()
                    .expect("Failed to get table_route_cache"),
            )),
            table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
            system_catalog: SystemCatalog {
                catalog_manager: me.clone(),
                catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
                pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
                information_schema_provider: Arc::new(InformationSchemaProvider::new(
                    DEFAULT_CATALOG_NAME.to_string(),
                    me.clone(),
                    Arc::new(FlowMetadataManager::new(backend.clone())),
                    process_manager.clone(),
                )),
                pg_catalog_provider: Arc::new(PGCatalogProvider::new(
                    DEFAULT_CATALOG_NAME.to_string(),
                    me.clone(),
                )),
                backend,
                process_manager,
            },
            cache_registry,
            procedure_manager,
        })
    }

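    /// Returns the view info cache from the layered cache registry, or a
    /// `CacheNotFound` error if it has not been registered.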
    pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
        self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "view_info_cache",
        })
    }

    pub fn information_extension(&self) -> InformationExtensionRef {
        self.information_extension.clone()
    }

    pub fn partition_manager(&self) -> PartitionRuleManagerRef {
        self.partition_manager.clone()
    }

    pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
        self.procedure_manager.clone()
    }

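    /// For logical tables backed by the metric engine, replaces the table's
    /// `partition_key_indices` with the physical table's indices whose columns also
    /// exist in the logical table's schema; all other tables are returned unchanged.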
    async fn override_logical_table_partition_key_indices(
        table_route_cache: &TableRouteCacheRef,
        table_info_manager: &TableInfoManager,
        table: TableRef,
    ) -> Result<TableRef> {
        // Only tables of the metric engine can be logical tables that need overriding.
        if table.table_info().meta.engine != METRIC_ENGINE_NAME {
            return Ok(table);
        }

        if let Some(table_route_value) = table_route_cache
            .get(table.table_info().table_id())
            .await
            .context(TableMetadataManagerSnafu)?
            && let TableRoute::Logical(logical_route) = &*table_route_value
            && let Some(physical_table_info_value) = table_info_manager
                .get(logical_route.physical_table_id())
                .await
                .context(TableMetadataManagerSnafu)?
        {
            let mut new_table_info = (*table.table_info()).clone();
            let logical_column_names: HashSet<_> = new_table_info
                .meta
                .schema
                .column_schemas()
                .iter()
                .map(|col| &col.name)
                .collect();

            // Keep only the physical partition key indices whose columns also exist in
            // the logical table's schema.
            new_table_info.meta.partition_key_indices = physical_table_info_value
                .table_info
                .meta
                .partition_key_indices
                .iter()
                .filter(|&&index| {
                    physical_table_info_value
                        .table_info
                        .meta
                        .schema
                        .column_schemas
                        .get(index)
                        .map(|physical_column| logical_column_names.contains(&physical_column.name))
                        .unwrap_or(false)
                })
                .cloned()
                .collect();

            let new_table = DistTable::table(Arc::new(new_table_info));

            return Ok(new_table);
        }

        Ok(table)
    }
}

#[async_trait::async_trait]
impl CatalogManager for KvBackendCatalogManager {
    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn catalog_names(&self) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .catalog_manager()
            .catalog_names();

        let keys = stream
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListCatalogsSnafu)?;

        Ok(keys)
    }

    async fn schema_names(
        &self,
        catalog: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .schema_manager()
            .schema_names(catalog);
        let mut keys = stream
            .try_collect::<BTreeSet<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListSchemasSnafu { catalog })?;

        keys.extend(self.system_catalog.schema_names(query_ctx));

        Ok(keys.into_iter().collect())
    }

    async fn table_names(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let mut tables = self
            .table_metadata_manager
            .table_name_manager()
            .tables(catalog, schema)
            .map_ok(|(table_name, _)| table_name)
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListTablesSnafu { catalog, schema })?;

        tables.extend(self.system_catalog.table_names(schema, query_ctx));
        Ok(tables)
    }

    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
        self.table_metadata_manager
            .catalog_manager()
            .exists(CatalogNameKey::new(catalog))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn schema_exists(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.schema_exists(schema, query_ctx) {
            return Ok(true);
        }

        self.table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(catalog, schema))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn table_exists(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.table_exists(schema, table, query_ctx) {
            return Ok(true);
        }

        let key = TableNameKey::new(catalog, schema, table);
        self.table_metadata_manager
            .table_name_manager()
            .get(key)
            .await
            .context(TableMetadataManagerSnafu)
            .map(|x| x.is_some())
    }

    async fn table(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Option<TableRef>> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if let Some(table) =
            self.system_catalog
                .table(catalog_name, schema_name, table_name, query_ctx)
        {
            return Ok(Some(table));
        }

        let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "table_cache",
        })?;

        let table = table_cache
            .get_by_ref(&TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: schema_name.to_string(),
                table_name: table_name.to_string(),
            })
            .await
            .context(GetTableCacheSnafu)?;

        if let Some(table) = table {
            let table_route_cache: TableRouteCacheRef =
                self.cache_registry.get().context(CacheNotFoundSnafu {
                    name: "table_route_cache",
                })?;
            return Self::override_logical_table_partition_key_indices(
                &table_route_cache,
                self.table_metadata_manager.table_info_manager(),
                table,
            )
            .await
            .map(Some);
        }

        if channel == Channel::Postgres {
            // PostgreSQL clients may refer to `pg_catalog` tables without a schema prefix.
            if let Some(table) =
                self.system_catalog
                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
            {
                return Ok(Some(table));
            }
        }

        Ok(None)
    }

    async fn tables_by_ids(
        &self,
        catalog: &str,
        schema: &str,
        table_ids: &[TableId],
    ) -> Result<Vec<TableRef>> {
        let table_info_values = self
            .table_metadata_manager
            .table_info_manager()
            .batch_get(table_ids)
            .await
            .context(TableMetadataManagerSnafu)?;

        let tables = table_info_values
            .into_values()
            .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
            .map(build_table)
            .collect::<Result<Vec<_>>>()?;

        Ok(tables)
    }

    fn tables<'a>(
        &'a self,
        catalog: &'a str,
        schema: &'a str,
        query_ctx: Option<&'a QueryContext>,
    ) -> BoxStream<'a, Result<TableRef>> {
        let sys_tables = try_stream!({
            // Yield the system tables for this schema first.
            let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
            for table_name in sys_table_names {
                if let Some(table) =
                    self.system_catalog
                        .table(catalog, schema, &table_name, query_ctx)
                {
                    yield table;
                }
            }
        });

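        // User tables are listed in the background: table ids are read in batches of
        // `BATCH_SIZE`, their table info is fetched with at most `CONCURRENCY`
        // concurrent requests, and the results are forwarded through a bounded channel
        // to the returned stream.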
        const BATCH_SIZE: usize = 128;
        const CONCURRENCY: usize = 8;

        let (tx, rx) = tokio::sync::mpsc::channel(64);
        let metadata_manager = self.table_metadata_manager.clone();
        let catalog = catalog.to_string();
        let schema = schema.to_string();
        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
        let table_route_cache: Result<TableRouteCacheRef> =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_route_cache",
            });

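        // Drive the listing from a background task so the returned stream can be
        // consumed lazily; errors are forwarded through the same channel.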
        common_runtime::spawn_global(async move {
            let table_route_cache = match table_route_cache {
                Ok(table_route_cache) => table_route_cache,
                Err(e) => {
                    let _ = tx.send(Err(e)).await;
                    return;
                }
            };

            let table_id_stream = metadata_manager
                .table_name_manager()
                .tables(&catalog, &schema)
                .map_ok(|(_, v)| v.table_id());
            // Split the stream into chunks so table info can be fetched in batches.
            let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);

            while let Some(table_ids) = table_id_chunks.next().await {
                let table_ids = match table_ids
                    .into_iter()
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(BoxedError::new)
                    .context(ListTablesSnafu {
                        catalog: &catalog,
                        schema: &schema,
                    }) {
                    Ok(table_ids) => table_ids,
                    Err(e) => {
                        let _ = tx.send(Err(e)).await;
                        return;
                    }
                };

                let metadata_manager = metadata_manager.clone();
                let tx = tx.clone();
                let semaphore = semaphore.clone();
                let table_route_cache = table_route_cache.clone();
                common_runtime::spawn_global(async move {
                    // Hold the permit for the duration of this task so that at most
                    // `CONCURRENCY` batches are fetched concurrently.
                    let _permit = semaphore.acquire().await;
                    let table_info_values = match metadata_manager
                        .table_info_manager()
                        .batch_get(&table_ids)
                        .await
                        .context(TableMetadataManagerSnafu)
                    {
                        Ok(table_info_values) => table_info_values,
                        Err(e) => {
                            let _ = tx.send(Err(e)).await;
                            return;
                        }
                    };

                    for table in table_info_values.into_values().map(build_table) {
                        let table = if let Ok(table) = table {
                            Self::override_logical_table_partition_key_indices(
                                &table_route_cache,
                                metadata_manager.table_info_manager(),
                                table,
                            )
                            .await
                        } else {
                            table
                        };
                        if tx.send(table).await.is_err() {
                            return;
                        }
                    }
                });
            }
        });

        let user_tables = ReceiverStream::new(rx);
        Box::pin(sys_tables.chain(user_tables))
    }
}

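/// Builds a [`TableRef`] (a [`DistTable`]) from a [`TableInfoValue`] fetched from the
/// metadata store.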
fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
    let table_info = table_info_value
        .table_info
        .try_into()
        .context(InvalidTableInfoInCatalogSnafu)?;
    Ok(DistTable::table(Arc::new(table_info)))
}

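/// Provides the built-in system schemas that are not stored in the kv backend:
/// `information_schema`, `pg_catalog` (visible to PostgreSQL clients only), and the
/// `numbers` table in the default schema.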
#[derive(Clone)]
struct SystemCatalog {
    catalog_manager: Weak<KvBackendCatalogManager>,
    catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
    pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    /// The `information_schema` provider for the default catalog.
    information_schema_provider: Arc<InformationSchemaProvider>,
    /// The `pg_catalog` provider for the default catalog.
    pg_catalog_provider: Arc<PGCatalogProvider>,
    backend: KvBackendRef,
    process_manager: Option<ProcessManagerRef>,
}

impl SystemCatalog {
    fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            // `pg_catalog` is only visible to clients connected via the PostgreSQL protocol.
            Channel::Postgres => vec![
                INFORMATION_SCHEMA_NAME.to_string(),
                PG_CATALOG_NAME.to_string(),
            ],
            _ => {
                vec![INFORMATION_SCHEMA_NAME.to_string()]
            }
        }
    }

    fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match schema {
            INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
            PG_CATALOG_NAME if channel == Channel::Postgres => {
                self.pg_catalog_provider.table_names()
            }
            DEFAULT_SCHEMA_NAME => {
                vec![NUMBERS_TABLE_NAME.to_string()]
            }
            _ => vec![],
        }
    }

    fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
            _ => schema == INFORMATION_SCHEMA_NAME,
        }
    }

    fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            self.information_schema_provider.table(table).is_some()
        } else if schema == DEFAULT_SCHEMA_NAME {
            table == NUMBERS_TABLE_NAME
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            self.pg_catalog_provider.table(table).is_some()
        } else {
            false
        }
    }

    fn table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Option<TableRef> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            let information_schema_provider =
                self.catalog_cache.get_with_by_ref(catalog, move || {
                    Arc::new(InformationSchemaProvider::new(
                        catalog.to_string(),
                        self.catalog_manager.clone(),
                        Arc::new(FlowMetadataManager::new(self.backend.clone())),
                        self.process_manager.clone(),
                    ))
                });
            information_schema_provider.table(table_name)
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            if catalog == DEFAULT_CATALOG_NAME {
                self.pg_catalog_provider.table(table_name)
            } else {
                let pg_catalog_provider =
                    self.pg_catalog_cache.get_with_by_ref(catalog, move || {
                        Arc::new(PGCatalogProvider::new(
                            catalog.to_string(),
                            self.catalog_manager.clone(),
                        ))
                    });
                pg_catalog_provider.table(table_name)
            }
        } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
            Some(NumbersTable::table(NUMBERS_TABLE_ID))
        } else {
            None
        }
    }
}