catalog/kvbackend/manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::BTreeSet;
17use std::sync::{Arc, Weak};
18
19use async_stream::try_stream;
20use common_catalog::consts::{
21    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
22    PG_CATALOG_NAME,
23};
24use common_error::ext::BoxedError;
25use common_meta::cache::{
26    LayeredCacheRegistryRef, TableInfoCacheRef, TableNameCacheRef, TableRoute, TableRouteCacheRef,
27    ViewInfoCacheRef,
28};
29use common_meta::key::catalog_name::CatalogNameKey;
30use common_meta::key::flow::FlowMetadataManager;
31use common_meta::key::schema_name::SchemaNameKey;
32use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
33use common_meta::key::table_name::TableNameKey;
34use common_meta::key::TableMetadataManagerRef;
35use common_meta::kv_backend::KvBackendRef;
36use common_procedure::ProcedureManagerRef;
37use futures_util::stream::BoxStream;
38use futures_util::{StreamExt, TryStreamExt};
39use moka::sync::Cache;
40use partition::manager::PartitionRuleManagerRef;
41use session::context::{Channel, QueryContext};
42use snafu::prelude::*;
43use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
44use table::dist_table::DistTable;
45use table::metadata::{TableId, TableInfoRef};
46use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
47use table::table::PartitionRules;
48use table::table_name::TableName;
49use table::TableRef;
50use tokio::sync::Semaphore;
51use tokio_stream::wrappers::ReceiverStream;
52
53use crate::error::{
54    CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
55    ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
56};
57#[cfg(feature = "enterprise")]
58use crate::information_schema::InformationSchemaTableFactoryRef;
59use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
60use crate::kvbackend::TableCacheRef;
61use crate::process_manager::ProcessManagerRef;
62use crate::system_schema::pg_catalog::PGCatalogProvider;
63use crate::system_schema::SystemSchemaProvider;
64use crate::CatalogManager;
65
66/// Access all existing catalog, schema and tables.
67///
68/// The result comes from two source, all the user tables are presented in
69/// a kv-backend which persists the metadata of a table. And system tables
70/// comes from `SystemCatalog`, which is static and read-only.
71#[derive(Clone)]
72pub struct KvBackendCatalogManager {
73    /// Provides the extension methods for the `information_schema` tables
74    pub(super) information_extension: InformationExtensionRef,
75    /// Manages partition rules.
76    pub(super) partition_manager: PartitionRuleManagerRef,
77    /// Manages table metadata.
78    pub(super) table_metadata_manager: TableMetadataManagerRef,
79    /// A sub-CatalogManager that handles system tables
80    pub(super) system_catalog: SystemCatalog,
81    /// Cache registry for all caches.
82    pub(super) cache_registry: LayeredCacheRegistryRef,
83    /// Only available in `Standalone` mode.
84    pub(super) procedure_manager: Option<ProcedureManagerRef>,
85}
86
87pub(super) const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
88
89impl KvBackendCatalogManager {
90    pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
91        self.cache_registry.get().context(CacheNotFoundSnafu {
92            name: "view_info_cache",
93        })
94    }
95
96    /// Returns the [`InformationExtension`].
97    pub fn information_extension(&self) -> InformationExtensionRef {
98        self.information_extension.clone()
99    }
100
101    pub fn partition_manager(&self) -> PartitionRuleManagerRef {
102        self.partition_manager.clone()
103    }
104
105    pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
106        &self.table_metadata_manager
107    }
108
109    pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
110        self.procedure_manager.clone()
111    }
112
113    // Override logical table's partition key indices with physical table's.
114    async fn override_logical_table_partition_key_indices(
115        table_route_cache: &TableRouteCacheRef,
116        table_info_manager: &TableInfoManager,
117        table: TableRef,
118    ) -> Result<TableRef> {
119        // If the table is not a metric table, return the table directly.
120        if table.table_info().meta.engine != METRIC_ENGINE_NAME {
121            return Ok(table);
122        }
123
124        if let Some(table_route_value) = table_route_cache
125            .get(table.table_info().table_id())
126            .await
127            .context(TableMetadataManagerSnafu)?
128            && let TableRoute::Logical(logical_route) = &*table_route_value
129            && let Some(physical_table_info_value) = table_info_manager
130                .get(logical_route.physical_table_id())
131                .await
132                .context(TableMetadataManagerSnafu)?
133        {
134            let mut new_table_info = (*table.table_info()).clone();
135
136            let mut phy_part_cols_not_in_logical_table = vec![];
137
138            // Remap partition key indices from physical table to logical table
139            new_table_info.meta.partition_key_indices = physical_table_info_value
140                .table_info
141                .meta
142                .partition_key_indices
143                .iter()
144                .filter_map(|&physical_index| {
145                    // Get the column name from the physical table using the physical index
146                    physical_table_info_value
147                        .table_info
148                        .meta
149                        .schema
150                        .column_schemas
151                        .get(physical_index)
152                        .and_then(|physical_column| {
153                            // Find the corresponding index in the logical table schema
154                            let idx = new_table_info
155                                .meta
156                                .schema
157                                .column_index_by_name(physical_column.name.as_str());
158                            if idx.is_none() {
159                                // not all part columns in physical table that are also in logical table
160                                phy_part_cols_not_in_logical_table
161                                    .push(physical_column.name.clone());
162                            }
163
164                            idx
165                        })
166                })
167                .collect();
168
169            let partition_rules = if !phy_part_cols_not_in_logical_table.is_empty() {
170                Some(PartitionRules {
171                    extra_phy_cols_not_in_logical_table: phy_part_cols_not_in_logical_table,
172                })
173            } else {
174                None
175            };
176
177            let new_table = DistTable::table_partitioned(Arc::new(new_table_info), partition_rules);
178
179            return Ok(new_table);
180        }
181
182        Ok(table)
183    }
184}
185
186#[async_trait::async_trait]
187impl CatalogManager for KvBackendCatalogManager {
188    fn as_any(&self) -> &dyn Any {
189        self
190    }
191
192    async fn catalog_names(&self) -> Result<Vec<String>> {
193        let stream = self
194            .table_metadata_manager
195            .catalog_manager()
196            .catalog_names();
197
198        let keys = stream
199            .try_collect::<Vec<_>>()
200            .await
201            .map_err(BoxedError::new)
202            .context(ListCatalogsSnafu)?;
203
204        Ok(keys)
205    }
206
207    async fn schema_names(
208        &self,
209        catalog: &str,
210        query_ctx: Option<&QueryContext>,
211    ) -> Result<Vec<String>> {
212        let stream = self
213            .table_metadata_manager
214            .schema_manager()
215            .schema_names(catalog);
216        let mut keys = stream
217            .try_collect::<BTreeSet<_>>()
218            .await
219            .map_err(BoxedError::new)
220            .context(ListSchemasSnafu { catalog })?;
221
222        keys.extend(self.system_catalog.schema_names(query_ctx));
223
224        Ok(keys.into_iter().collect())
225    }
226
227    async fn table_names(
228        &self,
229        catalog: &str,
230        schema: &str,
231        query_ctx: Option<&QueryContext>,
232    ) -> Result<Vec<String>> {
233        let mut tables = self
234            .table_metadata_manager
235            .table_name_manager()
236            .tables(catalog, schema)
237            .map_ok(|(table_name, _)| table_name)
238            .try_collect::<Vec<_>>()
239            .await
240            .map_err(BoxedError::new)
241            .context(ListTablesSnafu { catalog, schema })?;
242
243        tables.extend(self.system_catalog.table_names(schema, query_ctx));
244        Ok(tables)
245    }
246
247    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
248        self.table_metadata_manager
249            .catalog_manager()
250            .exists(CatalogNameKey::new(catalog))
251            .await
252            .context(TableMetadataManagerSnafu)
253    }
254
255    async fn schema_exists(
256        &self,
257        catalog: &str,
258        schema: &str,
259        query_ctx: Option<&QueryContext>,
260    ) -> Result<bool> {
261        if self.system_catalog.schema_exists(schema, query_ctx) {
262            return Ok(true);
263        }
264
265        self.table_metadata_manager
266            .schema_manager()
267            .exists(SchemaNameKey::new(catalog, schema))
268            .await
269            .context(TableMetadataManagerSnafu)
270    }
271
272    async fn table_exists(
273        &self,
274        catalog: &str,
275        schema: &str,
276        table: &str,
277        query_ctx: Option<&QueryContext>,
278    ) -> Result<bool> {
279        if self.system_catalog.table_exists(schema, table, query_ctx) {
280            return Ok(true);
281        }
282
283        let key = TableNameKey::new(catalog, schema, table);
284        self.table_metadata_manager
285            .table_name_manager()
286            .get(key)
287            .await
288            .context(TableMetadataManagerSnafu)
289            .map(|x| x.is_some())
290    }
291
292    async fn table(
293        &self,
294        catalog_name: &str,
295        schema_name: &str,
296        table_name: &str,
297        query_ctx: Option<&QueryContext>,
298    ) -> Result<Option<TableRef>> {
299        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
300        if let Some(table) =
301            self.system_catalog
302                .table(catalog_name, schema_name, table_name, query_ctx)
303        {
304            return Ok(Some(table));
305        }
306
307        let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
308            name: "table_cache",
309        })?;
310
311        let table = table_cache
312            .get_by_ref(&TableName {
313                catalog_name: catalog_name.to_string(),
314                schema_name: schema_name.to_string(),
315                table_name: table_name.to_string(),
316            })
317            .await
318            .context(GetTableCacheSnafu)?;
319
320        if let Some(table) = table {
321            let table_route_cache: TableRouteCacheRef =
322                self.cache_registry.get().context(CacheNotFoundSnafu {
323                    name: "table_route_cache",
324                })?;
325            return Self::override_logical_table_partition_key_indices(
326                &table_route_cache,
327                self.table_metadata_manager.table_info_manager(),
328                table,
329            )
330            .await
331            .map(Some);
332        }
333
334        if channel == Channel::Postgres {
335            // falldown to pg_catalog
336            if let Some(table) =
337                self.system_catalog
338                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
339            {
340                return Ok(Some(table));
341            }
342        }
343
344        Ok(None)
345    }
346
347    async fn table_id(
348        &self,
349        catalog_name: &str,
350        schema_name: &str,
351        table_name: &str,
352        query_ctx: Option<&QueryContext>,
353    ) -> Result<Option<TableId>> {
354        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
355        if let Some(table) =
356            self.system_catalog
357                .table(catalog_name, schema_name, table_name, query_ctx)
358        {
359            return Ok(Some(table.table_info().table_id()));
360        }
361
362        let table_cache: TableNameCacheRef =
363            self.cache_registry.get().context(CacheNotFoundSnafu {
364                name: "table_name_cache",
365            })?;
366
367        let table = table_cache
368            .get_by_ref(&TableName {
369                catalog_name: catalog_name.to_string(),
370                schema_name: schema_name.to_string(),
371                table_name: table_name.to_string(),
372            })
373            .await
374            .context(GetTableCacheSnafu)?;
375
376        if let Some(table) = table {
377            return Ok(Some(table));
378        }
379
380        if channel == Channel::Postgres {
381            // falldown to pg_catalog
382            if let Some(table) =
383                self.system_catalog
384                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
385            {
386                return Ok(Some(table.table_info().table_id()));
387            }
388        }
389
390        Ok(None)
391    }
392
393    async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>> {
394        let table_info_cache: TableInfoCacheRef =
395            self.cache_registry.get().context(CacheNotFoundSnafu {
396                name: "table_info_cache",
397            })?;
398        table_info_cache
399            .get_by_ref(&table_id)
400            .await
401            .context(GetTableCacheSnafu)
402    }
403
404    async fn tables_by_ids(
405        &self,
406        catalog: &str,
407        schema: &str,
408        table_ids: &[TableId],
409    ) -> Result<Vec<TableRef>> {
410        let table_info_values = self
411            .table_metadata_manager
412            .table_info_manager()
413            .batch_get(table_ids)
414            .await
415            .context(TableMetadataManagerSnafu)?;
416
417        let tables = table_info_values
418            .into_values()
419            .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
420            .map(build_table)
421            .collect::<Result<Vec<_>>>()?;
422
423        Ok(tables)
424    }
425
426    fn tables<'a>(
427        &'a self,
428        catalog: &'a str,
429        schema: &'a str,
430        query_ctx: Option<&'a QueryContext>,
431    ) -> BoxStream<'a, Result<TableRef>> {
432        let sys_tables = try_stream!({
433            // System tables
434            let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
435            for table_name in sys_table_names {
436                if let Some(table) =
437                    self.system_catalog
438                        .table(catalog, schema, &table_name, query_ctx)
439                {
440                    yield table;
441                }
442            }
443        });
444
445        const BATCH_SIZE: usize = 128;
446        const CONCURRENCY: usize = 8;
447
448        let (tx, rx) = tokio::sync::mpsc::channel(64);
449        let metadata_manager = self.table_metadata_manager.clone();
450        let catalog = catalog.to_string();
451        let schema = schema.to_string();
452        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
453        let table_route_cache: Result<TableRouteCacheRef> =
454            self.cache_registry.get().context(CacheNotFoundSnafu {
455                name: "table_route_cache",
456            });
457
458        common_runtime::spawn_global(async move {
459            let table_route_cache = match table_route_cache {
460                Ok(table_route_cache) => table_route_cache,
461                Err(e) => {
462                    let _ = tx.send(Err(e)).await;
463                    return;
464                }
465            };
466
467            let table_id_stream = metadata_manager
468                .table_name_manager()
469                .tables(&catalog, &schema)
470                .map_ok(|(_, v)| v.table_id());
471            // Split table ids into chunks
472            let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);
473
474            while let Some(table_ids) = table_id_chunks.next().await {
475                let table_ids = match table_ids
476                    .into_iter()
477                    .collect::<std::result::Result<Vec<_>, _>>()
478                    .map_err(BoxedError::new)
479                    .context(ListTablesSnafu {
480                        catalog: &catalog,
481                        schema: &schema,
482                    }) {
483                    Ok(table_ids) => table_ids,
484                    Err(e) => {
485                        let _ = tx.send(Err(e)).await;
486                        return;
487                    }
488                };
489
490                let metadata_manager = metadata_manager.clone();
491                let tx = tx.clone();
492                let semaphore = semaphore.clone();
493                let table_route_cache = table_route_cache.clone();
494                common_runtime::spawn_global(async move {
495                    // we don't explicitly close the semaphore so just ignore the potential error.
496                    let _ = semaphore.acquire().await;
497                    let table_info_values = match metadata_manager
498                        .table_info_manager()
499                        .batch_get(&table_ids)
500                        .await
501                        .context(TableMetadataManagerSnafu)
502                    {
503                        Ok(table_info_values) => table_info_values,
504                        Err(e) => {
505                            let _ = tx.send(Err(e)).await;
506                            return;
507                        }
508                    };
509
510                    for table in table_info_values.into_values().map(build_table) {
511                        let table = if let Ok(table) = table {
512                            Self::override_logical_table_partition_key_indices(
513                                &table_route_cache,
514                                metadata_manager.table_info_manager(),
515                                table,
516                            )
517                            .await
518                        } else {
519                            table
520                        };
521                        if tx.send(table).await.is_err() {
522                            return;
523                        }
524                    }
525                });
526            }
527        });
528
529        let user_tables = ReceiverStream::new(rx);
530        Box::pin(sys_tables.chain(user_tables))
531    }
532}
533
534fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
535    let table_info = table_info_value
536        .table_info
537        .try_into()
538        .context(InvalidTableInfoInCatalogSnafu)?;
539    Ok(DistTable::table(Arc::new(table_info)))
540}
541
// TODO: This struct can hold a static map of all system tables when
// the upper layer (e.g., procedure) can inform the catalog manager
// a new catalog is created.
/// Serves the built-in system tables that are not stored as user tables in the kv-backend.
///
/// Existing system tables:
/// - public.numbers
/// - information_schema.{tables}
/// - pg_catalog.{tables} (only visible over the Postgres protocol)
#[derive(Clone)]
pub(super) struct SystemCatalog {
    /// Weak handle to the catalog manager; handed to every system schema provider
    /// this struct creates.
    pub(super) catalog_manager: Weak<KvBackendCatalogManager>,
    /// `information_schema` providers keyed by catalog name, created lazily on first lookup.
    pub(super) catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
    /// `pg_catalog` providers keyed by catalog name, created lazily; the default
    /// catalog uses `pg_catalog_provider` instead.
    pub(super) pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    // system_schema_provider for default catalog
    // NOTE(review): `table_names`/`table_exists` consult `information_schema_provider`
    // for every catalog, whereas `table` resolves through `catalog_cache`.
    pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
    pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
    /// Kv-backend handed to each `InformationSchemaProvider` (also wrapped in a
    /// `FlowMetadataManager` for it).
    pub(super) backend: KvBackendRef,
    /// Optional process manager handed to each `InformationSchemaProvider`.
    pub(super) process_manager: Option<ProcessManagerRef>,
    /// Extra `information_schema` table factories (enterprise builds only); presumably
    /// keyed by table name — TODO confirm.
    #[cfg(feature = "enterprise")]
    pub(super) extra_information_table_factories:
        std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}
