1use std::any::Any;
16use std::collections::BTreeSet;
17use std::sync::{Arc, Weak};
18
19use async_stream::try_stream;
20use common_catalog::consts::{
21 DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
22 PG_CATALOG_NAME,
23};
24use common_error::ext::BoxedError;
25use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef};
26use common_meta::key::catalog_name::CatalogNameKey;
27use common_meta::key::flow::FlowMetadataManager;
28use common_meta::key::schema_name::SchemaNameKey;
29use common_meta::key::table_info::TableInfoValue;
30use common_meta::key::table_name::TableNameKey;
31use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
32use common_meta::kv_backend::KvBackendRef;
33use common_procedure::ProcedureManagerRef;
34use futures_util::stream::BoxStream;
35use futures_util::{StreamExt, TryStreamExt};
36use moka::sync::Cache;
37use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
38use session::context::{Channel, QueryContext};
39use snafu::prelude::*;
40use table::dist_table::DistTable;
41use table::metadata::TableId;
42use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
43use table::table_name::TableName;
44use table::TableRef;
45use tokio::sync::Semaphore;
46use tokio_stream::wrappers::ReceiverStream;
47
48use crate::error::{
49 CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
50 ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
51};
52use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
53use crate::kvbackend::TableCacheRef;
54use crate::system_schema::pg_catalog::PGCatalogProvider;
55use crate::system_schema::SystemSchemaProvider;
56use crate::CatalogManager;
57
/// Catalog manager that answers catalog, schema and table queries.
///
/// User-defined objects are looked up through the table metadata manager and
/// the caches in the cache registry, while built-in objects are served by the
/// embedded [`SystemCatalog`].
#[derive(Clone)]
pub struct KvBackendCatalogManager {
    /// Extension handle supplied at construction time and handed back by
    /// `information_extension()`.
    information_extension: InformationExtensionRef,
    /// Partition rule manager built from the kv backend and a cache taken
    /// from the cache registry.
    partition_manager: PartitionRuleManagerRef,
    /// Source of catalog names, schema names, table names and table infos.
    table_metadata_manager: TableMetadataManagerRef,
    /// Serves the built-in schemas and tables before any metadata lookup.
    system_catalog: SystemCatalog,
    /// Registry from which the "view_info_cache" and "table_cache" entries
    /// are resolved on demand.
    cache_registry: LayeredCacheRegistryRef,
    /// Optional procedure manager; `None` when the caller did not provide one.
    // NOTE(review): which deployment modes provide it is not visible here.
    procedure_manager: Option<ProcedureManagerRef>,
}
78
/// Maximum capacity passed to each of the two per-catalog provider caches
/// (`catalog_cache` and `pg_catalog_cache`) created in
/// `KvBackendCatalogManager::new`.
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
80
81impl KvBackendCatalogManager {
82 pub fn new(
83 information_extension: InformationExtensionRef,
84 backend: KvBackendRef,
85 cache_registry: LayeredCacheRegistryRef,
86 procedure_manager: Option<ProcedureManagerRef>,
87 ) -> Arc<Self> {
88 Arc::new_cyclic(|me| Self {
89 information_extension,
90 partition_manager: Arc::new(PartitionRuleManager::new(
91 backend.clone(),
92 cache_registry
93 .get()
94 .expect("Failed to get table_route_cache"),
95 )),
96 table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
97 system_catalog: SystemCatalog {
98 catalog_manager: me.clone(),
99 catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
100 pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
101 information_schema_provider: Arc::new(InformationSchemaProvider::new(
102 DEFAULT_CATALOG_NAME.to_string(),
103 me.clone(),
104 Arc::new(FlowMetadataManager::new(backend.clone())),
105 )),
106 pg_catalog_provider: Arc::new(PGCatalogProvider::new(
107 DEFAULT_CATALOG_NAME.to_string(),
108 me.clone(),
109 )),
110 backend,
111 },
112 cache_registry,
113 procedure_manager,
114 })
115 }
116
117 pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
118 self.cache_registry.get().context(CacheNotFoundSnafu {
119 name: "view_info_cache",
120 })
121 }
122
123 pub fn information_extension(&self) -> InformationExtensionRef {
125 self.information_extension.clone()
126 }
127
128 pub fn partition_manager(&self) -> PartitionRuleManagerRef {
129 self.partition_manager.clone()
130 }
131
132 pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
133 &self.table_metadata_manager
134 }
135
136 pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
137 self.procedure_manager.clone()
138 }
139}
140
141#[async_trait::async_trait]
142impl CatalogManager for KvBackendCatalogManager {
143 fn as_any(&self) -> &dyn Any {
144 self
145 }
146
147 async fn catalog_names(&self) -> Result<Vec<String>> {
148 let stream = self
149 .table_metadata_manager
150 .catalog_manager()
151 .catalog_names();
152
153 let keys = stream
154 .try_collect::<Vec<_>>()
155 .await
156 .map_err(BoxedError::new)
157 .context(ListCatalogsSnafu)?;
158
159 Ok(keys)
160 }
161
162 async fn schema_names(
163 &self,
164 catalog: &str,
165 query_ctx: Option<&QueryContext>,
166 ) -> Result<Vec<String>> {
167 let stream = self
168 .table_metadata_manager
169 .schema_manager()
170 .schema_names(catalog);
171 let mut keys = stream
172 .try_collect::<BTreeSet<_>>()
173 .await
174 .map_err(BoxedError::new)
175 .context(ListSchemasSnafu { catalog })?;
176
177 keys.extend(self.system_catalog.schema_names(query_ctx));
178
179 Ok(keys.into_iter().collect())
180 }
181
182 async fn table_names(
183 &self,
184 catalog: &str,
185 schema: &str,
186 query_ctx: Option<&QueryContext>,
187 ) -> Result<Vec<String>> {
188 let mut tables = self
189 .table_metadata_manager
190 .table_name_manager()
191 .tables(catalog, schema)
192 .map_ok(|(table_name, _)| table_name)
193 .try_collect::<Vec<_>>()
194 .await
195 .map_err(BoxedError::new)
196 .context(ListTablesSnafu { catalog, schema })?;
197
198 tables.extend(self.system_catalog.table_names(schema, query_ctx));
199 Ok(tables)
200 }
201
202 async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
203 self.table_metadata_manager
204 .catalog_manager()
205 .exists(CatalogNameKey::new(catalog))
206 .await
207 .context(TableMetadataManagerSnafu)
208 }
209
210 async fn schema_exists(
211 &self,
212 catalog: &str,
213 schema: &str,
214 query_ctx: Option<&QueryContext>,
215 ) -> Result<bool> {
216 if self.system_catalog.schema_exists(schema, query_ctx) {
217 return Ok(true);
218 }
219
220 self.table_metadata_manager
221 .schema_manager()
222 .exists(SchemaNameKey::new(catalog, schema))
223 .await
224 .context(TableMetadataManagerSnafu)
225 }
226
227 async fn table_exists(
228 &self,
229 catalog: &str,
230 schema: &str,
231 table: &str,
232 query_ctx: Option<&QueryContext>,
233 ) -> Result<bool> {
234 if self.system_catalog.table_exists(schema, table, query_ctx) {
235 return Ok(true);
236 }
237
238 let key = TableNameKey::new(catalog, schema, table);
239 self.table_metadata_manager
240 .table_name_manager()
241 .get(key)
242 .await
243 .context(TableMetadataManagerSnafu)
244 .map(|x| x.is_some())
245 }
246
247 async fn table(
248 &self,
249 catalog_name: &str,
250 schema_name: &str,
251 table_name: &str,
252 query_ctx: Option<&QueryContext>,
253 ) -> Result<Option<TableRef>> {
254 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
255 if let Some(table) =
256 self.system_catalog
257 .table(catalog_name, schema_name, table_name, query_ctx)
258 {
259 return Ok(Some(table));
260 }
261
262 let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
263 name: "table_cache",
264 })?;
265 if let Some(table) = table_cache
266 .get_by_ref(&TableName {
267 catalog_name: catalog_name.to_string(),
268 schema_name: schema_name.to_string(),
269 table_name: table_name.to_string(),
270 })
271 .await
272 .context(GetTableCacheSnafu)?
273 {
274 return Ok(Some(table));
275 }
276
277 if channel == Channel::Postgres {
278 if let Some(table) =
280 self.system_catalog
281 .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
282 {
283 return Ok(Some(table));
284 }
285 }
286
287 return Ok(None);
288 }
289
290 async fn tables_by_ids(
291 &self,
292 catalog: &str,
293 schema: &str,
294 table_ids: &[TableId],
295 ) -> Result<Vec<TableRef>> {
296 let table_info_values = self
297 .table_metadata_manager
298 .table_info_manager()
299 .batch_get(table_ids)
300 .await
301 .context(TableMetadataManagerSnafu)?;
302
303 let tables = table_info_values
304 .into_values()
305 .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
306 .map(build_table)
307 .collect::<Result<Vec<_>>>()?;
308
309 Ok(tables)
310 }
311
312 fn tables<'a>(
313 &'a self,
314 catalog: &'a str,
315 schema: &'a str,
316 query_ctx: Option<&'a QueryContext>,
317 ) -> BoxStream<'a, Result<TableRef>> {
318 let sys_tables = try_stream!({
319 let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
321 for table_name in sys_table_names {
322 if let Some(table) =
323 self.system_catalog
324 .table(catalog, schema, &table_name, query_ctx)
325 {
326 yield table;
327 }
328 }
329 });
330
331 const BATCH_SIZE: usize = 128;
332 const CONCURRENCY: usize = 8;
333
334 let (tx, rx) = tokio::sync::mpsc::channel(64);
335 let metadata_manager = self.table_metadata_manager.clone();
336 let catalog = catalog.to_string();
337 let schema = schema.to_string();
338 let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
339
340 common_runtime::spawn_global(async move {
341 let table_id_stream = metadata_manager
342 .table_name_manager()
343 .tables(&catalog, &schema)
344 .map_ok(|(_, v)| v.table_id());
345 let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);
347
348 while let Some(table_ids) = table_id_chunks.next().await {
349 let table_ids = match table_ids
350 .into_iter()
351 .collect::<std::result::Result<Vec<_>, _>>()
352 .map_err(BoxedError::new)
353 .context(ListTablesSnafu {
354 catalog: &catalog,
355 schema: &schema,
356 }) {
357 Ok(table_ids) => table_ids,
358 Err(e) => {
359 let _ = tx.send(Err(e)).await;
360 return;
361 }
362 };
363
364 let metadata_manager = metadata_manager.clone();
365 let tx = tx.clone();
366 let semaphore = semaphore.clone();
367 common_runtime::spawn_global(async move {
368 let _ = semaphore.acquire().await;
370 let table_info_values = match metadata_manager
371 .table_info_manager()
372 .batch_get(&table_ids)
373 .await
374 .context(TableMetadataManagerSnafu)
375 {
376 Ok(table_info_values) => table_info_values,
377 Err(e) => {
378 let _ = tx.send(Err(e)).await;
379 return;
380 }
381 };
382
383 for table in table_info_values.into_values().map(build_table) {
384 if tx.send(table).await.is_err() {
385 return;
386 }
387 }
388 });
389 }
390 });
391
392 let user_tables = ReceiverStream::new(rx);
393 Box::pin(sys_tables.chain(user_tables))
394 }
395}
396
397fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
398 let table_info = table_info_value
399 .table_info
400 .try_into()
401 .context(InvalidTableInfoInCatalogSnafu)?;
402 Ok(DistTable::table(Arc::new(table_info)))
403}
404
/// Serves the built-in schemas and tables: `information_schema`,
/// `pg_catalog` (visible only on the Postgres channel) and the numbers table
/// in the default schema.
#[derive(Clone)]
struct SystemCatalog {
    /// Weak handle to the owning catalog manager, passed to every provider
    /// this catalog creates.
    catalog_manager: Weak<KvBackendCatalogManager>,
    /// Information schema providers keyed by catalog name, created on first
    /// use.
    catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
    /// `pg_catalog` providers keyed by catalog name, created on first use for
    /// catalogs other than the default one.
    pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    /// Information schema provider bound to the default catalog; used to list
    /// and probe information schema tables.
    information_schema_provider: Arc<InformationSchemaProvider>,
    /// `pg_catalog` provider bound to the default catalog.
    pg_catalog_provider: Arc<PGCatalogProvider>,
    /// Kv backend used to build the flow metadata manager of newly created
    /// information schema providers.
    backend: KvBackendRef,
}
423
424impl SystemCatalog {
425 fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
426 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
427 match channel {
428 Channel::Postgres => vec![
430 INFORMATION_SCHEMA_NAME.to_string(),
431 PG_CATALOG_NAME.to_string(),
432 ],
433 _ => {
434 vec![INFORMATION_SCHEMA_NAME.to_string()]
435 }
436 }
437 }
438
439 fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
440 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
441 match schema {
442 INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
443 PG_CATALOG_NAME if channel == Channel::Postgres => {
444 self.pg_catalog_provider.table_names()
445 }
446 DEFAULT_SCHEMA_NAME => {
447 vec![NUMBERS_TABLE_NAME.to_string()]
448 }
449 _ => vec![],
450 }
451 }
452
453 fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
454 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
455 match channel {
456 Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
457 _ => schema == INFORMATION_SCHEMA_NAME,
458 }
459 }
460
461 fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
462 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
463 if schema == INFORMATION_SCHEMA_NAME {
464 self.information_schema_provider.table(table).is_some()
465 } else if schema == DEFAULT_SCHEMA_NAME {
466 table == NUMBERS_TABLE_NAME
467 } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
468 self.pg_catalog_provider.table(table).is_some()
469 } else {
470 false
471 }
472 }
473
474 fn table(
475 &self,
476 catalog: &str,
477 schema: &str,
478 table_name: &str,
479 query_ctx: Option<&QueryContext>,
480 ) -> Option<TableRef> {
481 let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
482 if schema == INFORMATION_SCHEMA_NAME {
483 let information_schema_provider =
484 self.catalog_cache.get_with_by_ref(catalog, move || {
485 Arc::new(InformationSchemaProvider::new(
486 catalog.to_string(),
487 self.catalog_manager.clone(),
488 Arc::new(FlowMetadataManager::new(self.backend.clone())),
489 ))
490 });
491 information_schema_provider.table(table_name)
492 } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
493 if catalog == DEFAULT_CATALOG_NAME {
494 self.pg_catalog_provider.table(table_name)
495 } else {
496 let pg_catalog_provider =
497 self.pg_catalog_cache.get_with_by_ref(catalog, move || {
498 Arc::new(PGCatalogProvider::new(
499 catalog.to_string(),
500 self.catalog_manager.clone(),
501 ))
502 });
503 pg_catalog_provider.table(table_name)
504 }
505 } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
506 Some(NumbersTable::table(NUMBERS_TABLE_ID))
507 } else {
508 None
509 }
510 }
511}