use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{
    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
    LayeredCacheRegistryRef, TableInfoCacheRef, TableNameCacheRef, TableRoute, TableRouteCacheRef,
    ViewInfoCacheRef,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::sync::Cache;
use partition::manager::PartitionRuleManagerRef;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
use table::table::PartitionRules;
use table::table_name::TableName;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;

use crate::CatalogManager;
use crate::error::{
    CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
    ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
use crate::information_schema::{
    InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef,
};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::SystemSchemaProvider;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;

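/// Catalog manager backed by the metadata kv-backend.
///
/// User catalogs, schemas and tables are resolved through the table metadata
/// manager and its layered caches, while system schemas such as
/// `information_schema` and `pg_catalog` are served by the static, read-only
/// [`SystemCatalog`].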
#[derive(Clone)]
pub struct KvBackendCatalogManager {
    /// Extension APIs backing some `information_schema` tables.
    pub(super) information_extension: InformationExtensionRef,
    /// Resolves partition rules and routes for distributed tables.
    pub(super) partition_manager: PartitionRuleManagerRef,
    /// Manages table metadata persisted in the kv-backend.
    pub(super) table_metadata_manager: TableMetadataManagerRef,
    /// Static system catalog serving `information_schema`, `pg_catalog` and the numbers table.
    pub(super) system_catalog: SystemCatalog,
    /// Layered cache registry providing the table-related caches.
    pub(super) cache_registry: LayeredCacheRegistryRef,
    /// Procedure manager, if available in the current deployment.
    pub(super) procedure_manager: Option<ProcedureManagerRef>,
}

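/// Maximum number of entries for the per-catalog provider caches (see [`SystemCatalog`]).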
pub(super) const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;

impl KvBackendCatalogManager {
    pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
        self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "view_info_cache",
        })
    }

    pub fn information_extension(&self) -> InformationExtensionRef {
        self.information_extension.clone()
    }

    pub fn partition_manager(&self) -> PartitionRuleManagerRef {
        self.partition_manager.clone()
    }

    pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
        self.procedure_manager.clone()
    }

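    /// For logical tables of the metric engine, rewrites the table metadata's
    /// `partition_key_indices` to point at the logical columns matching the physical
    /// table's partition key columns; physical partition columns absent from the
    /// logical table are carried as extra partition rules.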
    async fn override_logical_table_partition_key_indices(
        table_route_cache: &TableRouteCacheRef,
        table_info_manager: &TableInfoManager,
        table: TableRef,
    ) -> Result<TableRef> {
        if table.table_info().meta.engine != METRIC_ENGINE_NAME {
            // Fast path: only metric engine tables may need the override.
            return Ok(table);
        }

        if let Some(table_route_value) = table_route_cache
            .get(table.table_info().table_id())
            .await
            .context(TableMetadataManagerSnafu)?
            && let TableRoute::Logical(logical_route) = &*table_route_value
            && let Some(physical_table_info_value) = table_info_manager
                .get(logical_route.physical_table_id())
                .await
                .context(TableMetadataManagerSnafu)?
        {
            let mut new_table_info = (*table.table_info()).clone();

            let mut phy_part_cols_not_in_logical_table = vec![];

            // Remap each physical partition key column to the index of the column with the
            // same name in the logical table; record the ones the logical table lacks.
            new_table_info.meta.partition_key_indices = physical_table_info_value
                .table_info
                .meta
                .partition_key_indices
                .iter()
                .filter_map(|&physical_index| {
                    physical_table_info_value
                        .table_info
                        .meta
                        .schema
                        .column_schemas
                        .get(physical_index)
                        .and_then(|physical_column| {
                            let idx = new_table_info
                                .meta
                                .schema
                                .column_index_by_name(physical_column.name.as_str());
                            if idx.is_none() {
                                phy_part_cols_not_in_logical_table
                                    .push(physical_column.name.clone());
                            }

                            idx
                        })
                })
                .collect();

            let partition_rules = if !phy_part_cols_not_in_logical_table.is_empty() {
                Some(PartitionRules {
                    extra_phy_cols_not_in_logical_table: phy_part_cols_not_in_logical_table,
                })
            } else {
                None
            };

            let new_table = DistTable::table_partitioned(Arc::new(new_table_info), partition_rules);

            return Ok(new_table);
        }

        Ok(table)
    }
}

#[async_trait::async_trait]
impl CatalogManager for KvBackendCatalogManager {
    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn catalog_names(&self) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .catalog_manager()
            .catalog_names();

        let keys = stream
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListCatalogsSnafu)?;

        Ok(keys)
    }

    async fn schema_names(
        &self,
        catalog: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .schema_manager()
            .schema_names(catalog);
        let mut keys = stream
            .try_collect::<BTreeSet<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListSchemasSnafu { catalog })?;

        keys.extend(self.system_catalog.schema_names(query_ctx));

        Ok(keys.into_iter().collect())
    }

    async fn table_names(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let mut tables = self
            .table_metadata_manager
            .table_name_manager()
            .tables(catalog, schema)
            .map_ok(|(table_name, _)| table_name)
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListTablesSnafu { catalog, schema })?;

        tables.extend(self.system_catalog.table_names(schema, query_ctx));
        Ok(tables)
    }

    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
        self.table_metadata_manager
            .catalog_manager()
            .exists(CatalogNameKey::new(catalog))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn schema_exists(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.schema_exists(schema, query_ctx) {
            return Ok(true);
        }

        self.table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(catalog, schema))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn table_exists(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.table_exists(schema, table, query_ctx) {
            return Ok(true);
        }

        let key = TableNameKey::new(catalog, schema, table);
        self.table_metadata_manager
            .table_name_manager()
            .get(key)
            .await
            .context(TableMetadataManagerSnafu)
            .map(|x| x.is_some())
    }

    async fn table(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Option<TableRef>> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if let Some(table) =
            self.system_catalog
                .table(catalog_name, schema_name, table_name, query_ctx)
        {
            return Ok(Some(table));
        }

        let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "table_cache",
        })?;

        let table = table_cache
            .get_by_ref(&TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: schema_name.to_string(),
                table_name: table_name.to_string(),
            })
            .await
            .context(GetTableCacheSnafu)?;

        if let Some(table) = table {
            let table_route_cache: TableRouteCacheRef =
                self.cache_registry.get().context(CacheNotFoundSnafu {
                    name: "table_route_cache",
                })?;
            return Self::override_logical_table_partition_key_indices(
                &table_route_cache,
                self.table_metadata_manager.table_info_manager(),
                table,
            )
            .await
            .map(Some);
        }

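        // The table was not found in the kv-backend; for Postgres clients, fall back to
        // resolving the name under `pg_catalog`.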
        if channel == Channel::Postgres {
            if let Some(table) =
                self.system_catalog
                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
            {
                return Ok(Some(table));
            }
        }

        Ok(None)
    }

    async fn table_id(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Option<TableId>> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if let Some(table) =
            self.system_catalog
                .table(catalog_name, schema_name, table_name, query_ctx)
        {
            return Ok(Some(table.table_info().table_id()));
        }

        let table_name_cache: TableNameCacheRef =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_name_cache",
            })?;

        let table_id = table_name_cache
            .get_by_ref(&TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: schema_name.to_string(),
                table_name: table_name.to_string(),
            })
            .await
            .context(GetTableCacheSnafu)?;

        if let Some(table_id) = table_id {
            return Ok(Some(table_id));
        }

        // Same `pg_catalog` fallback as in `table`.
        if channel == Channel::Postgres {
            if let Some(table) =
                self.system_catalog
                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
            {
                return Ok(Some(table.table_info().table_id()));
            }
        }

        Ok(None)
    }

    async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>> {
        let table_info_cache: TableInfoCacheRef =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_info_cache",
            })?;
        table_info_cache
            .get_by_ref(&table_id)
            .await
            .context(GetTableCacheSnafu)
    }

    async fn tables_by_ids(
        &self,
        catalog: &str,
        schema: &str,
        table_ids: &[TableId],
    ) -> Result<Vec<TableRef>> {
        let table_info_values = self
            .table_metadata_manager
            .table_info_manager()
            .batch_get(table_ids)
            .await
            .context(TableMetadataManagerSnafu)?;

        let tables = table_info_values
            .into_values()
            .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
            .map(build_table)
            .collect::<Result<Vec<_>>>()?;

        Ok(tables)
    }

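    /// Streams every table under `catalog`.`schema`: system tables are yielded first,
    /// then user tables are listed from the metadata backend in batches and resolved
    /// concurrently on the global runtime.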
    fn tables<'a>(
        &'a self,
        catalog: &'a str,
        schema: &'a str,
        query_ctx: Option<&'a QueryContext>,
    ) -> BoxStream<'a, Result<TableRef>> {
        let sys_tables = try_stream!({
            let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
            for table_name in sys_table_names {
                if let Some(table) =
                    self.system_catalog
                        .table(catalog, schema, &table_name, query_ctx)
                {
                    yield table;
                }
            }
        });

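        // Table ids are consumed in chunks of `BATCH_SIZE`; each chunk is fetched and
        // converted in its own task while the semaphore caps in-flight chunks at
        // `CONCURRENCY`.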
        const BATCH_SIZE: usize = 128;
        const CONCURRENCY: usize = 8;

        let (tx, rx) = tokio::sync::mpsc::channel(64);
        let metadata_manager = self.table_metadata_manager.clone();
        let catalog = catalog.to_string();
        let schema = schema.to_string();
        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
        let table_route_cache: Result<TableRouteCacheRef> =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_route_cache",
            });

        common_runtime::spawn_global(async move {
            let table_route_cache = match table_route_cache {
                Ok(table_route_cache) => table_route_cache,
                Err(e) => {
                    let _ = tx.send(Err(e)).await;
                    return;
                }
            };

            let table_id_stream = metadata_manager
                .table_name_manager()
                .tables(&catalog, &schema)
                .map_ok(|(_, v)| v.table_id());
            let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);

            while let Some(table_ids) = table_id_chunks.next().await {
                let table_ids = match table_ids
                    .into_iter()
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(BoxedError::new)
                    .context(ListTablesSnafu {
                        catalog: &catalog,
                        schema: &schema,
                    }) {
                    Ok(table_ids) => table_ids,
                    Err(e) => {
                        let _ = tx.send(Err(e)).await;
                        return;
                    }
                };

                let metadata_manager = metadata_manager.clone();
                let tx = tx.clone();
                let semaphore = semaphore.clone();
                let table_route_cache = table_route_cache.clone();
                common_runtime::spawn_global(async move {
                    // Hold the permit for the whole task so that at most `CONCURRENCY`
                    // batches are fetched and converted at the same time.
                    let _permit = semaphore.acquire().await;
                    let table_info_values = match metadata_manager
                        .table_info_manager()
                        .batch_get(&table_ids)
                        .await
                        .context(TableMetadataManagerSnafu)
                    {
                        Ok(table_info_values) => table_info_values,
                        Err(e) => {
                            let _ = tx.send(Err(e)).await;
                            return;
                        }
                    };

                    for table in table_info_values.into_values().map(build_table) {
                        let table = if let Ok(table) = table {
                            Self::override_logical_table_partition_key_indices(
                                &table_route_cache,
                                metadata_manager.table_info_manager(),
                                table,
                            )
                            .await
                        } else {
                            table
                        };
                        if tx.send(table).await.is_err() {
                            return;
                        }
                    }
                });
            }
        });

        let user_tables = ReceiverStream::new(rx);
        Box::pin(sys_tables.chain(user_tables))
    }
}

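/// Converts a `TableInfoValue` loaded from the metadata backend into a [`DistTable`] handle.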
fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
    let table_info = table_info_value
        .table_info
        .try_into()
        .context(InvalidTableInfoInCatalogSnafu)?;
    Ok(DistTable::table(Arc::new(table_info)))
}

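/// Static, read-only catalog serving the system schemas (`information_schema`,
/// `pg_catalog`) and the numbers table under the default schema. Providers for
/// non-default catalogs are built lazily and cached per catalog name.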
#[derive(Clone)]
pub(super) struct SystemCatalog {
    /// Weak back-reference to the owning [`KvBackendCatalogManager`].
    pub(super) catalog_manager: Weak<KvBackendCatalogManager>,
    /// Lazily built `information_schema` providers, keyed by catalog name.
    pub(super) catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
    /// Lazily built `pg_catalog` providers for non-default catalogs, keyed by catalog name.
    pub(super) pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    /// Shared `information_schema` provider used for name listing and existence checks.
    pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
    /// Shared `pg_catalog` provider used for the default catalog and for name listing.
    pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
    /// Provider for the numbers table exposed under the default schema.
    pub(super) numbers_table_provider: NumbersTableProvider,
    /// Kv-backend used when constructing per-catalog `information_schema` providers.
    pub(super) backend: KvBackendRef,
    /// Optional process manager passed to the `information_schema` providers.
    pub(super) process_manager: Option<ProcessManagerRef>,
    /// Additional `information_schema` table factories merged into each provider.
    pub(super) extra_information_table_factories:
        std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}

impl SystemCatalog {
    fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            Channel::Postgres => vec![
                INFORMATION_SCHEMA_NAME.to_string(),
                PG_CATALOG_NAME.to_string(),
            ],
            _ => {
                vec![INFORMATION_SCHEMA_NAME.to_string()]
            }
        }
    }

    fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match schema {
            INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
            PG_CATALOG_NAME if channel == Channel::Postgres => {
                self.pg_catalog_provider.table_names()
            }
            DEFAULT_SCHEMA_NAME => self.numbers_table_provider.table_names(),
            _ => vec![],
        }
    }

    fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
            _ => schema == INFORMATION_SCHEMA_NAME,
        }
    }

    fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            self.information_schema_provider.table(table).is_some()
        } else if schema == DEFAULT_SCHEMA_NAME {
            self.numbers_table_provider.table_exists(table)
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            self.pg_catalog_provider.table(table).is_some()
        } else {
            false
        }
    }

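    /// Returns a system table by name, building per-catalog providers on demand and
    /// caching them for later lookups.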
    fn table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Option<TableRef> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            let information_schema_provider =
                self.catalog_cache.get_with_by_ref(catalog, move || {
                    let provider = InformationSchemaProvider::new(
                        catalog.to_string(),
                        self.catalog_manager.clone(),
                        Arc::new(FlowMetadataManager::new(self.backend.clone())),
                        self.process_manager.clone(),
                        self.backend.clone(),
                    );
                    let provider = provider
                        .with_extra_table_factories(self.extra_information_table_factories.clone());
                    Arc::new(provider)
                });
            information_schema_provider.table(table_name)
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            if catalog == DEFAULT_CATALOG_NAME {
                self.pg_catalog_provider.table(table_name)
            } else {
                let pg_catalog_provider =
                    self.pg_catalog_cache.get_with_by_ref(catalog, move || {
                        Arc::new(PGCatalogProvider::new(
                            catalog.to_string(),
                            self.catalog_manager.clone(),
                        ))
                    });
                pg_catalog_provider.table(table_name)
            }
        } else if schema == DEFAULT_SCHEMA_NAME {
            self.numbers_table_provider.table(table_name)
        } else {
            None
        }
    }
}