564
565impl SystemCatalog {
566    fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
567        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
568        match channel {
569            // pg_catalog only visible under postgres protocol
570            Channel::Postgres => vec![
571                INFORMATION_SCHEMA_NAME.to_string(),
572                PG_CATALOG_NAME.to_string(),
573            ],
574            _ => {
575                vec![INFORMATION_SCHEMA_NAME.to_string()]
576            }
577        }
578    }
579
580    fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
581        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
582        match schema {
583            INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
584            PG_CATALOG_NAME if channel == Channel::Postgres => {
585                self.pg_catalog_provider.table_names()
586            }
587            DEFAULT_SCHEMA_NAME => {
588                vec![NUMBERS_TABLE_NAME.to_string()]
589            }
590            _ => vec![],
591        }
592    }
593
594    fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
595        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
596        match channel {
597            Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
598            _ => schema == INFORMATION_SCHEMA_NAME,
599        }
600    }
601
602    fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
603        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
604        if schema == INFORMATION_SCHEMA_NAME {
605            self.information_schema_provider.table(table).is_some()
606        } else if schema == DEFAULT_SCHEMA_NAME {
607            table == NUMBERS_TABLE_NAME
608        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
609            self.pg_catalog_provider.table(table).is_some()
610        } else {
611            false
612        }
613    }
614
615    fn table(
616        &self,
617        catalog: &str,
618        schema: &str,
619        table_name: &str,
620        query_ctx: Option<&QueryContext>,
621    ) -> Option<TableRef> {
622        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
623        if schema == INFORMATION_SCHEMA_NAME {
624            let information_schema_provider =
625                self.catalog_cache.get_with_by_ref(catalog, move || {
626                    let provider = InformationSchemaProvider::new(
627                        catalog.to_string(),
628                        self.catalog_manager.clone(),
629                        Arc::new(FlowMetadataManager::new(self.backend.clone())),
630                        self.process_manager.clone(),
631                        self.backend.clone(),
632                    );
633                    #[cfg(feature = "enterprise")]
634                    let provider = provider
635                        .with_extra_table_factories(self.extra_information_table_factories.clone());
636                    Arc::new(provider)
637                });
638            information_schema_provider.table(table_name)
639        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
640            if catalog == DEFAULT_CATALOG_NAME {
641                self.pg_catalog_provider.table(table_name)
642            } else {
643                let pg_catalog_provider =
644                    self.pg_catalog_cache.get_with_by_ref(catalog, move || {
645                        Arc::new(PGCatalogProvider::new(
646                            catalog.to_string(),
647                            self.catalog_manager.clone(),
648                        ))
649                    });
650                pg_catalog_provider.table(table_name)
651            }
652        } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
653            Some(NumbersTable::table(NUMBERS_TABLE_ID))
654        } else {
655            None
656        }
657    }
658}