catalog/kvbackend/manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{
    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
    PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
    LayeredCacheRegistryRef, TableRoute, TableRouteCacheRef, ViewInfoCacheRef,
};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::sync::Cache;
use partition::manager::PartitionRuleManagerRef;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;

use crate::error::{
    CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
    ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;

/// Accesses all existing catalogs, schemas and tables.
///
/// The result comes from two sources: all user tables are stored in a
/// kv-backend, which persists the table metadata, while system tables come
/// from `SystemCatalog`, which is static and read-only.
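///
/// A minimal usage sketch, assuming the manager has already been constructed
/// (e.g. by its builder, not shown here) and the default catalog/schema names:
///
/// ```ignore
/// let schemas = manager.schema_names("greptime", None).await?;
/// let tables = manager.table_names("greptime", "public", None).await?;
/// ```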
#[derive(Clone)]
pub struct KvBackendCatalogManager {
    /// Provides the extension methods for the `information_schema` tables.
    pub(super) information_extension: InformationExtensionRef,
    /// Manages partition rules.
    pub(super) partition_manager: PartitionRuleManagerRef,
    /// Manages table metadata.
    pub(super) table_metadata_manager: TableMetadataManagerRef,
    /// A sub-CatalogManager that handles system tables.
    pub(super) system_catalog: SystemCatalog,
    /// Cache registry for all caches.
    pub(super) cache_registry: LayeredCacheRegistryRef,
    /// Only available in `Standalone` mode.
    pub(super) procedure_manager: Option<ProcedureManagerRef>,
}

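/// Maximum capacity of the per-catalog provider caches in [`SystemCatalog`].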
pub(super) const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;

impl KvBackendCatalogManager {
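    /// Returns the [`ViewInfoCacheRef`] from the cache registry.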
    pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
        self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "view_info_cache",
        })
    }

    /// Returns the [`InformationExtension`].
    pub fn information_extension(&self) -> InformationExtensionRef {
        self.information_extension.clone()
    }

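    /// Returns the [`PartitionRuleManagerRef`].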
    pub fn partition_manager(&self) -> PartitionRuleManagerRef {
        self.partition_manager.clone()
    }

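    /// Returns a reference to the [`TableMetadataManagerRef`].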
    pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

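    /// Returns the [`ProcedureManagerRef`], which is only available in `Standalone` mode.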
    pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
        self.procedure_manager.clone()
    }

    // Overrides a logical table's partition key indices with those of its physical table.
    async fn override_logical_table_partition_key_indices(
        table_route_cache: &TableRouteCacheRef,
        table_info_manager: &TableInfoManager,
        table: TableRef,
    ) -> Result<TableRef> {
        // If the table is not a metric table, return the table directly.
        if table.table_info().meta.engine != METRIC_ENGINE_NAME {
            return Ok(table);
        }

        if let Some(table_route_value) = table_route_cache
            .get(table.table_info().table_id())
            .await
            .context(TableMetadataManagerSnafu)?
            && let TableRoute::Logical(logical_route) = &*table_route_value
            && let Some(physical_table_info_value) = table_info_manager
                .get(logical_route.physical_table_id())
                .await
                .context(TableMetadataManagerSnafu)?
        {
            let mut new_table_info = (*table.table_info()).clone();

            // Remap partition key indices from physical table to logical table
            new_table_info.meta.partition_key_indices = physical_table_info_value
                .table_info
                .meta
                .partition_key_indices
                .iter()
                .filter_map(|&physical_index| {
                    // Get the column name from the physical table using the physical index
                    physical_table_info_value
                        .table_info
                        .meta
                        .schema
                        .column_schemas
                        .get(physical_index)
                        .and_then(|physical_column| {
                            // Find the corresponding index in the logical table schema
                            new_table_info
                                .meta
                                .schema
                                .column_index_by_name(physical_column.name.as_str())
                        })
                })
                .collect();

            let new_table = DistTable::table(Arc::new(new_table_info));

            return Ok(new_table);
        }

        Ok(table)
    }
}

#[async_trait::async_trait]
impl CatalogManager for KvBackendCatalogManager {
    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn catalog_names(&self) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .catalog_manager()
            .catalog_names();

        let keys = stream
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListCatalogsSnafu)?;

        Ok(keys)
    }

    async fn schema_names(
        &self,
        catalog: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let stream = self
            .table_metadata_manager
            .schema_manager()
            .schema_names(catalog);
        let mut keys = stream
            .try_collect::<BTreeSet<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListSchemasSnafu { catalog })?;

        keys.extend(self.system_catalog.schema_names(query_ctx));

        Ok(keys.into_iter().collect())
    }

    async fn table_names(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Vec<String>> {
        let mut tables = self
            .table_metadata_manager
            .table_name_manager()
            .tables(catalog, schema)
            .map_ok(|(table_name, _)| table_name)
            .try_collect::<Vec<_>>()
            .await
            .map_err(BoxedError::new)
            .context(ListTablesSnafu { catalog, schema })?;

        tables.extend(self.system_catalog.table_names(schema, query_ctx));
        Ok(tables)
    }

    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
        self.table_metadata_manager
            .catalog_manager()
            .exists(CatalogNameKey::new(catalog))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn schema_exists(
        &self,
        catalog: &str,
        schema: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.schema_exists(schema, query_ctx) {
            return Ok(true);
        }

        self.table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(catalog, schema))
            .await
            .context(TableMetadataManagerSnafu)
    }

    async fn table_exists(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<bool> {
        if self.system_catalog.table_exists(schema, table, query_ctx) {
            return Ok(true);
        }

        let key = TableNameKey::new(catalog, schema, table);
        self.table_metadata_manager
            .table_name_manager()
            .get(key)
            .await
            .context(TableMetadataManagerSnafu)
            .map(|x| x.is_some())
    }

    async fn table(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Result<Option<TableRef>> {
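        // Check system tables first, then fall back to the user table cache.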
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if let Some(table) =
            self.system_catalog
                .table(catalog_name, schema_name, table_name, query_ctx)
        {
            return Ok(Some(table));
        }

        let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
            name: "table_cache",
        })?;

        let table = table_cache
            .get_by_ref(&TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: schema_name.to_string(),
                table_name: table_name.to_string(),
            })
            .await
            .context(GetTableCacheSnafu)?;

        if let Some(table) = table {
            let table_route_cache: TableRouteCacheRef =
                self.cache_registry.get().context(CacheNotFoundSnafu {
                    name: "table_route_cache",
                })?;
            return Self::override_logical_table_partition_key_indices(
                &table_route_cache,
                self.table_metadata_manager.table_info_manager(),
                table,
            )
            .await
            .map(Some);
        }

        if channel == Channel::Postgres {
            // Fall back to pg_catalog.
            if let Some(table) =
                self.system_catalog
                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
            {
                return Ok(Some(table));
            }
        }

        Ok(None)
    }

    async fn tables_by_ids(
        &self,
        catalog: &str,
        schema: &str,
        table_ids: &[TableId],
    ) -> Result<Vec<TableRef>> {
        let table_info_values = self
            .table_metadata_manager
            .table_info_manager()
            .batch_get(table_ids)
            .await
            .context(TableMetadataManagerSnafu)?;

        let tables = table_info_values
            .into_values()
            .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
            .map(build_table)
            .collect::<Result<Vec<_>>>()?;

        Ok(tables)
    }

    fn tables<'a>(
        &'a self,
        catalog: &'a str,
        schema: &'a str,
        query_ctx: Option<&'a QueryContext>,
    ) -> BoxStream<'a, Result<TableRef>> {
        let sys_tables = try_stream!({
            // System tables
            let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
            for table_name in sys_table_names {
                if let Some(table) =
                    self.system_catalog
                        .table(catalog, schema, &table_name, query_ctx)
                {
                    yield table;
                }
            }
        });

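        // User tables are listed in the background: table ids are read in
        // batches of `BATCH_SIZE` and resolved to table metadata with at most
        // `CONCURRENCY` concurrent lookups; results are streamed back through
        // a bounded channel and chained after the system tables.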
        const BATCH_SIZE: usize = 128;
        const CONCURRENCY: usize = 8;

        let (tx, rx) = tokio::sync::mpsc::channel(64);
        let metadata_manager = self.table_metadata_manager.clone();
        let catalog = catalog.to_string();
        let schema = schema.to_string();
        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
        let table_route_cache: Result<TableRouteCacheRef> =
            self.cache_registry.get().context(CacheNotFoundSnafu {
                name: "table_route_cache",
            });

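        // Spawn a detached task that drives the listing; any error is
        // forwarded to the receiver through the channel.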
        common_runtime::spawn_global(async move {
            let table_route_cache = match table_route_cache {
                Ok(table_route_cache) => table_route_cache,
                Err(e) => {
                    let _ = tx.send(Err(e)).await;
                    return;
                }
            };

            let table_id_stream = metadata_manager
                .table_name_manager()
                .tables(&catalog, &schema)
                .map_ok(|(_, v)| v.table_id());
            // Split table ids into chunks
            let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);

            while let Some(table_ids) = table_id_chunks.next().await {
                let table_ids = match table_ids
                    .into_iter()
                    .collect::<std::result::Result<Vec<_>, _>>()
                    .map_err(BoxedError::new)
                    .context(ListTablesSnafu {
                        catalog: &catalog,
                        schema: &schema,
                    }) {
                    Ok(table_ids) => table_ids,
                    Err(e) => {
                        let _ = tx.send(Err(e)).await;
                        return;
                    }
                };

                let metadata_manager = metadata_manager.clone();
                let tx = tx.clone();
                let semaphore = semaphore.clone();
                let table_route_cache = table_route_cache.clone();
                common_runtime::spawn_global(async move {
                    // We don't explicitly close the semaphore, so just ignore the potential error.
                    // Hold the permit for the lifetime of this task to bound concurrency.
                    let _permit = semaphore.acquire().await;
                    let table_info_values = match metadata_manager
                        .table_info_manager()
                        .batch_get(&table_ids)
                        .await
                        .context(TableMetadataManagerSnafu)
                    {
                        Ok(table_info_values) => table_info_values,
                        Err(e) => {
                            let _ = tx.send(Err(e)).await;
                            return;
                        }
                    };

                    for table in table_info_values.into_values().map(build_table) {
                        let table = if let Ok(table) = table {
                            Self::override_logical_table_partition_key_indices(
                                &table_route_cache,
                                metadata_manager.table_info_manager(),
                                table,
                            )
                            .await
                        } else {
                            table
                        };
                        if tx.send(table).await.is_err() {
                            return;
                        }
                    }
                });
            }
        });

        let user_tables = ReceiverStream::new(rx);
        Box::pin(sys_tables.chain(user_tables))
    }
}

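/// Builds a [`DistTable`] from a raw [`TableInfoValue`] read from the metadata backend.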
fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
    let table_info = table_info_value
        .table_info
        .try_into()
        .context(InvalidTableInfoInCatalogSnafu)?;
    Ok(DistTable::table(Arc::new(table_info)))
}

// TODO: This struct can hold a static map of all system tables when
// the upper layer (e.g., procedure) can inform the catalog manager
// that a new catalog is created.
/// Existing system tables:
/// - public.numbers
/// - information_schema.{tables}
/// - pg_catalog.{tables}
#[derive(Clone)]
pub(super) struct SystemCatalog {
    pub(super) catalog_manager: Weak<KvBackendCatalogManager>,
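    /// Per-catalog `information_schema` providers, keyed by catalog name.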
    pub(super) catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
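    /// Per-catalog `pg_catalog` providers, keyed by catalog name.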
    pub(super) pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

    // System schema providers for the default catalog.
    pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
    pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
    pub(super) backend: KvBackendRef,
    pub(super) process_manager: Option<ProcessManagerRef>,
    #[cfg(feature = "enterprise")]
    pub(super) extra_information_table_factories:
        std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}

impl SystemCatalog {
    fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            // pg_catalog is only visible under the postgres protocol.
            Channel::Postgres => vec![
                INFORMATION_SCHEMA_NAME.to_string(),
                PG_CATALOG_NAME.to_string(),
            ],
            _ => {
                vec![INFORMATION_SCHEMA_NAME.to_string()]
            }
        }
    }

    fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match schema {
            INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
            PG_CATALOG_NAME if channel == Channel::Postgres => {
                self.pg_catalog_provider.table_names()
            }
            DEFAULT_SCHEMA_NAME => {
                vec![NUMBERS_TABLE_NAME.to_string()]
            }
            _ => vec![],
        }
    }

    fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        match channel {
            Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
            _ => schema == INFORMATION_SCHEMA_NAME,
        }
    }

    fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
            self.information_schema_provider.table(table).is_some()
        } else if schema == DEFAULT_SCHEMA_NAME {
            table == NUMBERS_TABLE_NAME
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            self.pg_catalog_provider.table(table).is_some()
        } else {
            false
        }
    }

    fn table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: Option<&QueryContext>,
    ) -> Option<TableRef> {
        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
        if schema == INFORMATION_SCHEMA_NAME {
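            // Build or reuse the cached information_schema provider for this catalog.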
            let information_schema_provider =
                self.catalog_cache.get_with_by_ref(catalog, move || {
                    let provider = InformationSchemaProvider::new(
                        catalog.to_string(),
                        self.catalog_manager.clone(),
                        Arc::new(FlowMetadataManager::new(self.backend.clone())),
                        self.process_manager.clone(),
                        self.backend.clone(),
                    );
                    #[cfg(feature = "enterprise")]
                    let provider = provider
                        .with_extra_table_factories(self.extra_information_table_factories.clone());
                    Arc::new(provider)
                });
            information_schema_provider.table(table_name)
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            if catalog == DEFAULT_CATALOG_NAME {
                self.pg_catalog_provider.table(table_name)
            } else {
                let pg_catalog_provider =
                    self.pg_catalog_cache.get_with_by_ref(catalog, move || {
                        Arc::new(PGCatalogProvider::new(
                            catalog.to_string(),
                            self.catalog_manager.clone(),
                        ))
                    });
                pg_catalog_provider.table(table_name)
            }
        } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
            Some(NumbersTable::table(NUMBERS_TABLE_ID))
        } else {
            None
        }
    }
}