1use std::any::Any;
16use std::collections::BTreeSet;
17use std::sync::{Arc, Weak};
18
19use async_stream::try_stream;
20use common_catalog::consts::{
21 DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
22};
23use common_error::ext::BoxedError;
24use common_meta::cache::{
25 LayeredCacheRegistryRef, TableInfoCacheRef, TableNameCacheRef, TableRoute, TableRouteCacheRef,
26 ViewInfoCacheRef,
27};
28use common_meta::key::TableMetadataManagerRef;
29use common_meta::key::catalog_name::CatalogNameKey;
30use common_meta::key::flow::FlowMetadataManager;
31use common_meta::key::schema_name::SchemaNameKey;
32use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
33use common_meta::key::table_name::TableNameKey;
34use common_meta::kv_backend::KvBackendRef;
35use common_procedure::ProcedureManagerRef;
36use futures_util::stream::BoxStream;
37use futures_util::{StreamExt, TryStreamExt};
38use moka::sync::Cache;
39use partition::manager::PartitionRuleManagerRef;
40use session::context::{Channel, QueryContext};
41use snafu::prelude::*;
42use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
43use table::TableRef;
44use table::dist_table::DistTable;
45use table::metadata::{TableId, TableInfoRef};
46use table::table::PartitionRules;
47use table::table_name::TableName;
48use tokio::sync::Semaphore;
49use tokio_stream::wrappers::ReceiverStream;
50
51use crate::CatalogManager;
52use crate::error::{
53 CacheNotFoundSnafu, GetTableCacheSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
54 Result, TableMetadataManagerSnafu,
55};
56use crate::information_schema::{
57 InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef,
58};
59use crate::kvbackend::TableCacheRef;
60use crate::process_manager::ProcessManagerRef;
61use crate::system_schema::SystemSchemaProvider;
62use crate::system_schema::numbers_table_provider::NumbersTableProvider;
63use crate::system_schema::pg_catalog::PGCatalogProvider;
64
/// Catalog manager whose [`CatalogManager`] implementation reads catalog,
/// schema and table metadata through `table_metadata_manager` and the caches
/// held in `cache_registry`, and serves built-in schemas via `system_catalog`.
#[derive(Clone)]
pub struct KvBackendCatalogManager {
    /// Handle handed out (cloned) by [`Self::information_extension`].
    pub(super) information_extension: InformationExtensionRef,
    /// Handle handed out (cloned) by [`Self::partition_manager`].
    pub(super) partition_manager: PartitionRuleManagerRef,
    /// Source used to list catalogs, schemas and tables, to check their
    /// existence, and to batch-load table infos.
    pub(super) table_metadata_manager: TableMetadataManagerRef,
    /// Provider of system tables; consulted before user tables in lookups.
    pub(super) system_catalog: SystemCatalog,
    /// Registry from which the typed caches (`table_cache`,
    /// `table_name_cache`, `table_info_cache`, `table_route_cache`,
    /// `view_info_cache`) are fetched on demand.
    pub(super) cache_registry: LayeredCacheRegistryRef,
    /// Optional procedure manager handed out by [`Self::procedure_manager`].
    pub(super) procedure_manager: Option<ProcedureManagerRef>,
}
85
/// Maximum capacity for catalog caches.
///
/// NOTE(review): not referenced in this part of the file; presumably it bounds
/// the per-catalog provider caches of `SystemCatalog` where they are
/// constructed — confirm at the construction site.
pub(super) const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
87
88impl KvBackendCatalogManager {
89 pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
90 self.cache_registry.get().context(CacheNotFoundSnafu {
91 name: "view_info_cache",
92 })
93 }
94
95 pub fn information_extension(&self) -> InformationExtensionRef {
97 self.information_extension.clone()
98 }
99
100 pub fn partition_manager(&self) -> PartitionRuleManagerRef {
101 self.partition_manager.clone()
102 }
103
104 pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
105 &self.table_metadata_manager
106 }
107
108 pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
109 self.procedure_manager.clone()
110 }
111
112 async fn override_logical_table_partition_key_indices(
114 table_route_cache: &TableRouteCacheRef,
115 table_info_manager: &TableInfoManager,
116 table: TableRef,
117 ) -> Result<TableRef> {
118 if table.table_info().meta.engine != METRIC_ENGINE_NAME {
120 return Ok(table);
121 }
122
123 if let Some(table_route_value) = table_route_cache
124 .get(table.table_info().table_id())
125 .await
126 .context(TableMetadataManagerSnafu)?
127 && let TableRoute::Logical(logical_route) = &*table_route_value
128 && let Some(physical_table_info_value) = table_info_manager
129 .get(logical_route.physical_table_id())
130 .await
131 .context(TableMetadataManagerSnafu)?
132 {
133 let mut new_table_info = (*table.table_info()).clone();
134
135 let mut phy_part_cols_not_in_logical_table = vec![];
136
137 new_table_info.meta.partition_key_indices = physical_table_info_value
139 .table_info
140 .meta
141 .partition_key_indices
142 .iter()
143 .filter_map(|&physical_index| {
144 physical_table_info_value
146 .table_info
147 .meta
148 .schema
149 .column_schemas()
150 .get(physical_index)
151 .and_then(|physical_column| {
152 let idx = new_table_info
154 .meta
155 .schema
156 .column_index_by_name(physical_column.name.as_str());
157 if idx.is_none() {
158 phy_part_cols_not_in_logical_table
160 .push(physical_column.name.clone());
161 }
162
163 idx
164 })
165 })
166 .collect();
167
168 let partition_rules = if !phy_part_cols_not_in_logical_table.is_empty() {
169 Some(PartitionRules {
170 extra_phy_cols_not_in_logical_table: phy_part_cols_not_in_logical_table,
171 })
172 } else {
173 None
174 };
175
176 let new_table = DistTable::table_partitioned(Arc::new(new_table_info), partition_rules);
177
178 return Ok(new_table);
179 }
180
181 Ok(table)
182 }
183}
184
185#[async_trait::async_trait]
186impl CatalogManager for KvBackendCatalogManager {
187 fn as_any(&self) -> &dyn Any {
188 self
189 }
190
191 async fn catalog_names(&self) -> Result<Vec<String>> {
192 let stream = self
193 .table_metadata_manager
194 .catalog_manager()
195 .catalog_names();
196
197 let keys = stream
198 .try_collect::<Vec<_>>()
199 .await
200 .map_err(BoxedError::new)
201 .context(ListCatalogsSnafu)?;
202
203 Ok(keys)
204 }
205
206 async fn schema_names(
207 &self,
208 catalog: &str,
209 query_ctx: Option<&QueryContext>,
210 ) -> Result<Vec<String>> {
211 let stream = self
212 .table_metadata_manager
213 .schema_manager()
214 .schema_names(catalog);
215 let mut keys = stream
216 .try_collect::<BTreeSet<_>>()
217 .await
218 .map_err(BoxedError::new)
219 .context(ListSchemasSnafu { catalog })?;
220
221 keys.extend(self.system_catalog.schema_names(query_ctx));
222
223 Ok(keys.into_iter().collect())
224 }
225
226 async fn table_names(
227 &self,
228 catalog: &str,
229 schema: &str,
230 query_ctx: Option<&QueryContext>,
231 ) -> Result<Vec<String>> {
232 let mut tables = self
233 .table_metadata_manager
234 .table_name_manager()
235 .tables(catalog, schema)
236 .map_ok(|(table_name, _)| table_name)
237 .try_collect::<Vec<_>>()
238 .await
239 .map_err(BoxedError::new)
240 .context(ListTablesSnafu { catalog, schema })?;
241
242 tables.extend(self.system_catalog.table_names(schema, query_ctx));
243 Ok(tables)
244 }
245
246 async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
247 self.table_metadata_manager
248 .catalog_manager()
249 .exists(CatalogNameKey::new(catalog))
250 .await
251 .context(TableMetadataManagerSnafu)
252 }
253
254 async fn schema_exists(
255 &self,
256 catalog: &str,
257 schema: &str,
258 query_ctx: Option<&QueryContext>,
259 ) -> Result<bool> {
260 if self.system_catalog.schema_exists(schema, query_ctx) {
261 return Ok(true);
262 }
263
264 self.table_metadata_manager
265 .schema_manager()
266 .exists(SchemaNameKey::new(catalog, schema))
267 .await
268 .context(TableMetadataManagerSnafu)
269 }
270
271 async fn table_exists(
272 &self,
273 catalog: &str,
274 schema: &str,
275 table: &str,
276 query_ctx: Option<&QueryContext>,
277 ) -> Result<bool> {
278 if self.system_catalog.table_exists(schema, table, query_ctx) {
279 return Ok(true);
280 }
281
282 let key = TableNameKey::new(catalog, schema, table);
283 self.table_metadata_manager
284 .table_name_manager()
285 .get(key)
286 .await
287 .context(TableMetadataManagerSnafu)
288 .map(|x| x.is_some())
289 }
290
291 async fn table(
292 &self,
293 catalog_name: &str,
294 schema_name: &str,
295 table_name: &str,
296 query_ctx: Option<&QueryContext>,
297 ) -> Result<Option<TableRef>> {
298 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
299 if let Some(table) =
300 self.system_catalog
301 .table(catalog_name, schema_name, table_name, query_ctx)
302 {
303 return Ok(Some(table));
304 }
305
306 let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
307 name: "table_cache",
308 })?;
309
310 let table = table_cache
311 .get_by_ref(&TableName {
312 catalog_name: catalog_name.to_string(),
313 schema_name: schema_name.to_string(),
314 table_name: table_name.to_string(),
315 })
316 .await
317 .context(GetTableCacheSnafu)?;
318
319 if let Some(table) = table {
320 let table_route_cache: TableRouteCacheRef =
321 self.cache_registry.get().context(CacheNotFoundSnafu {
322 name: "table_route_cache",
323 })?;
324 return Self::override_logical_table_partition_key_indices(
325 &table_route_cache,
326 self.table_metadata_manager.table_info_manager(),
327 table,
328 )
329 .await
330 .map(Some);
331 }
332
333 if channel == Channel::Postgres {
334 if let Some(table) =
336 self.system_catalog
337 .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
338 {
339 return Ok(Some(table));
340 }
341 }
342
343 Ok(None)
344 }
345
346 async fn table_id(
347 &self,
348 catalog_name: &str,
349 schema_name: &str,
350 table_name: &str,
351 query_ctx: Option<&QueryContext>,
352 ) -> Result<Option<TableId>> {
353 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
354 if let Some(table) =
355 self.system_catalog
356 .table(catalog_name, schema_name, table_name, query_ctx)
357 {
358 return Ok(Some(table.table_info().table_id()));
359 }
360
361 let table_cache: TableNameCacheRef =
362 self.cache_registry.get().context(CacheNotFoundSnafu {
363 name: "table_name_cache",
364 })?;
365
366 let table = table_cache
367 .get_by_ref(&TableName {
368 catalog_name: catalog_name.to_string(),
369 schema_name: schema_name.to_string(),
370 table_name: table_name.to_string(),
371 })
372 .await
373 .context(GetTableCacheSnafu)?;
374
375 if let Some(table) = table {
376 return Ok(Some(table));
377 }
378
379 if channel == Channel::Postgres {
380 if let Some(table) =
382 self.system_catalog
383 .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
384 {
385 return Ok(Some(table.table_info().table_id()));
386 }
387 }
388
389 Ok(None)
390 }
391
392 async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>> {
393 let table_info_cache: TableInfoCacheRef =
394 self.cache_registry.get().context(CacheNotFoundSnafu {
395 name: "table_info_cache",
396 })?;
397 table_info_cache
398 .get_by_ref(&table_id)
399 .await
400 .context(GetTableCacheSnafu)
401 }
402
403 async fn tables_by_ids(
404 &self,
405 catalog: &str,
406 schema: &str,
407 table_ids: &[TableId],
408 ) -> Result<Vec<TableRef>> {
409 let table_info_values = self
410 .table_metadata_manager
411 .table_info_manager()
412 .batch_get(table_ids)
413 .await
414 .context(TableMetadataManagerSnafu)?;
415
416 let tables = table_info_values
417 .into_values()
418 .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
419 .map(build_table)
420 .collect::<Vec<_>>();
421
422 Ok(tables)
423 }
424
425 fn tables<'a>(
426 &'a self,
427 catalog: &'a str,
428 schema: &'a str,
429 query_ctx: Option<&'a QueryContext>,
430 ) -> BoxStream<'a, Result<TableRef>> {
431 let sys_tables = try_stream!({
432 let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
434 for table_name in sys_table_names {
435 if let Some(table) =
436 self.system_catalog
437 .table(catalog, schema, &table_name, query_ctx)
438 {
439 yield table;
440 }
441 }
442 });
443
444 const BATCH_SIZE: usize = 128;
445 const CONCURRENCY: usize = 8;
446
447 let (tx, rx) = tokio::sync::mpsc::channel(64);
448 let metadata_manager = self.table_metadata_manager.clone();
449 let catalog = catalog.to_string();
450 let schema = schema.to_string();
451 let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
452 let table_route_cache: Result<TableRouteCacheRef> =
453 self.cache_registry.get().context(CacheNotFoundSnafu {
454 name: "table_route_cache",
455 });
456
457 common_runtime::spawn_global(async move {
458 let table_route_cache = match table_route_cache {
459 Ok(table_route_cache) => table_route_cache,
460 Err(e) => {
461 let _ = tx.send(Err(e)).await;
462 return;
463 }
464 };
465
466 let table_id_stream = metadata_manager
467 .table_name_manager()
468 .tables(&catalog, &schema)
469 .map_ok(|(_, v)| v.table_id());
470 let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);
472
473 while let Some(table_ids) = table_id_chunks.next().await {
474 let table_ids = match table_ids
475 .into_iter()
476 .collect::<std::result::Result<Vec<_>, _>>()
477 .map_err(BoxedError::new)
478 .context(ListTablesSnafu {
479 catalog: &catalog,
480 schema: &schema,
481 }) {
482 Ok(table_ids) => table_ids,
483 Err(e) => {
484 let _ = tx.send(Err(e)).await;
485 return;
486 }
487 };
488
489 let metadata_manager = metadata_manager.clone();
490 let tx = tx.clone();
491 let semaphore = semaphore.clone();
492 let table_route_cache = table_route_cache.clone();
493 common_runtime::spawn_global(async move {
494 let _ = semaphore.acquire().await;
496 let table_info_values = match metadata_manager
497 .table_info_manager()
498 .batch_get(&table_ids)
499 .await
500 .context(TableMetadataManagerSnafu)
501 {
502 Ok(table_info_values) => table_info_values,
503 Err(e) => {
504 let _ = tx.send(Err(e)).await;
505 return;
506 }
507 };
508
509 for table in table_info_values.into_values().map(build_table) {
510 let table = Self::override_logical_table_partition_key_indices(
511 &table_route_cache,
512 metadata_manager.table_info_manager(),
513 table,
514 )
515 .await;
516 if tx.send(table).await.is_err() {
517 return;
518 }
519 }
520 });
521 }
522 });
523
524 let user_tables = ReceiverStream::new(rx);
525 Box::pin(sys_tables.chain(user_tables))
526 }
527}
528
529fn build_table(table_info_value: TableInfoValue) -> TableRef {
530 let table_info = table_info_value.table_info;
531 DistTable::table(Arc::new(table_info))
532}
533
/// Serves the built-in schemas: `INFORMATION_SCHEMA_NAME`, `PG_CATALOG_NAME`
/// (Postgres channel only) and the tables of the numbers table provider under
/// `DEFAULT_SCHEMA_NAME`.
#[derive(Clone)]
pub(super) struct SystemCatalog {
    /// Weak handle to the owning catalog manager; cloned into every provider
    /// that `table` builds.
    pub(super) catalog_manager: Weak<KvBackendCatalogManager>,
    /// Information schema providers keyed by catalog name, created on first
    /// use by `table`.
    pub(super) catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
    /// `pg_catalog` providers keyed by catalog name, created on first use by
    /// `table` for catalogs other than `DEFAULT_CATALOG_NAME`.
    pub(super) pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    /// Provider used to list information schema table names and to check
    /// table existence.
    pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
    /// Provider used for `pg_catalog` table names, existence checks, and
    /// table lookups in `DEFAULT_CATALOG_NAME`.
    pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
    /// Provider of the tables exposed under `DEFAULT_SCHEMA_NAME`.
    pub(super) numbers_table_provider: NumbersTableProvider,
    /// Key-value backend passed to newly built information schema providers
    /// and their flow metadata manager.
    pub(super) backend: KvBackendRef,
    /// Optional process manager passed to newly built information schema
    /// providers.
    pub(super) process_manager: Option<ProcessManagerRef>,
    /// Extra table factories installed on every newly built information
    /// schema provider.
    pub(super) extra_information_table_factories:
        std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}
