use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{
    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
    PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
    LayeredCacheRegistryRef, TableRoute, TableRouteCacheRef, ViewInfoCacheRef,
};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::sync::Cache;
use partition::manager::PartitionRuleManagerRef;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;

use crate::error::{
    CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
    ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;

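/// Catalog manager backed by a KV store: catalog, schema, and table metadata
/// are read through [`TableMetadataManagerRef`] and the layered caches, while
/// the built-in system catalog (`information_schema`, `pg_catalog`, and the
/// `numbers` table) is served by [`SystemCatalog`].
///
/// ```ignore
/// // Illustrative only: resolve a table by name. Construction of the manager
/// // is done elsewhere (e.g. by the catalog builder) and is not shown here.
/// let table = manager
///     .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table", None)
///     .await?;
/// ```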
#[derive(Clone)]
pub struct KvBackendCatalogManager {
    pub(super) information_extension: InformationExtensionRef,
    pub(super) partition_manager: PartitionRuleManagerRef,
    pub(super) table_metadata_manager: TableMetadataManagerRef,
    pub(super) system_catalog: SystemCatalog,
    pub(super) cache_registry: LayeredCacheRegistryRef,
    pub(super) procedure_manager: Option<ProcedureManagerRef>,
}

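/// Maximum capacity of the per-catalog provider caches (see [`SystemCatalog`]).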
pub(super) const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;

impl KvBackendCatalogManager {
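    /// Returns the view info cache from the layered cache registry, or an
    /// error if the cache has not been registered.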
    pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
        self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "view_info_cache",
        })
    }

    pub fn information_extension(&self) -> InformationExtensionRef {
        self.information_extension.clone()
    }

    pub fn partition_manager(&self) -> PartitionRuleManagerRef {
        self.partition_manager.clone()
    }

    pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
        self.procedure_manager.clone()
    }

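    /// For logical tables of the metric engine, rewrites the table's
    /// `partition_key_indices`: each partition key column of the physical
    /// table is looked up by name in the logical table's schema and its index
    /// substituted in. Tables of any other engine are returned unchanged.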
    async fn override_logical_table_partition_key_indices(
        table_route_cache: &TableRouteCacheRef,
        table_info_manager: &TableInfoManager,
        table: TableRef,
    ) -> Result<TableRef> {
        if table.table_info().meta.engine != METRIC_ENGINE_NAME {
            return Ok(table);
        }

        if let Some(table_route_value) = table_route_cache
            .get(table.table_info().table_id())
            .await
            .context(TableMetadataManagerSnafu)?
            && let TableRoute::Logical(logical_route) = &*table_route_value
            && let Some(physical_table_info_value) = table_info_manager
                .get(logical_route.physical_table_id())
                .await
                .context(TableMetadataManagerSnafu)?
        {
            let mut new_table_info = (*table.table_info()).clone();

            new_table_info.meta.partition_key_indices = physical_table_info_value
                .table_info
                .meta
                .partition_key_indices
                .iter()
                .filter_map(|&physical_index| {
                    physical_table_info_value
                        .table_info
                        .meta
                        .schema
                        .column_schemas
                        .get(physical_index)
                        .and_then(|physical_column| {
                            new_table_info
                                .meta
                                .schema
                                .column_index_by_name(physical_column.name.as_str())
                        })
                })
                .collect();

            let new_table = DistTable::table(Arc::new(new_table_info));

            return Ok(new_table);
        }

        Ok(table)
    }
}

#[async_trait::async_trait]
impl CatalogManager for KvBackendCatalogManager {
    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn catalog_names(&self) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .catalog_manager()
            .catalog_names();

        let keys = stream
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListCatalogsSnafu)?;

        Ok(keys)
    }

    async fn schema_names(
        &self,
        catalog: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .schema_manager()
            .schema_names(catalog);
        let mut keys = stream
            .try_collect::<BTreeSet<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListSchemasSnafu { catalog })?;

        keys.extend(self.system_catalog.schema_names(query_ctx));

        Ok(keys.into_iter().collect())
    }

    async fn table_names(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let mut tables = self
            .table_metadata_manager
            .table_name_manager()
            .tables(catalog, schema)
            .map_ok(|(table_name, _)| table_name)
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListTablesSnafu { catalog, schema })?;

        tables.extend(self.system_catalog.table_names(schema, query_ctx));
        Ok(tables)
    }

    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
        self.table_metadata_manager
            .catalog_manager()
            .exists(CatalogNameKey::new(catalog))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn schema_exists(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.schema_exists(schema, query_ctx) {
            return Ok(true);
        }

        self.table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(catalog, schema))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn table_exists(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.table_exists(schema, table, query_ctx) {
            return Ok(true);
        }

        let key = TableNameKey::new(catalog, schema, table);
        self.table_metadata_manager
            .table_name_manager()
            .get(key)
            .await
            .context(TableMetadataManagerSnafu)
            .map(|x| x.is_some())
    }

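    /// Resolves a table by name: system tables are checked first, then the
    /// user table cache (with metric-engine partition key indices overridden
    /// for logical tables); for Postgres connections the name is finally
    /// retried against `pg_catalog`.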
    async fn table(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Option<TableRef>> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if let Some(table) =
            self.system_catalog
                .table(catalog_name, schema_name, table_name, query_ctx)
        {
            return Ok(Some(table));
        }

        let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "table_cache",
        })?;

        let table = table_cache
            .get_by_ref(&TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: schema_name.to_string(),
                table_name: table_name.to_string(),
            })
            .await
            .context(GetTableCacheSnafu)?;

        if let Some(table) = table {
            let table_route_cache: TableRouteCacheRef =
                self.cache_registry.get().context(CacheNotFoundSnafu {
                    name: "table_route_cache",
                })?;
            return Self::override_logical_table_partition_key_indices(
                &table_route_cache,
                self.table_metadata_manager.table_info_manager(),
                table,
            )
            .await
            .map(Some);
        }

        if channel == Channel::Postgres {
            if let Some(table) =
                self.system_catalog
                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
            {
                return Ok(Some(table));
            }
        }

        Ok(None)
    }

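    /// Fetches table metadata for the given table ids and returns only the
    /// tables that actually belong to `catalog.schema`.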
    async fn tables_by_ids(
        &self,
        catalog: &str,
        schema: &str,
        table_ids: &[TableId],
    ) -> Result<Vec<TableRef>> {
        let table_info_values = self
            .table_metadata_manager
            .table_info_manager()
            .batch_get(table_ids)
            .await
            .context(TableMetadataManagerSnafu)?;

        let tables = table_info_values
            .into_values()
            .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
            .map(build_table)
            .collect::<Result<Vec<_>>>()?;

        Ok(tables)
    }

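    /// Streams all tables in `catalog.schema`: system tables are yielded
    /// first, then user tables are fetched in the background in batches of
    /// `BATCH_SIZE` table ids with at most `CONCURRENCY` concurrent metadata
    /// reads and forwarded through a bounded channel.
    ///
    /// ```ignore
    /// // Illustrative only: drain the stream (names here are placeholders).
    /// use futures_util::TryStreamExt;
    ///
    /// let mut tables = catalog_manager.tables("greptime", "public", None);
    /// while let Some(table) = tables.try_next().await? {
    ///     println!("{}", table.table_info().name);
    /// }
    /// ```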
    fn tables<'a>(
        &'a self,
        catalog: &'a str,
        schema: &'a str,
        query_ctx: Option<&'a QueryContext>,
    ) -> BoxStream<'a, Result<TableRef>> {
        let sys_tables = try_stream!({
            let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
            for table_name in sys_table_names {
                if let Some(table) =
                    self.system_catalog
                        .table(catalog, schema, &table_name, query_ctx)
                {
                    yield table;
                }
            }
        });

        const BATCH_SIZE: usize = 128;
        const CONCURRENCY: usize = 8;

        let (tx, rx) = tokio::sync::mpsc::channel(64);
        let metadata_manager = self.table_metadata_manager.clone();
        let catalog = catalog.to_string();
        let schema = schema.to_string();
        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
        let table_route_cache: Result<TableRouteCacheRef> =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_route_cache",
            });

        common_runtime::spawn_global(async move {
            let table_route_cache = match table_route_cache {
                Ok(table_route_cache) => table_route_cache,
                Err(e) => {
                    let _ = tx.send(Err(e)).await;
                    return;
                }
            };

            let table_id_stream = metadata_manager
                .table_name_manager()
                .tables(&catalog, &schema)
                .map_ok(|(_, v)| v.table_id());
            let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);

            while let Some(table_ids) = table_id_chunks.next().await {
                let table_ids = match table_ids
                    .into_iter()
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(BoxedError::new)
                    .context(ListTablesSnafu {
                        catalog: &catalog,
                        schema: &schema,
                    }) {
                    Ok(table_ids) => table_ids,
                    Err(e) => {
                        let _ = tx.send(Err(e)).await;
                        return;
                    }
                };

                let metadata_manager = metadata_manager.clone();
                let tx = tx.clone();
                let semaphore = semaphore.clone();
                let table_route_cache = table_route_cache.clone();
                common_runtime::spawn_global(async move {
                    // Hold the permit for the lifetime of this task so the
                    // semaphore actually bounds the number of concurrent
                    // metadata fetches (binding to `_` would drop it at once).
                    let _permit = semaphore.acquire().await;
                    let table_info_values = match metadata_manager
                        .table_info_manager()
                        .batch_get(&table_ids)
                        .await
                        .context(TableMetadataManagerSnafu)
                    {
                        Ok(table_info_values) => table_info_values,
                        Err(e) => {
                            let _ = tx.send(Err(e)).await;
                            return;
                        }
                    };

                    for table in table_info_values.into_values().map(build_table) {
                        let table = if let Ok(table) = table {
                            Self::override_logical_table_partition_key_indices(
                                &table_route_cache,
                                metadata_manager.table_info_manager(),
                                table,
                            )
                            .await
                        } else {
                            table
                        };
                        if tx.send(table).await.is_err() {
                            return;
                        }
                    }
                });
            }
        });

        let user_tables = ReceiverStream::new(rx);
        Box::pin(sys_tables.chain(user_tables))
    }
}

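/// Converts a [`TableInfoValue`] loaded from the metadata backend into a
/// [`DistTable`] reference.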
fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
    let table_info = table_info_value
        .table_info
        .try_into()
        .context(InvalidTableInfoInCatalogSnafu)?;
    Ok(DistTable::table(Arc::new(table_info)))
}

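/// Built-in system catalog: serves `information_schema`, `pg_catalog`
/// (Postgres connections only), and the `numbers` table in the default
/// schema. Schema providers are built lazily and cached per catalog name.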
#[derive(Clone)]
pub(super) struct SystemCatalog {
    pub(super) catalog_manager: Weak<KvBackendCatalogManager>,
    pub(super) catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
    pub(super) pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
    pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
    pub(super) backend: KvBackendRef,
    pub(super) process_manager: Option<ProcessManagerRef>,
    #[cfg(feature = "enterprise")]
    pub(super) extra_information_table_factories:
        std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}

impl SystemCatalog {
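    /// System schema names visible to the current connection:
    /// `information_schema` always, plus `pg_catalog` over the Postgres protocol.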
    fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            Channel::Postgres => vec![
                INFORMATION_SCHEMA_NAME.to_string(),
                PG_CATALOG_NAME.to_string(),
            ],
            _ => {
                vec![INFORMATION_SCHEMA_NAME.to_string()]
            }
        }
    }

    fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match schema {
            INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
            PG_CATALOG_NAME if channel == Channel::Postgres => {
                self.pg_catalog_provider.table_names()
            }
            DEFAULT_SCHEMA_NAME => {
                vec![NUMBERS_TABLE_NAME.to_string()]
            }
            _ => vec![],
        }
    }

    fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
            _ => schema == INFORMATION_SCHEMA_NAME,
        }
    }

    fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            self.information_schema_provider.table(table).is_some()
        } else if schema == DEFAULT_SCHEMA_NAME {
            table == NUMBERS_TABLE_NAME
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            self.pg_catalog_provider.table(table).is_some()
        } else {
            false
        }
    }

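    /// Returns a system table by name. `information_schema` and `pg_catalog`
    /// providers are built lazily and cached per catalog; `pg_catalog` tables
    /// are only served to Postgres connections, and `numbers` lives in the
    /// default schema.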
    fn table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Option<TableRef> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            let information_schema_provider =
                self.catalog_cache.get_with_by_ref(catalog, move || {
                    let provider = InformationSchemaProvider::new(
                        catalog.to_string(),
                        self.catalog_manager.clone(),
                        Arc::new(FlowMetadataManager::new(self.backend.clone())),
                        self.process_manager.clone(),
                        self.backend.clone(),
                    );
                    #[cfg(feature = "enterprise")]
                    let provider = provider
                        .with_extra_table_factories(self.extra_information_table_factories.clone());
                    Arc::new(provider)
                });
            information_schema_provider.table(table_name)
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            if catalog == DEFAULT_CATALOG_NAME {
                self.pg_catalog_provider.table(table_name)
            } else {
                let pg_catalog_provider =
                    self.pg_catalog_cache.get_with_by_ref(catalog, move || {
                        Arc::new(PGCatalogProvider::new(
                            catalog.to_string(),
                            self.catalog_manager.clone(),
                        ))
                    });
                pg_catalog_provider.table(table_name)
            }
        } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
            Some(NumbersTable::table(NUMBERS_TABLE_ID))
        } else {
            None
        }
    }
}