catalog/kvbackend/manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{
    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
    LayeredCacheRegistryRef, TableInfoCacheRef, TableNameCacheRef, TableRoute, TableRouteCacheRef,
    ViewInfoCacheRef,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::sync::Cache;
use partition::manager::PartitionRuleManagerRef;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
use table::table::PartitionRules;
use table::table_name::TableName;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;

use crate::CatalogManager;
use crate::error::{
    CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
    ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::SystemSchemaProvider;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;

/// Accesses all existing catalogs, schemas and tables.
///
/// The result comes from two sources: user tables are stored in a kv-backend
/// that persists their metadata, while system tables come from `SystemCatalog`,
/// which is static and read-only.
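///
/// A minimal lookup sketch (illustrative only; it assumes an already constructed
/// manager, an async context, and a hypothetical table name):
///
/// ```ignore
/// // Resolve a table by its fully qualified name; system schemas are consulted first.
/// if let Some(table) = manager
///     .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table", None)
///     .await?
/// {
///     println!("resolved table id: {}", table.table_info().table_id());
/// }
/// ```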
#[derive(Clone)]
pub struct KvBackendCatalogManager {
    /// Provides the extension methods for the `information_schema` tables
    pub(super) information_extension: InformationExtensionRef,
    /// Manages partition rules.
    pub(super) partition_manager: PartitionRuleManagerRef,
    /// Manages table metadata.
    pub(super) table_metadata_manager: TableMetadataManagerRef,
    /// A sub-CatalogManager that handles system tables
    pub(super) system_catalog: SystemCatalog,
    /// Cache registry for all caches.
    pub(super) cache_registry: LayeredCacheRegistryRef,
    /// Only available in `Standalone` mode.
    pub(super) procedure_manager: Option<ProcedureManagerRef>,
}

pub(super) const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;

impl KvBackendCatalogManager {
    pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
        self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "view_info_cache",
        })
    }

    /// Returns the [`InformationExtension`].
    pub fn information_extension(&self) -> InformationExtensionRef {
        self.information_extension.clone()
    }

    pub fn partition_manager(&self) -> PartitionRuleManagerRef {
        self.partition_manager.clone()
    }

    pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
        self.procedure_manager.clone()
    }

    // Override logical table's partition key indices with physical table's.
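    //
    // For illustration (hypothetical column names): if the physical table's columns are
    // ["ts", "host", "cpu", "region"] with partition key indices [1, 3] ("host" and
    // "region"), and the logical table only has columns ["ts", "host", "val"], then the
    // logical table's partition key indices become [1] ("host"), while "region" is
    // recorded as a partition column that exists only in the physical table.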
    async fn override_logical_table_partition_key_indices(
        table_route_cache: &TableRouteCacheRef,
        table_info_manager: &TableInfoManager,
        table: TableRef,
    ) -> Result<TableRef> {
        // If the table is not a metric table, return the table directly.
        if table.table_info().meta.engine != METRIC_ENGINE_NAME {
            return Ok(table);
        }

        if let Some(table_route_value) = table_route_cache
            .get(table.table_info().table_id())
            .await
            .context(TableMetadataManagerSnafu)?
            && let TableRoute::Logical(logical_route) = &*table_route_value
            && let Some(physical_table_info_value) = table_info_manager
                .get(logical_route.physical_table_id())
                .await
                .context(TableMetadataManagerSnafu)?
        {
            let mut new_table_info = (*table.table_info()).clone();

            let mut phy_part_cols_not_in_logical_table = vec![];

            // Remap partition key indices from physical table to logical table
            new_table_info.meta.partition_key_indices = physical_table_info_value
                .table_info
                .meta
                .partition_key_indices
                .iter()
                .filter_map(|&physical_index| {
                    // Get the column name from the physical table using the physical index
                    physical_table_info_value
                        .table_info
                        .meta
                        .schema
                        .column_schemas
                        .get(physical_index)
                        .and_then(|physical_column| {
                            // Find the corresponding index in the logical table schema
                            let idx = new_table_info
                                .meta
                                .schema
                                .column_index_by_name(physical_column.name.as_str());
                            if idx.is_none() {
                                // Record partition columns that exist in the physical
                                // table but not in the logical table.
                                phy_part_cols_not_in_logical_table
                                    .push(physical_column.name.clone());
                            }

                            idx
                        })
                })
                .collect();

            let partition_rules = if !phy_part_cols_not_in_logical_table.is_empty() {
                Some(PartitionRules {
                    extra_phy_cols_not_in_logical_table: phy_part_cols_not_in_logical_table,
                })
            } else {
                None
            };

            let new_table = DistTable::table_partitioned(Arc::new(new_table_info), partition_rules);

            return Ok(new_table);
        }

        Ok(table)
    }
}

#[async_trait::async_trait]
impl CatalogManager for KvBackendCatalogManager {
    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn catalog_names(&self) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .catalog_manager()
            .catalog_names();

        let keys = stream
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListCatalogsSnafu)?;

        Ok(keys)
    }

    async fn schema_names(
        &self,
        catalog: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .schema_manager()
            .schema_names(catalog);
        let mut keys = stream
            .try_collect::<BTreeSet<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListSchemasSnafu { catalog })?;

        keys.extend(self.system_catalog.schema_names(query_ctx));

        Ok(keys.into_iter().collect())
    }

    async fn table_names(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let mut tables = self
            .table_metadata_manager
            .table_name_manager()
            .tables(catalog, schema)
            .map_ok(|(table_name, _)| table_name)
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListTablesSnafu { catalog, schema })?;

        tables.extend(self.system_catalog.table_names(schema, query_ctx));
        Ok(tables)
    }

    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
        self.table_metadata_manager
            .catalog_manager()
            .exists(CatalogNameKey::new(catalog))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn schema_exists(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.schema_exists(schema, query_ctx) {
            return Ok(true);
        }

        self.table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(catalog, schema))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn table_exists(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.table_exists(schema, table, query_ctx) {
            return Ok(true);
        }

        let key = TableNameKey::new(catalog, schema, table);
        self.table_metadata_manager
            .table_name_manager()
            .get(key)
            .await
            .context(TableMetadataManagerSnafu)
            .map(|x| x.is_some())
    }

    async fn table(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Option<TableRef>> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if let Some(table) =
            self.system_catalog
                .table(catalog_name, schema_name, table_name, query_ctx)
        {
            return Ok(Some(table));
        }

        let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "table_cache",
        })?;

        let table = table_cache
            .get_by_ref(&TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: schema_name.to_string(),
                table_name: table_name.to_string(),
            })
            .await
            .context(GetTableCacheSnafu)?;

        if let Some(table) = table {
            let table_route_cache: TableRouteCacheRef =
                self.cache_registry.get().context(CacheNotFoundSnafu {
                    name: "table_route_cache",
                })?;
            return Self::override_logical_table_partition_key_indices(
                &table_route_cache,
                self.table_metadata_manager.table_info_manager(),
                table,
            )
            .await
            .map(Some);
        }

        if channel == Channel::Postgres {
            // Fall back to pg_catalog.
            if let Some(table) =
                self.system_catalog
                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
            {
                return Ok(Some(table));
            }
        }

        Ok(None)
    }

    async fn table_id(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Option<TableId>> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if let Some(table) =
            self.system_catalog
                .table(catalog_name, schema_name, table_name, query_ctx)
        {
            return Ok(Some(table.table_info().table_id()));
        }

        let table_cache: TableNameCacheRef =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_name_cache",
            })?;

        let table = table_cache
            .get_by_ref(&TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: schema_name.to_string(),
                table_name: table_name.to_string(),
            })
            .await
            .context(GetTableCacheSnafu)?;

        if let Some(table) = table {
            return Ok(Some(table));
        }

        if channel == Channel::Postgres {
            // Fall back to pg_catalog.
            if let Some(table) =
                self.system_catalog
                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
            {
                return Ok(Some(table.table_info().table_id()));
            }
        }

        Ok(None)
    }

    async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>> {
        let table_info_cache: TableInfoCacheRef =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_info_cache",
            })?;
        table_info_cache
            .get_by_ref(&table_id)
            .await
            .context(GetTableCacheSnafu)
    }

    async fn tables_by_ids(
        &self,
        catalog: &str,
        schema: &str,
        table_ids: &[TableId],
    ) -> Result<Vec<TableRef>> {
        let table_info_values = self
            .table_metadata_manager
            .table_info_manager()
            .batch_get(table_ids)
            .await
            .context(TableMetadataManagerSnafu)?;

        let tables = table_info_values
            .into_values()
            .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
            .map(build_table)
            .collect::<Result<Vec<_>>>()?;

        Ok(tables)
    }

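    // Streams every table visible in `catalog`.`schema`: system tables are yielded
    // first, then user tables are fetched from the metadata kv-backend in the
    // background and forwarded through a bounded channel.
    //
    // A consumption sketch (illustrative only; catalog/schema names are hypothetical
    // and an async context is assumed):
    //
    //     let mut stream = catalog_manager.tables("greptime", "public", None);
    //     while let Some(table) = stream.try_next().await? {
    //         // use `table`
    //     }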
    fn tables<'a>(
        &'a self,
        catalog: &'a str,
        schema: &'a str,
        query_ctx: Option<&'a QueryContext>,
    ) -> BoxStream<'a, Result<TableRef>> {
        let sys_tables = try_stream!({
            // System tables
            let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
            for table_name in sys_table_names {
                if let Some(table) =
                    self.system_catalog
                        .table(catalog, schema, &table_name, query_ctx)
                {
                    yield table;
                }
            }
        });

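        // User tables are produced by a background task: table ids are read from the
        // table name manager in chunks of `BATCH_SIZE`, each chunk is resolved with a
        // `batch_get` on the table info manager, and the number of in-flight chunk
        // fetches is bounded by a semaphore with `CONCURRENCY` permits. Results are
        // forwarded through the mpsc channel below and surfaced as `user_tables`.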
        const BATCH_SIZE: usize = 128;
        const CONCURRENCY: usize = 8;

        let (tx, rx) = tokio::sync::mpsc::channel(64);
        let metadata_manager = self.table_metadata_manager.clone();
        let catalog = catalog.to_string();
        let schema = schema.to_string();
        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
        let table_route_cache: Result<TableRouteCacheRef> =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_route_cache",
            });

        common_runtime::spawn_global(async move {
            let table_route_cache = match table_route_cache {
                Ok(table_route_cache) => table_route_cache,
                Err(e) => {
                    let _ = tx.send(Err(e)).await;
                    return;
                }
            };

            let table_id_stream = metadata_manager
                .table_name_manager()
                .tables(&catalog, &schema)
                .map_ok(|(_, v)| v.table_id());
            // Split table ids into chunks
            let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);

            while let Some(table_ids) = table_id_chunks.next().await {
                let table_ids = match table_ids
                    .into_iter()
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(BoxedError::new)
                    .context(ListTablesSnafu {
                        catalog: &catalog,
                        schema: &schema,
                    }) {
                    Ok(table_ids) => table_ids,
                    Err(e) => {
                        let _ = tx.send(Err(e)).await;
                        return;
                    }
                };

                let metadata_manager = metadata_manager.clone();
                let tx = tx.clone();
                let semaphore = semaphore.clone();
                let table_route_cache = table_route_cache.clone();
                common_runtime::spawn_global(async move {
                    // we don't explicitly close the semaphore so just ignore the potential error.
                    let _ = semaphore.acquire().await;
                    let table_info_values = match metadata_manager
                        .table_info_manager()
                        .batch_get(&table_ids)
                        .await
                        .context(TableMetadataManagerSnafu)
                    {
                        Ok(table_info_values) => table_info_values,
                        Err(e) => {
                            let _ = tx.send(Err(e)).await;
                            return;
                        }
                    };

                    for table in table_info_values.into_values().map(build_table) {
                        let table = if let Ok(table) = table {
                            Self::override_logical_table_partition_key_indices(
                                &table_route_cache,
                                metadata_manager.table_info_manager(),
                                table,
                            )
                            .await
                        } else {
                            table
                        };
                        if tx.send(table).await.is_err() {
                            return;
                        }
                    }
                });
            }
        });

        let user_tables = ReceiverStream::new(rx);
        Box::pin(sys_tables.chain(user_tables))
    }
}

fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
    let table_info = table_info_value
        .table_info
        .try_into()
        .context(InvalidTableInfoInCatalogSnafu)?;
    Ok(DistTable::table(Arc::new(table_info)))
}

// TODO: This struct can hold a static map of all system tables once the upper
// layer (e.g., procedure) can inform the catalog manager that a new catalog is
// created.
/// Existing system tables:
/// - public.numbers
/// - information_schema.{tables}
/// - pg_catalog.{tables}
#[derive(Clone)]
pub(super) struct SystemCatalog {
    pub(super) catalog_manager: Weak<KvBackendCatalogManager>,
    pub(super) catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
    pub(super) pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    // system_schema_provider for default catalog
    pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
    pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
    pub(super) numbers_table_provider: NumbersTableProvider,
    pub(super) backend: KvBackendRef,
    pub(super) process_manager: Option<ProcessManagerRef>,
    #[cfg(feature = "enterprise")]
    pub(super) extra_information_table_factories:
        std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}

impl SystemCatalog {
    fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            // pg_catalog is only visible under the postgres protocol
            Channel::Postgres => vec![
                INFORMATION_SCHEMA_NAME.to_string(),
                PG_CATALOG_NAME.to_string(),
            ],
            _ => {
                vec![INFORMATION_SCHEMA_NAME.to_string()]
            }
        }
    }

    fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match schema {
            INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
            PG_CATALOG_NAME if channel == Channel::Postgres => {
                self.pg_catalog_provider.table_names()
            }
            DEFAULT_SCHEMA_NAME => self.numbers_table_provider.table_names(),
            _ => vec![],
        }
    }

    fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
            _ => schema == INFORMATION_SCHEMA_NAME,
        }
    }

    fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            self.information_schema_provider.table(table).is_some()
        } else if schema == DEFAULT_SCHEMA_NAME {
            self.numbers_table_provider.table_exists(table)
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            self.pg_catalog_provider.table(table).is_some()
        } else {
            false
        }
    }

    fn table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Option<TableRef> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
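            // The provider for each catalog is built lazily on first access and
            // memoized in `catalog_cache`.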
            let information_schema_provider =
                self.catalog_cache.get_with_by_ref(catalog, move || {
                    let provider = InformationSchemaProvider::new(
                        catalog.to_string(),
                        self.catalog_manager.clone(),
                        Arc::new(FlowMetadataManager::new(self.backend.clone())),
                        self.process_manager.clone(),
                        self.backend.clone(),
                    );
                    #[cfg(feature = "enterprise")]
                    let provider = provider
                        .with_extra_table_factories(self.extra_information_table_factories.clone());
                    Arc::new(provider)
                });
            information_schema_provider.table(table_name)
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            if catalog == DEFAULT_CATALOG_NAME {
                self.pg_catalog_provider.table(table_name)
            } else {
                let pg_catalog_provider =
                    self.pg_catalog_cache.get_with_by_ref(catalog, move || {
                        Arc::new(PGCatalogProvider::new(
                            catalog.to_string(),
                            self.catalog_manager.clone(),
                        ))
                    });
                pg_catalog_provider.table(table_name)
            }
        } else if schema == DEFAULT_SCHEMA_NAME {
            self.numbers_table_provider.table(table_name)
        } else {
            None
        }
    }
}
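
#[cfg(test)]
mod partition_index_remap_sketch {
    //! A minimal, self-contained sketch of the index remapping performed by
    //! `override_logical_table_partition_key_indices` above. It does not touch the
    //! real metric-engine or metadata types; it only demonstrates the remapping
    //! arithmetic on plain slices, and all column names here are hypothetical.

    /// Maps the physical table's partition key indices onto the logical table's
    /// columns by name; partition columns that only exist in the physical table are
    /// collected separately (mirroring `phy_part_cols_not_in_logical_table`).
    fn remap(
        physical_partition_indices: &[usize],
        physical_columns: &[&str],
        logical_columns: &[&str],
    ) -> (Vec<usize>, Vec<String>) {
        let mut physical_only = Vec::new();
        let remapped = physical_partition_indices
            .iter()
            .filter_map(|&idx| {
                let name = physical_columns[idx];
                let logical_idx = logical_columns.iter().position(|c| *c == name);
                if logical_idx.is_none() {
                    physical_only.push(name.to_string());
                }
                logical_idx
            })
            .collect();
        (remapped, physical_only)
    }

    #[test]
    fn remaps_by_column_name_and_reports_physical_only_columns() {
        // The physical table partitions on "host" (index 1) and "region" (index 3).
        let physical_columns = ["ts", "host", "cpu", "region"];
        // The logical table only carries "host" among the partition columns.
        let logical_columns = ["ts", "host", "val"];

        let (indices, physical_only) = remap(&[1, 3], &physical_columns, &logical_columns);
        assert_eq!(indices, vec![1]);
        assert_eq!(physical_only, vec!["region".to_string()]);
    }
}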