catalog/kvbackend/manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{
    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
    PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
    LayeredCacheRegistryRef, TableRoute, TableRouteCacheRef, ViewInfoCacheRef,
};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;

use crate::error::{
    CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
    ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;
/// Access all existing catalogs, schemas and tables.
///
/// The result comes from two sources: all user tables are stored in a
/// kv-backend, which persists each table's metadata, while system tables
/// come from `SystemCatalog`, which is static and read-only.
#[derive(Clone)]
pub struct KvBackendCatalogManager {
    /// Provides the extension methods for the `information_schema` tables.
    information_extension: InformationExtensionRef,
    /// Manages partition rules.
    partition_manager: PartitionRuleManagerRef,
    /// Manages table metadata.
    table_metadata_manager: TableMetadataManagerRef,
    /// A sub-`CatalogManager` that handles system tables.
    system_catalog: SystemCatalog,
    /// Cache registry for all caches.
    cache_registry: LayeredCacheRegistryRef,
    /// Only available in `Standalone` mode.
    procedure_manager: Option<ProcedureManagerRef>,
}

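/// Maximum number of per-catalog providers kept in [`SystemCatalog`]'s caches.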
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;

impl KvBackendCatalogManager {
    pub fn new(
        information_extension: InformationExtensionRef,
        backend: KvBackendRef,
        cache_registry: LayeredCacheRegistryRef,
        procedure_manager: Option<ProcedureManagerRef>,
        process_manager: Option<ProcessManagerRef>,
    ) -> Arc<Self> {
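        // `Arc::new_cyclic` provides a `Weak` handle to the manager under
        // construction, letting the embedded `SystemCatalog` refer back to it
        // without creating a reference cycle.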
        Arc::new_cyclic(|me| Self {
            information_extension,
            partition_manager: Arc::new(PartitionRuleManager::new(
                backend.clone(),
                cache_registry
                    .get()
                    .expect("Failed to get table_route_cache"),
            )),
            table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
            system_catalog: SystemCatalog {
                catalog_manager: me.clone(),
                catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
                pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
                information_schema_provider: Arc::new(InformationSchemaProvider::new(
                    DEFAULT_CATALOG_NAME.to_string(),
                    me.clone(),
                    Arc::new(FlowMetadataManager::new(backend.clone())),
                    process_manager.clone(),
                )),
                pg_catalog_provider: Arc::new(PGCatalogProvider::new(
                    DEFAULT_CATALOG_NAME.to_string(),
                    me.clone(),
                )),
                backend,
                process_manager,
            },
            cache_registry,
            procedure_manager,
        })
    }

    pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
        self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "view_info_cache",
        })
    }

    /// Returns the [`InformationExtension`].
    pub fn information_extension(&self) -> InformationExtensionRef {
        self.information_extension.clone()
    }

    pub fn partition_manager(&self) -> PartitionRuleManagerRef {
        self.partition_manager.clone()
    }

    pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
        self.procedure_manager.clone()
    }

    // Override a logical table's partition key indices with the physical table's.
    async fn override_logical_table_partition_key_indices(
        table_route_cache: &TableRouteCacheRef,
        table_info_manager: &TableInfoManager,
        table: TableRef,
    ) -> Result<TableRef> {
        // If the table is not a metric engine table, return it directly.
        if table.table_info().meta.engine != METRIC_ENGINE_NAME {
            return Ok(table);
        }

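        // Look up the table route; for a logical table, reuse the physical
        // table's partition key indices, keeping only columns that also exist
        // in the logical table's schema.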
        if let Some(table_route_value) = table_route_cache
            .get(table.table_info().table_id())
            .await
            .context(TableMetadataManagerSnafu)?
            && let TableRoute::Logical(logical_route) = &*table_route_value
            && let Some(physical_table_info_value) = table_info_manager
                .get(logical_route.physical_table_id())
                .await
                .context(TableMetadataManagerSnafu)?
        {
            let mut new_table_info = (*table.table_info()).clone();
            // Gather all column names from the logical table.
            let logical_column_names: HashSet<_> = new_table_info
                .meta
                .schema
                .column_schemas()
                .iter()
                .map(|col| &col.name)
                .collect();

            // Only preserve partition key indices whose corresponding columns exist in the logical table.
            new_table_info.meta.partition_key_indices = physical_table_info_value
                .table_info
                .meta
                .partition_key_indices
                .iter()
                .filter(|&&index| {
                    physical_table_info_value
                        .table_info
                        .meta
                        .schema
                        .column_schemas
                        .get(index)
                        .map(|physical_column| logical_column_names.contains(&physical_column.name))
                        .unwrap_or(false)
                })
                .cloned()
                .collect();

            let new_table = DistTable::table(Arc::new(new_table_info));

            return Ok(new_table);
        }

        Ok(table)
    }
}

#[async_trait::async_trait]
impl CatalogManager for KvBackendCatalogManager {
    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn catalog_names(&self) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .catalog_manager()
            .catalog_names();

        let keys = stream
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListCatalogsSnafu)?;

        Ok(keys)
    }

    async fn schema_names(
        &self,
        catalog: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .schema_manager()
            .schema_names(catalog);
        let mut keys = stream
            .try_collect::<BTreeSet<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListSchemasSnafu { catalog })?;

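        // Append the system schemas, which are not stored in the kv-backend.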
        keys.extend(self.system_catalog.schema_names(query_ctx));

        Ok(keys.into_iter().collect())
    }

    async fn table_names(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let mut tables = self
            .table_metadata_manager
            .table_name_manager()
            .tables(catalog, schema)
            .map_ok(|(table_name, _)| table_name)
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListTablesSnafu { catalog, schema })?;

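        // Append system table names visible in this schema.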
        tables.extend(self.system_catalog.table_names(schema, query_ctx));
        Ok(tables)
    }

    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
        self.table_metadata_manager
            .catalog_manager()
            .exists(CatalogNameKey::new(catalog))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn schema_exists(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.schema_exists(schema, query_ctx) {
            return Ok(true);
        }

        self.table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(catalog, schema))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn table_exists(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.table_exists(schema, table, query_ctx) {
            return Ok(true);
        }

        let key = TableNameKey::new(catalog, schema, table);
        self.table_metadata_manager
            .table_name_manager()
            .get(key)
            .await
            .context(TableMetadataManagerSnafu)
            .map(|x| x.is_some())
    }

    async fn table(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Option<TableRef>> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if let Some(table) =
            self.system_catalog
                .table(catalog_name, schema_name, table_name, query_ctx)
        {
            return Ok(Some(table));
        }

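        // User tables are resolved through the table cache registered in the
        // layered cache registry.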
        let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "table_cache",
        })?;

        let table = table_cache
            .get_by_ref(&TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: schema_name.to_string(),
                table_name: table_name.to_string(),
            })
            .await
            .context(GetTableCacheSnafu)?;

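        // For metric-engine logical tables, patch the partition key indices from
        // the corresponding physical table before returning.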
        if let Some(table) = table {
            let table_route_cache: TableRouteCacheRef =
                self.cache_registry.get().context(CacheNotFoundSnafu {
                    name: "table_route_cache",
                })?;
            return Self::override_logical_table_partition_key_indices(
                &table_route_cache,
                self.table_metadata_manager.table_info_manager(),
                table,
            )
            .await
            .map(Some);
        }

        if channel == Channel::Postgres {
            // Fall back to `pg_catalog` for Postgres clients.
            if let Some(table) =
                self.system_catalog
                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
            {
                return Ok(Some(table));
            }
        }

        Ok(None)
    }

    async fn tables_by_ids(
        &self,
        catalog: &str,
        schema: &str,
        table_ids: &[TableId],
    ) -> Result<Vec<TableRef>> {
        let table_info_values = self
            .table_metadata_manager
            .table_info_manager()
            .batch_get(table_ids)
            .await
            .context(TableMetadataManagerSnafu)?;

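        // `batch_get` is keyed by table id only, so drop any entries that belong
        // to a different catalog or schema.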
        let tables = table_info_values
            .into_values()
            .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
            .map(build_table)
            .collect::<Result<Vec<_>>>()?;

        Ok(tables)
    }

    fn tables<'a>(
        &'a self,
        catalog: &'a str,
        schema: &'a str,
        query_ctx: Option<&'a QueryContext>,
    ) -> BoxStream<'a, Result<TableRef>> {
        let sys_tables = try_stream!({
            // System tables
            let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
            for table_name in sys_table_names {
                if let Some(table) =
                    self.system_catalog
                        .table(catalog, schema, &table_name, query_ctx)
                {
                    yield table;
                }
            }
        });

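        // User tables are listed asynchronously: table ids are read from the
        // kv-backend in batches and resolved on background tasks, with the
        // resulting `TableRef`s streamed back through a bounded channel.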
        const BATCH_SIZE: usize = 128;
        const CONCURRENCY: usize = 8;

        let (tx, rx) = tokio::sync::mpsc::channel(64);
        let metadata_manager = self.table_metadata_manager.clone();
        let catalog = catalog.to_string();
        let schema = schema.to_string();
        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
        let table_route_cache: Result<TableRouteCacheRef> =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_route_cache",
            });

        common_runtime::spawn_global(async move {
            let table_route_cache = match table_route_cache {
                Ok(table_route_cache) => table_route_cache,
                Err(e) => {
                    let _ = tx.send(Err(e)).await;
                    return;
                }
            };

            let table_id_stream = metadata_manager
                .table_name_manager()
                .tables(&catalog, &schema)
                .map_ok(|(_, v)| v.table_id());
            // Split table ids into chunks.
            let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);

            while let Some(table_ids) = table_id_chunks.next().await {
                let table_ids = match table_ids
                    .into_iter()
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(BoxedError::new)
                    .context(ListTablesSnafu {
                        catalog: &catalog,
                        schema: &schema,
                    }) {
                    Ok(table_ids) => table_ids,
                    Err(e) => {
                        let _ = tx.send(Err(e)).await;
                        return;
                    }
                };

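                // Resolve each chunk of table ids on its own task; the semaphore
                // caps how many lookups run concurrently.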
                let metadata_manager = metadata_manager.clone();
                let tx = tx.clone();
                let semaphore = semaphore.clone();
                let table_route_cache = table_route_cache.clone();
                common_runtime::spawn_global(async move {
                    // We don't explicitly close the semaphore, so just ignore the potential error.
                    let _ = semaphore.acquire().await;
                    let table_info_values = match metadata_manager
                        .table_info_manager()
                        .batch_get(&table_ids)
                        .await
                        .context(TableMetadataManagerSnafu)
                    {
                        Ok(table_info_values) => table_info_values,
                        Err(e) => {
                            let _ = tx.send(Err(e)).await;
                            return;
                        }
                    };

                    for table in table_info_values.into_values().map(build_table) {
                        let table = if let Ok(table) = table {
                            Self::override_logical_table_partition_key_indices(
                                &table_route_cache,
                                metadata_manager.table_info_manager(),
                                table,
                            )
                            .await
                        } else {
                            table
                        };
                        if tx.send(table).await.is_err() {
                            return;
                        }
                    }
                });
            }
        });

        let user_tables = ReceiverStream::new(rx);
        Box::pin(sys_tables.chain(user_tables))
    }
}

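/// Builds a [`TableRef`] (a `DistTable`) from the raw table info stored in the metadata backend.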
fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
    let table_info = table_info_value
        .table_info
        .try_into()
        .context(InvalidTableInfoInCatalogSnafu)?;
    Ok(DistTable::table(Arc::new(table_info)))
}

// TODO: This struct can hold a static map of all system tables when
// the upper layer (e.g., procedure) can inform the catalog manager
// that a new catalog is created.
/// Existing system tables:
/// - public.numbers
/// - information_schema.{tables}
/// - pg_catalog.{tables}
#[derive(Clone)]
struct SystemCatalog {
    catalog_manager: Weak<KvBackendCatalogManager>,
    catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
    pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    // System schema providers for the default catalog.
    information_schema_provider: Arc<InformationSchemaProvider>,
    pg_catalog_provider: Arc<PGCatalogProvider>,
    backend: KvBackendRef,
    process_manager: Option<ProcessManagerRef>,
}

impl SystemCatalog {
    fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            // `pg_catalog` is only visible under the Postgres protocol.
            Channel::Postgres => vec![
                INFORMATION_SCHEMA_NAME.to_string(),
                PG_CATALOG_NAME.to_string(),
            ],
            _ => {
                vec![INFORMATION_SCHEMA_NAME.to_string()]
            }
        }
    }

    fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match schema {
            INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
            PG_CATALOG_NAME if channel == Channel::Postgres => {
                self.pg_catalog_provider.table_names()
            }
            DEFAULT_SCHEMA_NAME => {
                vec![NUMBERS_TABLE_NAME.to_string()]
            }
            _ => vec![],
        }
    }

    fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
            _ => schema == INFORMATION_SCHEMA_NAME,
        }
    }

    fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            self.information_schema_provider.table(table).is_some()
        } else if schema == DEFAULT_SCHEMA_NAME {
            table == NUMBERS_TABLE_NAME
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            self.pg_catalog_provider.table(table).is_some()
        } else {
            false
        }
    }

    fn table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Option<TableRef> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
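            // Lazily build and cache an information_schema provider per catalog.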
            let information_schema_provider =
                self.catalog_cache.get_with_by_ref(catalog, move || {
                    Arc::new(InformationSchemaProvider::new(
                        catalog.to_string(),
                        self.catalog_manager.clone(),
                        Arc::new(FlowMetadataManager::new(self.backend.clone())),
                        self.process_manager.clone(),
                    ))
                });
            information_schema_provider.table(table_name)
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            if catalog == DEFAULT_CATALOG_NAME {
                self.pg_catalog_provider.table(table_name)
            } else {
                let pg_catalog_provider =
                    self.pg_catalog_cache.get_with_by_ref(catalog, move || {
                        Arc::new(PGCatalogProvider::new(
                            catalog.to_string(),
                            self.catalog_manager.clone(),
                        ))
                    });
                pg_catalog_provider.table(table_name)
            }
        } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
            Some(NumbersTable::table(NUMBERS_TABLE_ID))
        } else {
            None
        }
    }
}