catalog/kvbackend/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::BTreeSet;
17use std::sync::{Arc, Weak};
18
19use async_stream::try_stream;
20use common_catalog::consts::{
21    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
22    PG_CATALOG_NAME,
23};
24use common_error::ext::BoxedError;
25use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef};
26use common_meta::key::catalog_name::CatalogNameKey;
27use common_meta::key::flow::FlowMetadataManager;
28use common_meta::key::schema_name::SchemaNameKey;
29use common_meta::key::table_info::TableInfoValue;
30use common_meta::key::table_name::TableNameKey;
31use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
32use common_meta::kv_backend::KvBackendRef;
33use common_procedure::ProcedureManagerRef;
34use futures_util::stream::BoxStream;
35use futures_util::{StreamExt, TryStreamExt};
36use moka::sync::Cache;
37use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
38use session::context::{Channel, QueryContext};
39use snafu::prelude::*;
40use table::dist_table::DistTable;
41use table::metadata::TableId;
42use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
43use table::table_name::TableName;
44use table::TableRef;
45use tokio::sync::Semaphore;
46use tokio_stream::wrappers::ReceiverStream;
47
48use crate::error::{
49    CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
50    ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
51};
52use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
53use crate::kvbackend::TableCacheRef;
54use crate::system_schema::pg_catalog::PGCatalogProvider;
55use crate::system_schema::SystemSchemaProvider;
56use crate::CatalogManager;
57
58/// Access all existing catalog, schema and tables.
59///
60/// The result comes from two source, all the user tables are presented in
61/// a kv-backend which persists the metadata of a table. And system tables
62/// comes from `SystemCatalog`, which is static and read-only.
63#[derive(Clone)]
64pub struct KvBackendCatalogManager {
65    /// Provides the extension methods for the `information_schema` tables
66    information_extension: InformationExtensionRef,
67    /// Manages partition rules.
68    partition_manager: PartitionRuleManagerRef,
69    /// Manages table metadata.
70    table_metadata_manager: TableMetadataManagerRef,
71    /// A sub-CatalogManager that handles system tables
72    system_catalog: SystemCatalog,
73    /// Cache registry for all caches.
74    cache_registry: LayeredCacheRegistryRef,
75    /// Only available in `Standalone` mode.
76    procedure_manager: Option<ProcedureManagerRef>,
77}
78
/// Maximum capacity given to each of the per-catalog system schema provider
/// caches (`information_schema` and `pg_catalog`).
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
80
81impl KvBackendCatalogManager {
82    pub fn new(
83        information_extension: InformationExtensionRef,
84        backend: KvBackendRef,
85        cache_registry: LayeredCacheRegistryRef,
86        procedure_manager: Option<ProcedureManagerRef>,
87    ) -> Arc<Self> {
88        Arc::new_cyclic(|me| Self {
89            information_extension,
90            partition_manager: Arc::new(PartitionRuleManager::new(
91                backend.clone(),
92                cache_registry
93                    .get()
94                    .expect("Failed to get table_route_cache"),
95            )),
96            table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
97            system_catalog: SystemCatalog {
98                catalog_manager: me.clone(),
99                catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
100                pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
101                information_schema_provider: Arc::new(InformationSchemaProvider::new(
102                    DEFAULT_CATALOG_NAME.to_string(),
103                    me.clone(),
104                    Arc::new(FlowMetadataManager::new(backend.clone())),
105                )),
106                pg_catalog_provider: Arc::new(PGCatalogProvider::new(
107                    DEFAULT_CATALOG_NAME.to_string(),
108                    me.clone(),
109                )),
110                backend,
111            },
112            cache_registry,
113            procedure_manager,
114        })
115    }
116
117    pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
118        self.cache_registry.get().context(CacheNotFoundSnafu {
119            name: "view_info_cache",
120        })
121    }
122
123    /// Returns the [`InformationExtension`].
124    pub fn information_extension(&self) -> InformationExtensionRef {
125        self.information_extension.clone()
126    }
127
128    pub fn partition_manager(&self) -> PartitionRuleManagerRef {
129        self.partition_manager.clone()
130    }
131
132    pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
133        &self.table_metadata_manager
134    }
135
136    pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
137        self.procedure_manager.clone()
138    }
139}
140
141#[async_trait::async_trait]
142impl CatalogManager for KvBackendCatalogManager {
143    fn as_any(&self) -> &dyn Any {
144        self
145    }
146
147    async fn catalog_names(&self) -> Result<Vec<String>> {
148        let stream = self
149            .table_metadata_manager
150            .catalog_manager()
151            .catalog_names();
152
153        let keys = stream
154            .try_collect::<Vec<_>>()
155            .await
156            .map_err(BoxedError::new)
157            .context(ListCatalogsSnafu)?;
158
159        Ok(keys)
160    }
161
162    async fn schema_names(
163        &self,
164        catalog: &str,
165        query_ctx: Option<&QueryContext>,
166    ) -> Result<Vec<String>> {
167        let stream = self
168            .table_metadata_manager
169            .schema_manager()
170            .schema_names(catalog);
171        let mut keys = stream
172            .try_collect::<BTreeSet<_>>()
173            .await
174            .map_err(BoxedError::new)
175            .context(ListSchemasSnafu { catalog })?;
176
177        keys.extend(self.system_catalog.schema_names(query_ctx));
178
179        Ok(keys.into_iter().collect())
180    }
181
182    async fn table_names(
183        &self,
184        catalog: &str,
185        schema: &str,
186        query_ctx: Option<&QueryContext>,
187    ) -> Result<Vec<String>> {
188        let mut tables = self
189            .table_metadata_manager
190            .table_name_manager()
191            .tables(catalog, schema)
192            .map_ok(|(table_name, _)| table_name)
193            .try_collect::<Vec<_>>()
194            .await
195            .map_err(BoxedError::new)
196            .context(ListTablesSnafu { catalog, schema })?;
197
198        tables.extend(self.system_catalog.table_names(schema, query_ctx));
199        Ok(tables)
200    }
201
202    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
203        self.table_metadata_manager
204            .catalog_manager()
205            .exists(CatalogNameKey::new(catalog))
206            .await
207            .context(TableMetadataManagerSnafu)
208    }
209
210    async fn schema_exists(
211        &self,
212        catalog: &str,
213        schema: &str,
214        query_ctx: Option<&QueryContext>,
215    ) -> Result<bool> {
216        if self.system_catalog.schema_exists(schema, query_ctx) {
217            return Ok(true);
218        }
219
220        self.table_metadata_manager
221            .schema_manager()
222            .exists(SchemaNameKey::new(catalog, schema))
223            .await
224            .context(TableMetadataManagerSnafu)
225    }
226
227    async fn table_exists(
228        &self,
229        catalog: &str,
230        schema: &str,
231        table: &str,
232        query_ctx: Option<&QueryContext>,
233    ) -> Result<bool> {
234        if self.system_catalog.table_exists(schema, table, query_ctx) {
235            return Ok(true);
236        }
237
238        let key = TableNameKey::new(catalog, schema, table);
239        self.table_metadata_manager
240            .table_name_manager()
241            .get(key)
242            .await
243            .context(TableMetadataManagerSnafu)
244            .map(|x| x.is_some())
245    }
246
247    async fn table(
248        &self,
249        catalog_name: &str,
250        schema_name: &str,
251        table_name: &str,
252        query_ctx: Option<&QueryContext>,
253    ) -> Result<Option<TableRef>> {
254        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
255        if let Some(table) =
256            self.system_catalog
257                .table(catalog_name, schema_name, table_name, query_ctx)
258        {
259            return Ok(Some(table));
260        }
261
262        let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
263            name: "table_cache",
264        })?;
265        if let Some(table) = table_cache
266            .get_by_ref(&TableName {
267                catalog_name: catalog_name.to_string(),
268                schema_name: schema_name.to_string(),
269                table_name: table_name.to_string(),
270            })
271            .await
272            .context(GetTableCacheSnafu)?
273        {
274            return Ok(Some(table));
275        }
276
277        if channel == Channel::Postgres {
278            // falldown to pg_catalog
279            if let Some(table) =
280                self.system_catalog
281                    .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
282            {
283                return Ok(Some(table));
284            }
285        }
286
287        return Ok(None);
288    }
289
290    async fn tables_by_ids(
291        &self,
292        catalog: &str,
293        schema: &str,
294        table_ids: &[TableId],
295    ) -> Result<Vec<TableRef>> {
296        let table_info_values = self
297            .table_metadata_manager
298            .table_info_manager()
299            .batch_get(table_ids)
300            .await
301            .context(TableMetadataManagerSnafu)?;
302
303        let tables = table_info_values
304            .into_values()
305            .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
306            .map(build_table)
307            .collect::<Result<Vec<_>>>()?;
308
309        Ok(tables)
310    }
311
312    fn tables<'a>(
313        &'a self,
314        catalog: &'a str,
315        schema: &'a str,
316        query_ctx: Option<&'a QueryContext>,
317    ) -> BoxStream<'a, Result<TableRef>> {
318        let sys_tables = try_stream!({
319            // System tables
320            let sys_table_names = self.system_catalog.table_names(schema, query_ctx);
321            for table_name in sys_table_names {
322                if let Some(table) =
323                    self.system_catalog
324                        .table(catalog, schema, &table_name, query_ctx)
325                {
326                    yield table;
327                }
328            }
329        });
330
331        const BATCH_SIZE: usize = 128;
332        const CONCURRENCY: usize = 8;
333
334        let (tx, rx) = tokio::sync::mpsc::channel(64);
335        let metadata_manager = self.table_metadata_manager.clone();
336        let catalog = catalog.to_string();
337        let schema = schema.to_string();
338        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
339
340        common_runtime::spawn_global(async move {
341            let table_id_stream = metadata_manager
342                .table_name_manager()
343                .tables(&catalog, &schema)
344                .map_ok(|(_, v)| v.table_id());
345            // Split table ids into chunks
346            let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);
347
348            while let Some(table_ids) = table_id_chunks.next().await {
349                let table_ids = match table_ids
350                    .into_iter()
351                    .collect::<std::result::Result<Vec<_>, _>>()
352                    .map_err(BoxedError::new)
353                    .context(ListTablesSnafu {
354                        catalog: &catalog,
355                        schema: &schema,
356                    }) {
357                    Ok(table_ids) => table_ids,
358                    Err(e) => {
359                        let _ = tx.send(Err(e)).await;
360                        return;
361                    }
362                };
363
364                let metadata_manager = metadata_manager.clone();
365                let tx = tx.clone();
366                let semaphore = semaphore.clone();
367                common_runtime::spawn_global(async move {
368                    // we don't explicitly close the semaphore so just ignore the potential error.
369                    let _ = semaphore.acquire().await;
370                    let table_info_values = match metadata_manager
371                        .table_info_manager()
372                        .batch_get(&table_ids)
373                        .await
374                        .context(TableMetadataManagerSnafu)
375                    {
376                        Ok(table_info_values) => table_info_values,
377                        Err(e) => {
378                            let _ = tx.send(Err(e)).await;
379                            return;
380                        }
381                    };
382
383                    for table in table_info_values.into_values().map(build_table) {
384                        if tx.send(table).await.is_err() {
385                            return;
386                        }
387                    }
388                });
389            }
390        });
391
392        let user_tables = ReceiverStream::new(rx);
393        Box::pin(sys_tables.chain(user_tables))
394    }
395}
396
397fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
398    let table_info = table_info_value
399        .table_info
400        .try_into()
401        .context(InvalidTableInfoInCatalogSnafu)?;
402    Ok(DistTable::table(Arc::new(table_info)))
403}
404
405// TODO: This struct can hold a static map of all system tables when
406// the upper layer (e.g., procedure) can inform the catalog manager
407// a new catalog is created.
408/// Existing system tables:
409/// - public.numbers
410/// - information_schema.{tables}
411/// - pg_catalog.{tables}
412#[derive(Clone)]
413struct SystemCatalog {
414    catalog_manager: Weak<KvBackendCatalogManager>,
415    catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
416    pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,
417
418    // system_schema_provider for default catalog
419    information_schema_provider: Arc<InformationSchemaProvider>,
420    pg_catalog_provider: Arc<PGCatalogProvider>,
421    backend: KvBackendRef,
422}
423
424impl SystemCatalog {
425    fn schema_names(&self, query_ctx: Option<&QueryContext>) -> Vec<String> {
426        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
427        match channel {
428            // pg_catalog only visible under postgres protocol
429            Channel::Postgres => vec![
430                INFORMATION_SCHEMA_NAME.to_string(),
431                PG_CATALOG_NAME.to_string(),
432            ],
433            _ => {
434                vec![INFORMATION_SCHEMA_NAME.to_string()]
435            }
436        }
437    }
438
439    fn table_names(&self, schema: &str, query_ctx: Option<&QueryContext>) -> Vec<String> {
440        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
441        match schema {
442            INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
443            PG_CATALOG_NAME if channel == Channel::Postgres => {
444                self.pg_catalog_provider.table_names()
445            }
446            DEFAULT_SCHEMA_NAME => {
447                vec![NUMBERS_TABLE_NAME.to_string()]
448            }
449            _ => vec![],
450        }
451    }
452
453    fn schema_exists(&self, schema: &str, query_ctx: Option<&QueryContext>) -> bool {
454        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
455        match channel {
456            Channel::Postgres => schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME,
457            _ => schema == INFORMATION_SCHEMA_NAME,
458        }
459    }
460
461    fn table_exists(&self, schema: &str, table: &str, query_ctx: Option<&QueryContext>) -> bool {
462        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
463        if schema == INFORMATION_SCHEMA_NAME {
464            self.information_schema_provider.table(table).is_some()
465        } else if schema == DEFAULT_SCHEMA_NAME {
466            table == NUMBERS_TABLE_NAME
467        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
468            self.pg_catalog_provider.table(table).is_some()
469        } else {
470            false
471        }
472    }
473
474    fn table(
475        &self,
476        catalog: &str,
477        schema: &str,
478        table_name: &str,
479        query_ctx: Option<&QueryContext>,
480    ) -> Option<TableRef> {
481        let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
482        if schema == INFORMATION_SCHEMA_NAME {
483            let information_schema_provider =
484                self.catalog_cache.get_with_by_ref(catalog, move || {
485                    Arc::new(InformationSchemaProvider::new(
486                        catalog.to_string(),
487                        self.catalog_manager.clone(),
488                        Arc::new(FlowMetadataManager::new(self.backend.clone())),
489                    ))
490                });
491            information_schema_provider.table(table_name)
492        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
493            if catalog == DEFAULT_CATALOG_NAME {
494                self.pg_catalog_provider.table(table_name)
495            } else {
496                let pg_catalog_provider =
497                    self.pg_catalog_cache.get_with_by_ref(catalog, move || {
498                        Arc::new(PGCatalogProvider::new(
499                            catalog.to_string(),
500                            self.catalog_manager.clone(),
501                        ))
502                    });
503                pg_catalog_provider.table(table_name)
504            }
505        } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
506            Some(NumbersTable::table(NUMBERS_TABLE_ID))
507        } else {
508            None
509        }
510    }
511}