use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{
    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
    PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
    LayeredCacheRegistryRef, TableInfoCacheRef, TableNameCacheRef, TableRoute, TableRouteCacheRef,
    ViewInfoCacheRef,
};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::sync::Cache;
use partition::manager::PartitionRuleManagerRef;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table::PartitionRules;
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;

use crate::error::{
    CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
    ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;

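/// [`CatalogManager`] implementation backed by the kv-backend metadata store.
///
/// User catalogs, schemas, and tables are resolved through the table metadata
/// manager and the layered caches, while the built-in system schemas
/// (`information_schema`, `pg_catalog`, and the `numbers` table) are served by
/// an in-memory [`SystemCatalog`].
///
/// A minimal usage sketch (hypothetical setup; the manager is normally built
/// elsewhere in this crate and shared as an `Arc`):
///
/// ```ignore
/// let catalogs = manager.catalog_names().await?;
/// let table = manager.table("greptime", "public", "my_table", None).await?;
/// ```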
#[derive(Clone)]
pub struct KvBackendCatalogManager {
    /// Extension interface used by the `information_schema` tables.
    pub(super) information_extension: InformationExtensionRef,
    /// Manages partition rules for distributed tables.
    pub(super) partition_manager: PartitionRuleManagerRef,
    /// Access to table metadata stored in the kv backend.
    pub(super) table_metadata_manager: TableMetadataManagerRef,
    /// Serves the built-in system schemas (`information_schema`, `pg_catalog`, `numbers`).
    pub(super) system_catalog: SystemCatalog,
    /// Layered registry providing the table, table name, table info, table route
    /// and view info caches.
    pub(super) cache_registry: LayeredCacheRegistryRef,
    /// Procedure manager, when one is available to this catalog manager.
    pub(super) procedure_manager: Option<ProcedureManagerRef>,
}

/// Maximum capacity of the catalog-keyed provider caches (see [`SystemCatalog`]).
pub(super) const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;

impl KvBackendCatalogManager {
    pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
        self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "view_info_cache",
        })
    }

    pub fn information_extension(&self) -> InformationExtensionRef {
        self.information_extension.clone()
    }

    pub fn partition_manager(&self) -> PartitionRuleManagerRef {
        self.partition_manager.clone()
    }

    pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
        self.procedure_manager.clone()
    }

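    /// For logical tables of the metric engine, rewrites the table's
    /// `partition_key_indices` so they point at the logical table's own columns
    /// that correspond to the physical table's partition columns. Physical
    /// partition columns missing from the logical table are recorded in
    /// [`PartitionRules`]. Tables of other engines are returned unchanged.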
    async fn override_logical_table_partition_key_indices(
        table_route_cache: &TableRouteCacheRef,
        table_info_manager: &TableInfoManager,
        table: TableRef,
    ) -> Result<TableRef> {
        // Only logical tables of the metric engine need the override.
        if table.table_info().meta.engine != METRIC_ENGINE_NAME {
            return Ok(table);
        }

        if let Some(table_route_value) = table_route_cache
            .get(table.table_info().table_id())
            .await
            .context(TableMetadataManagerSnafu)?
            && let TableRoute::Logical(logical_route) = &*table_route_value
            && let Some(physical_table_info_value) = table_info_manager
                .get(logical_route.physical_table_id())
                .await
                .context(TableMetadataManagerSnafu)?
        {
            let mut new_table_info = (*table.table_info()).clone();

            let mut phy_part_cols_not_in_logical_table = vec![];

            // Map the physical table's partition key indices onto the logical
            // table's schema, collecting physical partition columns that the
            // logical table does not contain.
            new_table_info.meta.partition_key_indices = physical_table_info_value
                .table_info
                .meta
                .partition_key_indices
                .iter()
                .filter_map(|&physical_index| {
                    physical_table_info_value
                        .table_info
                        .meta
                        .schema
                        .column_schemas
                        .get(physical_index)
                        .and_then(|physical_column| {
                            let idx = new_table_info
                                .meta
                                .schema
                                .column_index_by_name(physical_column.name.as_str());
                            if idx.is_none() {
                                phy_part_cols_not_in_logical_table
                                    .push(physical_column.name.clone());
                            }

                            idx
                        })
                })
                .collect();

            let partition_rules = if !phy_part_cols_not_in_logical_table.is_empty() {
                Some(PartitionRules {
                    extra_phy_cols_not_in_logical_table: phy_part_cols_not_in_logical_table,
                })
            } else {
                None
            };

            let new_table = DistTable::table_partitioned(Arc::new(new_table_info), partition_rules);

            return Ok(new_table);
        }

        Ok(table)
    }
}

#[async_trait::async_trait]
impl CatalogManager for KvBackendCatalogManager {
    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn catalog_names(&self) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .catalog_manager()
            .catalog_names();

        let keys = stream
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListCatalogsSnafu)?;

        Ok(keys)
    }

    async fn schema_names(
        &self,
        catalog: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .schema_manager()
            .schema_names(catalog);
        let mut keys = stream
            .try_collect::<BTreeSet<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListSchemasSnafu { catalog })?;

        keys.extend(self.system_catalog.schema_names(query_ctx));

        Ok(keys.into_iter().collect())
    }

    async fn table_names(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let mut tables = self
            .table_metadata_manager
            .table_name_manager()
            .tables(catalog, schema)
            .map_ok(|(table_name, _)| table_name)
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListTablesSnafu { catalog, schema })?;

        tables.extend(self.system_catalog.table_names(schema, query_ctx));
        Ok(tables)
    }

    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
        self.table_metadata_manager
            .catalog_manager()
            .exists(CatalogNameKey::new(catalog))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn schema_exists(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.schema_exists(schema, query_ctx) {
            return Ok(true);
        }

        self.table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(catalog, schema))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn table_exists(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.table_exists(schema, table, query_ctx) {
            return Ok(true);
        }

        let key = TableNameKey::new(catalog, schema, table);
        self.table_metadata_manager
            .table_name_manager()
            .get(key)
            .await
            .context(TableMetadataManagerSnafu)
            .map(|x| x.is_some())
    }

    async fn table(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Option<TableRef>> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if let Some(table) =
            self.system_catalog
                .table(catalog_name, schema_name, table_name, query_ctx)
        {
            return Ok(Some(table));
        }

        let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "table_cache",
        })?;

        let table = table_cache
            .get_by_ref(&TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: schema_name.to_string(),
                table_name: table_name.to_string(),
            })
            .await
            .context(GetTableCacheSnafu)?;

        if let Some(table) = table {
            let table_route_cache: TableRouteCacheRef =
                self.cache_registry.get().context(CacheNotFoundSnafu {
                    name: "table_route_cache",
                })?;
            return Self::override_logical_table_partition_key_indices(
                &table_route_cache,
                self.table_metadata_manager.table_info_manager(),
                table,
            )
            .await
            .map(Some);
        }

        if channel == Channel::Postgres {
            // Also try to resolve the table under `pg_catalog` for Postgres clients.
            if let Some(table) =
                self.system_catalog
                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
            {
                return Ok(Some(table));
            }
        }

        Ok(None)
    }

    async fn table_id(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Option<TableId>> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if let Some(table) =
            self.system_catalog
                .table(catalog_name, schema_name, table_name, query_ctx)
        {
            return Ok(Some(table.table_info().table_id()));
        }

        let table_cache: TableNameCacheRef =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_name_cache",
            })?;

        let table = table_cache
            .get_by_ref(&TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: schema_name.to_string(),
                table_name: table_name.to_string(),
            })
            .await
            .context(GetTableCacheSnafu)?;

        if let Some(table) = table {
            return Ok(Some(table));
        }

        if channel == Channel::Postgres {
            // Also try to resolve the table under `pg_catalog` for Postgres clients.
            if let Some(table) =
                self.system_catalog
                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
            {
                return Ok(Some(table.table_info().table_id()));
            }
        }

        Ok(None)
    }

    async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>> {
        let table_info_cache: TableInfoCacheRef =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_info_cache",
            })?;
        table_info_cache
            .get_by_ref(&table_id)
            .await
            .context(GetTableCacheSnafu)
    }

    async fn tables_by_ids(
        &self,
        catalog: &str,
        schema: &str,
        table_ids: &[TableId],
    ) -> Result<Vec<TableRef>> {
        let table_info_values = self
            .table_metadata_manager
            .table_info_manager()
            .batch_get(table_ids)
            .await
            .context(TableMetadataManagerSnafu)?;

        let tables = table_info_values
            .into_values()
            .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
            .map(build_table)
            .collect::<Result<Vec<_>>>()?;

        Ok(tables)
    }

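    /// Streams all tables under `catalog`.`schema`: system tables are yielded
    /// first from the in-memory system catalog, then user tables are fetched
    /// from the metadata manager in batches with bounded concurrency and
    /// forwarded through an mpsc channel.
    ///
    /// A consumption sketch (assumes `futures_util::StreamExt` is in scope):
    ///
    /// ```ignore
    /// let mut stream = manager.tables("greptime", "public", None);
    /// while let Some(table) = stream.next().await {
    ///     let table = table?;
    ///     println!("{}", table.table_info().name);
    /// }
    /// ```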
    fn tables<'a>(
        &'a self,
        catalog: &'a str,
        schema: &'a str,
        query_ctx: Option<&'a QueryContext>,
    ) -> BoxStream<'a, Result<TableRef>> {
        let sys_tables = try_stream!({
            let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
            for table_name in sys_table_names {
                if let Some(table) =
                    self.system_catalog
                        .table(catalog, schema, &table_name, query_ctx)
                {
                    yield table;
                }
            }
        });

        const BATCH_SIZE: usize = 128;
        const CONCURRENCY: usize = 8;

        let (tx, rx) = tokio::sync::mpsc::channel(64);
        let metadata_manager = self.table_metadata_manager.clone();
        let catalog = catalog.to_string();
        let schema = schema.to_string();
        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
        let table_route_cache: Result<TableRouteCacheRef> =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_route_cache",
            });

        common_runtime::spawn_global(async move {
            let table_route_cache = match table_route_cache {
                Ok(table_route_cache) => table_route_cache,
                Err(e) => {
                    let _ = tx.send(Err(e)).await;
                    return;
                }
            };

            let table_id_stream = metadata_manager
                .table_name_manager()
                .tables(&catalog, &schema)
                .map_ok(|(_, v)| v.table_id());
            let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);

            while let Some(table_ids) = table_id_chunks.next().await {
                let table_ids = match table_ids
                    .into_iter()
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(BoxedError::new)
                    .context(ListTablesSnafu {
                        catalog: &catalog,
                        schema: &schema,
                    }) {
                    Ok(table_ids) => table_ids,
                    Err(e) => {
                        let _ = tx.send(Err(e)).await;
                        return;
                    }
                };

                let metadata_manager = metadata_manager.clone();
                let tx = tx.clone();
                let semaphore = semaphore.clone();
                let table_route_cache = table_route_cache.clone();
                common_runtime::spawn_global(async move {
                    // Hold the permit for the whole batch so that at most
                    // `CONCURRENCY` batches are fetched concurrently; binding to
                    // `_` would drop the permit immediately.
                    let _permit = semaphore.acquire().await;
                    let table_info_values = match metadata_manager
                        .table_info_manager()
                        .batch_get(&table_ids)
                        .await
                        .context(TableMetadataManagerSnafu)
                    {
                        Ok(table_info_values) => table_info_values,
                        Err(e) => {
                            let _ = tx.send(Err(e)).await;
                            return;
                        }
                    };

                    for table in table_info_values.into_values().map(build_table) {
                        let table = if let Ok(table) = table {
                            Self::override_logical_table_partition_key_indices(
                                &table_route_cache,
                                metadata_manager.table_info_manager(),
                                table,
                            )
                            .await
                        } else {
                            table
                        };
                        if tx.send(table).await.is_err() {
                            return;
                        }
                    }
                });
            }
        });

        let user_tables = ReceiverStream::new(rx);
        Box::pin(sys_tables.chain(user_tables))
    }
}

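/// Converts a raw [`TableInfoValue`] fetched from the metadata manager into a
/// distributed table handle.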
fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
    let table_info = table_info_value
        .table_info
        .try_into()
        .context(InvalidTableInfoInCatalogSnafu)?;
    Ok(DistTable::table(Arc::new(table_info)))
}

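/// In-memory catalog for the built-in system schemas: `information_schema`,
/// `pg_catalog` (visible to Postgres clients only), and the `numbers` table in
/// the default schema.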
#[derive(Clone)]
pub(super) struct SystemCatalog {
    /// Back-reference to the catalog manager (weak to avoid a reference cycle).
    pub(super) catalog_manager: Weak<KvBackendCatalogManager>,
    /// Per-catalog `information_schema` providers, keyed by catalog name.
    pub(super) catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
    /// Per-catalog `pg_catalog` providers, keyed by catalog name.
    pub(super) pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    /// Provider backing name and existence lookups for `information_schema`.
    pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
    /// Provider backing `pg_catalog` lookups for the default catalog.
    pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
    pub(super) backend: KvBackendRef,
    pub(super) process_manager: Option<ProcessManagerRef>,
    #[cfg(feature = "enterprise")]
    pub(super) extra_information_table_factories:
        std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}

impl SystemCatalog {
    fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            // Postgres clients also see the `pg_catalog` schema.
            Channel::Postgres => vec![
                INFORMATION_SCHEMA_NAME.to_string(),
                PG_CATALOG_NAME.to_string(),
            ],
            _ => {
                vec![INFORMATION_SCHEMA_NAME.to_string()]
            }
        }
    }

    fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match schema {
            INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
            PG_CATALOG_NAME if channel == Channel::Postgres => {
                self.pg_catalog_provider.table_names()
            }
            DEFAULT_SCHEMA_NAME => {
                vec![NUMBERS_TABLE_NAME.to_string()]
            }
            _ => vec![],
        }
    }

    fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
            _ => schema == INFORMATION_SCHEMA_NAME,
        }
    }

    fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            self.information_schema_provider.table(table).is_some()
        } else if schema == DEFAULT_SCHEMA_NAME {
            table == NUMBERS_TABLE_NAME
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            self.pg_catalog_provider.table(table).is_some()
        } else {
            false
        }
    }

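    /// Resolves a system table by schema and name: `information_schema` tables
    /// come from a per-catalog cached provider, `pg_catalog` tables are only
    /// served over the Postgres channel, and the default schema exposes the
    /// built-in `numbers` table.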
    fn table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Option<TableRef> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            let information_schema_provider =
                self.catalog_cache.get_with_by_ref(catalog, move || {
                    let provider = InformationSchemaProvider::new(
                        catalog.to_string(),
                        self.catalog_manager.clone(),
                        Arc::new(FlowMetadataManager::new(self.backend.clone())),
                        self.process_manager.clone(),
                        self.backend.clone(),
                    );
                    #[cfg(feature = "enterprise")]
                    let provider = provider
                        .with_extra_table_factories(self.extra_information_table_factories.clone());
                    Arc::new(provider)
                });
            information_schema_provider.table(table_name)
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            if catalog == DEFAULT_CATALOG_NAME {
                self.pg_catalog_provider.table(table_name)
            } else {
                let pg_catalog_provider =
                    self.pg_catalog_cache.get_with_by_ref(catalog, move || {
                        Arc::new(PGCatalogProvider::new(
                            catalog.to_string(),
                            self.catalog_manager.clone(),
                        ))
                    });
                pg_catalog_provider.table(table_name)
            }
        } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
            Some(NumbersTable::table(NUMBERS_TABLE_ID))
        } else {
            None
        }
    }
}