catalog/memory/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, RwLock, Weak};
19
20use async_stream::{stream, try_stream};
21use common_catalog::build_db_string;
22use common_catalog::consts::{
23    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
24    INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
25};
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use futures_util::stream::BoxStream;
29use session::context::QueryContext;
30use snafu::OptionExt;
31use table::metadata::TableId;
32use table::TableRef;
33
34use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
35use crate::information_schema::InformationSchemaProvider;
36use crate::system_schema::SystemSchemaProvider;
37use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest};
38
39type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
40
41/// Simple in-memory list of catalogs
42#[derive(Clone)]
43pub struct MemoryCatalogManager {
44    /// Collection of catalogs containing schemas and ultimately Tables
45    catalogs: Arc<RwLock<HashMap<String, SchemaEntries>>>,
46}
47
48#[async_trait::async_trait]
49impl CatalogManager for MemoryCatalogManager {
50    fn as_any(&self) -> &dyn Any {
51        self
52    }
53
54    async fn catalog_names(&self) -> Result<Vec<String>> {
55        Ok(self.catalogs.read().unwrap().keys().cloned().collect())
56    }
57
58    async fn schema_names(
59        &self,
60        catalog: &str,
61        _query_ctx: Option<&QueryContext>,
62    ) -> Result<Vec<String>> {
63        Ok(self
64            .catalogs
65            .read()
66            .unwrap()
67            .get(catalog)
68            .with_context(|| CatalogNotFoundSnafu {
69                catalog_name: catalog,
70            })?
71            .keys()
72            .cloned()
73            .collect())
74    }
75
76    async fn table_names(
77        &self,
78        catalog: &str,
79        schema: &str,
80        _query_ctx: Option<&QueryContext>,
81    ) -> Result<Vec<String>> {
82        Ok(self
83            .catalogs
84            .read()
85            .unwrap()
86            .get(catalog)
87            .with_context(|| CatalogNotFoundSnafu {
88                catalog_name: catalog,
89            })?
90            .get(schema)
91            .with_context(|| SchemaNotFoundSnafu { catalog, schema })?
92            .keys()
93            .cloned()
94            .collect())
95    }
96
97    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
98        self.catalog_exist_sync(catalog)
99    }
100
101    async fn schema_exists(
102        &self,
103        catalog: &str,
104        schema: &str,
105        _query_ctx: Option<&QueryContext>,
106    ) -> Result<bool> {
107        self.schema_exist_sync(catalog, schema)
108    }
109
110    async fn table_exists(
111        &self,
112        catalog: &str,
113        schema: &str,
114        table: &str,
115        _query_ctx: Option<&QueryContext>,
116    ) -> Result<bool> {
117        let catalogs = self.catalogs.read().unwrap();
118        Ok(catalogs
119            .get(catalog)
120            .with_context(|| CatalogNotFoundSnafu {
121                catalog_name: catalog,
122            })?
123            .get(schema)
124            .with_context(|| SchemaNotFoundSnafu { catalog, schema })?
125            .contains_key(table))
126    }
127
128    async fn table(
129        &self,
130        catalog: &str,
131        schema: &str,
132        table_name: &str,
133        _query_ctx: Option<&QueryContext>,
134    ) -> Result<Option<TableRef>> {
135        let result = try {
136            self.catalogs
137                .read()
138                .unwrap()
139                .get(catalog)?
140                .get(schema)?
141                .get(table_name)
142                .cloned()?
143        };
144        Ok(result)
145    }
146
147    async fn tables_by_ids(
148        &self,
149        catalog: &str,
150        schema: &str,
151        table_ids: &[TableId],
152    ) -> Result<Vec<TableRef>> {
153        let catalogs = self.catalogs.read().unwrap();
154
155        let schemas = catalogs.get(catalog).context(CatalogNotFoundSnafu {
156            catalog_name: catalog,
157        })?;
158
159        let tables = schemas
160            .get(schema)
161            .context(SchemaNotFoundSnafu { catalog, schema })?;
162
163        let filter_ids: HashSet<_> = table_ids.iter().collect();
164        // It is very inefficient, but we do not need to optimize it since it will not be called in `MemoryCatalogManager`.
165        let tables = tables
166            .values()
167            .filter(|t| filter_ids.contains(&t.table_info().table_id()))
168            .cloned()
169            .collect::<Vec<_>>();
170
171        Ok(tables)
172    }
173
174    fn tables<'a>(
175        &'a self,
176        catalog: &'a str,
177        schema: &'a str,
178        _query_ctx: Option<&QueryContext>,
179    ) -> BoxStream<'a, Result<TableRef>> {
180        let catalogs = self.catalogs.read().unwrap();
181
182        let Some(schemas) = catalogs.get(catalog) else {
183            return Box::pin(stream!({
184                yield CatalogNotFoundSnafu {
185                    catalog_name: catalog,
186                }
187                .fail();
188            }));
189        };
190
191        let Some(tables) = schemas.get(schema) else {
192            return Box::pin(stream!({
193                yield SchemaNotFoundSnafu { catalog, schema }.fail();
194            }));
195        };
196
197        let tables = tables.values().cloned().collect::<Vec<_>>();
198
199        Box::pin(try_stream!({
200            for table in tables {
201                yield table;
202            }
203        }))
204    }
205}
206
207impl MemoryCatalogManager {
208    pub fn new() -> Arc<Self> {
209        Arc::new(Self {
210            catalogs: Default::default(),
211        })
212    }
213
214    /// Creates a manager with some default setups
215    /// (e.g. default catalog/schema and information schema)
216    pub fn with_default_setup() -> Arc<Self> {
217        let manager = Arc::new(Self {
218            catalogs: Default::default(),
219        });
220
221        // Safety: default catalog/schema is registered in order so no CatalogNotFound error will occur
222        manager.register_catalog_sync(DEFAULT_CATALOG_NAME).unwrap();
223        manager
224            .register_schema_sync(RegisterSchemaRequest {
225                catalog: DEFAULT_CATALOG_NAME.to_string(),
226                schema: DEFAULT_SCHEMA_NAME.to_string(),
227            })
228            .unwrap();
229        manager
230            .register_schema_sync(RegisterSchemaRequest {
231                catalog: DEFAULT_CATALOG_NAME.to_string(),
232                schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
233            })
234            .unwrap();
235        manager
236            .register_schema_sync(RegisterSchemaRequest {
237                catalog: DEFAULT_CATALOG_NAME.to_string(),
238                schema: PG_CATALOG_NAME.to_string(),
239            })
240            .unwrap();
241        manager
242            .register_schema_sync(RegisterSchemaRequest {
243                catalog: DEFAULT_CATALOG_NAME.to_string(),
244                schema: INFORMATION_SCHEMA_NAME.to_string(),
245            })
246            .unwrap();
247
248        manager
249    }
250
251    fn schema_exist_sync(&self, catalog: &str, schema: &str) -> Result<bool> {
252        Ok(self
253            .catalogs
254            .read()
255            .unwrap()
256            .get(catalog)
257            .with_context(|| CatalogNotFoundSnafu {
258                catalog_name: catalog,
259            })?
260            .contains_key(schema))
261    }
262
263    fn catalog_exist_sync(&self, catalog: &str) -> Result<bool> {
264        Ok(self.catalogs.read().unwrap().contains_key(catalog))
265    }
266
267    /// Registers a catalog if it does not exist and returns false if the schema exists.
268    pub fn register_catalog_sync(&self, name: &str) -> Result<bool> {
269        let name = name.to_string();
270
271        let mut catalogs = self.catalogs.write().unwrap();
272
273        match catalogs.entry(name.clone()) {
274            Entry::Vacant(e) => {
275                let arc_self = Arc::new(self.clone());
276                let catalog = arc_self.create_catalog_entry(name);
277                e.insert(catalog);
278                crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT.inc();
279                Ok(true)
280            }
281            Entry::Occupied(_) => Ok(false),
282        }
283    }
284
285    pub fn deregister_table_sync(&self, request: DeregisterTableRequest) -> Result<()> {
286        let mut catalogs = self.catalogs.write().unwrap();
287        let schema = catalogs
288            .get_mut(&request.catalog)
289            .with_context(|| CatalogNotFoundSnafu {
290                catalog_name: &request.catalog,
291            })?
292            .get_mut(&request.schema)
293            .with_context(|| SchemaNotFoundSnafu {
294                catalog: &request.catalog,
295                schema: &request.schema,
296            })?;
297        let result = schema.remove(&request.table_name);
298        if result.is_some() {
299            crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
300                .with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
301                .dec();
302        }
303        Ok(())
304    }
305
306    /// Registers a schema if it does not exist.
307    /// It returns an error if the catalog does not exist,
308    /// and returns false if the schema exists.
309    pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result<bool> {
310        let mut catalogs = self.catalogs.write().unwrap();
311        let catalog = catalogs
312            .get_mut(&request.catalog)
313            .with_context(|| CatalogNotFoundSnafu {
314                catalog_name: &request.catalog,
315            })?;
316
317        match catalog.entry(request.schema) {
318            Entry::Vacant(e) => {
319                e.insert(HashMap::new());
320                crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT.inc();
321                Ok(true)
322            }
323            Entry::Occupied(_) => Ok(false),
324        }
325    }
326
327    /// Registers a schema and returns an error if the catalog or schema does not exist.
328    pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
329        let mut catalogs = self.catalogs.write().unwrap();
330        let schema = catalogs
331            .get_mut(&request.catalog)
332            .with_context(|| CatalogNotFoundSnafu {
333                catalog_name: &request.catalog,
334            })?
335            .get_mut(&request.schema)
336            .with_context(|| SchemaNotFoundSnafu {
337                catalog: &request.catalog,
338                schema: &request.schema,
339            })?;
340
341        if schema.contains_key(&request.table_name) {
342            return TableExistsSnafu {
343                table: &request.table_name,
344            }
345            .fail();
346        }
347        schema.insert(request.table_name, request.table);
348        crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
349            .with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
350            .inc();
351        Ok(true)
352    }
353
354    fn create_catalog_entry(self: &Arc<Self>, catalog: String) -> SchemaEntries {
355        let information_schema_provider = InformationSchemaProvider::new(
356            catalog,
357            Arc::downgrade(self) as Weak<dyn CatalogManager>,
358            Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
359        );
360        let information_schema = information_schema_provider.tables().clone();
361
362        let mut catalog = HashMap::new();
363        catalog.insert(INFORMATION_SCHEMA_NAME.to_string(), information_schema);
364        catalog
365    }
366
367    #[cfg(any(test, feature = "testing"))]
368    pub fn new_with_table(table: TableRef) -> Arc<Self> {
369        let manager = Self::with_default_setup();
370        let catalog = &table.table_info().catalog_name;
371        let schema = &table.table_info().schema_name;
372
373        if !manager.catalog_exist_sync(catalog).unwrap() {
374            manager.register_catalog_sync(catalog).unwrap();
375        }
376
377        if !manager.schema_exist_sync(catalog, schema).unwrap() {
378            manager
379                .register_schema_sync(RegisterSchemaRequest {
380                    catalog: catalog.to_string(),
381                    schema: schema.to_string(),
382                })
383                .unwrap();
384        }
385
386        let request = RegisterTableRequest {
387            catalog: catalog.to_string(),
388            schema: schema.to_string(),
389            table_name: table.table_info().name.clone(),
390            table_id: table.table_info().ident.table_id,
391            table,
392        };
393        let _ = manager.register_table_sync(request).unwrap();
394        manager
395    }
396}
397
398/// Create a memory catalog list contains a numbers table for test
399pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
400    Ok(MemoryCatalogManager::with_default_setup())
401}
402
#[cfg(test)]
mod tests {
    use common_catalog::consts::*;
    use futures_util::TryStreamExt;
    use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};

    use super::*;

    /// Registers the numbers table in the default schema, then checks it is
    /// reachable by name and through the table stream, and that an unknown
    /// name resolves to `None`.
    #[tokio::test]
    async fn test_new_memory_catalog_list() {
        let manager = new_memory_catalog_manager().unwrap();

        manager
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: NUMBERS_TABLE_NAME.to_string(),
                table_id: NUMBERS_TABLE_ID,
                table: NumbersTable::table(NUMBERS_TABLE_ID),
            })
            .unwrap();

        let fetched = manager
            .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_NAME, None)
            .await
            .unwrap()
            .unwrap();

        let listed = manager
            .tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, None)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(
            fetched.table_info().table_id(),
            listed[0].table_info().table_id()
        );

        let missing = manager
            .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "not_exists", None)
            .await
            .unwrap();
        assert!(missing.is_none());
    }

    /// A catalog is created once; a second registration reports `false`.
    #[test]
    pub fn test_register_catalog_sync() {
        let manager = MemoryCatalogManager::with_default_setup();
        let first = manager.register_catalog_sync("test_catalog").unwrap();
        let second = manager.register_catalog_sync("test_catalog").unwrap();
        assert!(first);
        assert!(!second);
    }

    /// A registered table disappears after it is deregistered.
    #[tokio::test]
    pub async fn test_catalog_deregister_table() {
        let manager = MemoryCatalogManager::with_default_setup();
        let name = "foo_table";

        manager
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: name.to_string(),
                table_id: 2333,
                table: NumbersTable::table(2333),
            })
            .unwrap();
        let before = manager
            .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, name, None)
            .await
            .unwrap();
        assert!(before.is_some());

        manager
            .deregister_table_sync(DeregisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: name.to_string(),
            })
            .unwrap();
        let after = manager
            .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, name, None)
            .await
            .unwrap();
        assert!(after.is_none());
    }
}