556
557impl SystemCatalog {
558 fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
559 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
560 match channel {
561 Channel::Postgres => vec![
563 INFORMATION_SCHEMA_NAME.to_string(),
564 PG_CATALOG_NAME.to_string(),
565 ],
566 _ => {
567 vec![INFORMATION_SCHEMA_NAME.to_string()]
568 }
569 }
570 }
571
572 fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
573 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
574 match schema {
575 INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
576 PG_CATALOG_NAME if channel == Channel::Postgres => {
577 self.pg_catalog_provider.table_names()
578 }
579 DEFAULT_SCHEMA_NAME => self.numbers_table_provider.table_names(),
580 _ => vec![],
581 }
582 }
583
584 fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
585 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
586 match channel {
587 Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
588 _ => schema == INFORMATION_SCHEMA_NAME,
589 }
590 }
591
592 fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
593 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
594 if schema == INFORMATION_SCHEMA_NAME {
595 self.information_schema_provider.table(table).is_some()
596 } else if schema == DEFAULT_SCHEMA_NAME {
597 self.numbers_table_provider.table_exists(table)
598 } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
599 self.pg_catalog_provider.table(table).is_some()
600 } else {
601 false
602 }
603 }
604
605 fn table(
606 &self,
607 catalog: &str,
608 schema: &str,
609 table_name: &str,
610 query_ctx: Option<&QueryContext>,
611 ) -> Option<TableRef> {
612 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
613 if schema == INFORMATION_SCHEMA_NAME {
614 let information_schema_provider =
615 self.catalog_cache.get_with_by_ref(catalog, move || {
616 let provider = InformationSchemaProvider::new(
617 catalog.to_string(),
618 self.catalog_manager.clone(),
619 Arc::new(FlowMetadataManager::new(self.backend.clone())),
620 self.process_manager.clone(),
621 self.backend.clone(),
622 );
623 let provider = provider
624 .with_extra_table_factories(self.extra_information_table_factories.clone());
625 Arc::new(provider)
626 });
627 information_schema_provider.table(table_name)
628 } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
629 if catalog == DEFAULT_CATALOG_NAME {
630 self.pg_catalog_provider.table(table_name)
631 } else {
632 let pg_catalog_provider =
633 self.pg_catalog_cache.get_with_by_ref(catalog, move || {
634 Arc::new(PGCatalogProvider::new(
635 catalog.to_string(),
636 self.catalog_manager.clone(),
637 ))
638 });
639 pg_catalog_provider.table(table_name)
640 }
641 } else if schema == DEFAULT_SCHEMA_NAME {
642 self.numbers_table_provider.table(table_name)
643 } else {
644 None
645 }
646 }
647}