catalog/memory/manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, RwLock, Weak};
19
20use async_stream::{stream, try_stream};
21use common_catalog::build_db_string;
22use common_catalog::consts::{
23    DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
24    INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
25};
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use futures_util::stream::BoxStream;
29use session::context::QueryContext;
30use snafu::OptionExt;
31use table::metadata::{TableId, TableInfoRef};
32use table::TableRef;
33
34use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
35use crate::information_schema::InformationSchemaProvider;
36use crate::system_schema::SystemSchemaProvider;
37use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest};
38
39type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
40
41/// Simple in-memory list of catalogs used for tests.
42#[derive(Clone)]
43pub struct MemoryCatalogManager {
44    /// Collection of catalogs containing schemas and ultimately Tables
45    catalogs: Arc<RwLock<HashMap<String, SchemaEntries>>>,
46}
47
48#[async_trait::async_trait]
49impl CatalogManager for MemoryCatalogManager {
50    fn as_any(&self) -> &dyn Any {
51        self
52    }
53
54    async fn catalog_names(&self) -> Result<Vec<String>> {
55        Ok(self.catalogs.read().unwrap().keys().cloned().collect())
56    }
57
58    async fn schema_names(
59        &self,
60        catalog: &str,
61        _query_ctx: Option<&QueryContext>,
62    ) -> Result<Vec<String>> {
63        Ok(self
64            .catalogs
65            .read()
66            .unwrap()
67            .get(catalog)
68            .with_context(|| CatalogNotFoundSnafu {
69                catalog_name: catalog,
70            })?
71            .keys()
72            .cloned()
73            .collect())
74    }
75
76    async fn table_names(
77        &self,
78        catalog: &str,
79        schema: &str,
80        _query_ctx: Option<&QueryContext>,
81    ) -> Result<Vec<String>> {
82        Ok(self
83            .catalogs
84            .read()
85            .unwrap()
86            .get(catalog)
87            .with_context(|| CatalogNotFoundSnafu {
88                catalog_name: catalog,
89            })?
90            .get(schema)
91            .with_context(|| SchemaNotFoundSnafu { catalog, schema })?
92            .keys()
93            .cloned()
94            .collect())
95    }
96
97    async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
98        self.catalog_exist_sync(catalog)
99    }
100
101    async fn schema_exists(
102        &self,
103        catalog: &str,
104        schema: &str,
105        _query_ctx: Option<&QueryContext>,
106    ) -> Result<bool> {
107        self.schema_exist_sync(catalog, schema)
108    }
109
110    async fn table_exists(
111        &self,
112        catalog: &str,
113        schema: &str,
114        table: &str,
115        _query_ctx: Option<&QueryContext>,
116    ) -> Result<bool> {
117        let catalogs = self.catalogs.read().unwrap();
118        Ok(catalogs
119            .get(catalog)
120            .with_context(|| CatalogNotFoundSnafu {
121                catalog_name: catalog,
122            })?
123            .get(schema)
124            .with_context(|| SchemaNotFoundSnafu { catalog, schema })?
125            .contains_key(table))
126    }
127
128    async fn table(
129        &self,
130        catalog: &str,
131        schema: &str,
132        table_name: &str,
133        _query_ctx: Option<&QueryContext>,
134    ) -> Result<Option<TableRef>> {
135        let result = try {
136            self.catalogs
137                .read()
138                .unwrap()
139                .get(catalog)?
140                .get(schema)?
141                .get(table_name)
142                .cloned()?
143        };
144        Ok(result)
145    }
146
147    async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>> {
148        Ok(self
149            .catalogs
150            .read()
151            .unwrap()
152            .iter()
153            .flat_map(|(_, schema_entries)| schema_entries.values())
154            .flat_map(|tables| tables.values())
155            .find(|t| t.table_info().ident.table_id == table_id)
156            .map(|t| t.table_info()))
157    }
158
159    async fn tables_by_ids(
160        &self,
161        catalog: &str,
162        schema: &str,
163        table_ids: &[TableId],
164    ) -> Result<Vec<TableRef>> {
165        let catalogs = self.catalogs.read().unwrap();
166
167        let schemas = catalogs.get(catalog).context(CatalogNotFoundSnafu {
168            catalog_name: catalog,
169        })?;
170
171        let tables = schemas
172            .get(schema)
173            .context(SchemaNotFoundSnafu { catalog, schema })?;
174
175        let filter_ids: HashSet<_> = table_ids.iter().collect();
176        // It is very inefficient, but we do not need to optimize it since it will not be called in `MemoryCatalogManager`.
177        let tables = tables
178            .values()
179            .filter(|t| filter_ids.contains(&t.table_info().table_id()))
180            .cloned()
181            .collect::<Vec<_>>();
182
183        Ok(tables)
184    }
185
186    fn tables<'a>(
187        &'a self,
188        catalog: &'a str,
189        schema: &'a str,
190        _query_ctx: Option<&QueryContext>,
191    ) -> BoxStream<'a, Result<TableRef>> {
192        let catalogs = self.catalogs.read().unwrap();
193
194        let Some(schemas) = catalogs.get(catalog) else {
195            return Box::pin(stream!({
196                yield CatalogNotFoundSnafu {
197                    catalog_name: catalog,
198                }
199                .fail();
200            }));
201        };
202
203        let Some(tables) = schemas.get(schema) else {
204            return Box::pin(stream!({
205                yield SchemaNotFoundSnafu { catalog, schema }.fail();
206            }));
207        };
208
209        let tables = tables.values().cloned().collect::<Vec<_>>();
210
211        Box::pin(try_stream!({
212            for table in tables {
213                yield table;
214            }
215        }))
216    }
217}
218
219impl MemoryCatalogManager {
220    pub fn new() -> Arc<Self> {
221        Arc::new(Self {
222            catalogs: Default::default(),
223        })
224    }
225
226    /// Creates a manager with some default setups
227    /// (e.g. default catalog/schema and information schema)
228    pub fn with_default_setup() -> Arc<Self> {
229        let manager = Arc::new(Self {
230            catalogs: Default::default(),
231        });
232
233        // Safety: default catalog/schema is registered in order so no CatalogNotFound error will occur
234        manager.register_catalog_sync(DEFAULT_CATALOG_NAME).unwrap();
235        manager
236            .register_schema_sync(RegisterSchemaRequest {
237                catalog: DEFAULT_CATALOG_NAME.to_string(),
238                schema: DEFAULT_SCHEMA_NAME.to_string(),
239            })
240            .unwrap();
241        manager
242            .register_schema_sync(RegisterSchemaRequest {
243                catalog: DEFAULT_CATALOG_NAME.to_string(),
244                schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
245            })
246            .unwrap();
247        manager
248            .register_schema_sync(RegisterSchemaRequest {
249                catalog: DEFAULT_CATALOG_NAME.to_string(),
250                schema: PG_CATALOG_NAME.to_string(),
251            })
252            .unwrap();
253        manager
254            .register_schema_sync(RegisterSchemaRequest {
255                catalog: DEFAULT_CATALOG_NAME.to_string(),
256                schema: INFORMATION_SCHEMA_NAME.to_string(),
257            })
258            .unwrap();
259
260        manager
261    }
262
263    fn schema_exist_sync(&self, catalog: &str, schema: &str) -> Result<bool> {
264        Ok(self
265            .catalogs
266            .read()
267            .unwrap()
268            .get(catalog)
269            .with_context(|| CatalogNotFoundSnafu {
270                catalog_name: catalog,
271            })?
272            .contains_key(schema))
273    }
274
275    fn catalog_exist_sync(&self, catalog: &str) -> Result<bool> {
276        Ok(self.catalogs.read().unwrap().contains_key(catalog))
277    }
278
279    /// Registers a catalog if it does not exist and returns false if the schema exists.
280    pub fn register_catalog_sync(&self, name: &str) -> Result<bool> {
281        let name = name.to_string();
282
283        let mut catalogs = self.catalogs.write().unwrap();
284
285        match catalogs.entry(name.clone()) {
286            Entry::Vacant(e) => {
287                let arc_self = Arc::new(self.clone());
288                let catalog = arc_self.create_catalog_entry(name);
289                e.insert(catalog);
290                crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT.inc();
291                Ok(true)
292            }
293            Entry::Occupied(_) => Ok(false),
294        }
295    }
296
297    pub fn deregister_table_sync(&self, request: DeregisterTableRequest) -> Result<()> {
298        let mut catalogs = self.catalogs.write().unwrap();
299        let schema = catalogs
300            .get_mut(&request.catalog)
301            .with_context(|| CatalogNotFoundSnafu {
302                catalog_name: &request.catalog,
303            })?
304            .get_mut(&request.schema)
305            .with_context(|| SchemaNotFoundSnafu {
306                catalog: &request.catalog,
307                schema: &request.schema,
308            })?;
309        let result = schema.remove(&request.table_name);
310        if result.is_some() {
311            crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
312                .with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
313                .dec();
314        }
315        Ok(())
316    }
317
318    /// Registers a schema if it does not exist.
319    /// It returns an error if the catalog does not exist,
320    /// and returns false if the schema exists.
321    pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result<bool> {
322        let mut catalogs = self.catalogs.write().unwrap();
323        let catalog = catalogs
324            .get_mut(&request.catalog)
325            .with_context(|| CatalogNotFoundSnafu {
326                catalog_name: &request.catalog,
327            })?;
328
329        match catalog.entry(request.schema) {
330            Entry::Vacant(e) => {
331                e.insert(HashMap::new());
332                crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT.inc();
333                Ok(true)
334            }
335            Entry::Occupied(_) => Ok(false),
336        }
337    }
338
339    /// Registers a schema and returns an error if the catalog or schema does not exist.
340    pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
341        let mut catalogs = self.catalogs.write().unwrap();
342        let schema = catalogs
343            .get_mut(&request.catalog)
344            .with_context(|| CatalogNotFoundSnafu {
345                catalog_name: &request.catalog,
346            })?
347            .get_mut(&request.schema)
348            .with_context(|| SchemaNotFoundSnafu {
349                catalog: &request.catalog,
350                schema: &request.schema,
351            })?;
352
353        if schema.contains_key(&request.table_name) {
354            return TableExistsSnafu {
355                table: &request.table_name,
356            }
357            .fail();
358        }
359        schema.insert(request.table_name, request.table);
360        crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
361            .with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
362            .inc();
363        Ok(true)
364    }
365
366    fn create_catalog_entry(self: &Arc<Self>, catalog: String) -> SchemaEntries {
367        let backend = Arc::new(MemoryKvBackend::new());
368        let information_schema_provider = InformationSchemaProvider::new(
369            catalog,
370            Arc::downgrade(self) as Weak<dyn CatalogManager>,
371            Arc::new(FlowMetadataManager::new(backend.clone())),
372            None, // we don't need ProcessManager on regions server.
373            backend,
374        );
375        let information_schema = information_schema_provider.tables().clone();
376
377        let mut catalog = HashMap::new();
378        catalog.insert(INFORMATION_SCHEMA_NAME.to_string(), information_schema);
379        catalog
380    }
381
382    #[cfg(any(test, feature = "testing"))]
383    pub fn new_with_table(table: TableRef) -> Arc<Self> {
384        let manager = Self::with_default_setup();
385        let catalog = &table.table_info().catalog_name;
386        let schema = &table.table_info().schema_name;
387
388        if !manager.catalog_exist_sync(catalog).unwrap() {
389            manager.register_catalog_sync(catalog).unwrap();
390        }
391
392        if !manager.schema_exist_sync(catalog, schema).unwrap() {
393            manager
394                .register_schema_sync(RegisterSchemaRequest {
395                    catalog: catalog.to_string(),
396                    schema: schema.to_string(),
397                })
398                .unwrap();
399        }
400
401        let request = RegisterTableRequest {
402            catalog: catalog.to_string(),
403            schema: schema.to_string(),
404            table_name: table.table_info().name.clone(),
405            table_id: table.table_info().ident.table_id,
406            table,
407        };
408        let _ = manager.register_table_sync(request).unwrap();
409        manager
410    }
411}
412
413/// Create a memory catalog list contains a numbers table for test
414pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
415    Ok(MemoryCatalogManager::with_default_setup())
416}
417
#[cfg(test)]
mod tests {
    use common_catalog::consts::*;
    use futures_util::TryStreamExt;
    use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};

    use super::*;

    /// Builds a request that registers a `NumbersTable` with the given name and id
    /// in the default catalog and schema.
    fn numbers_request(table_name: &str, table_id: TableId) -> RegisterTableRequest {
        RegisterTableRequest {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: table_name.to_string(),
            table_id,
            table: NumbersTable::table(table_id),
        }
    }

    #[tokio::test]
    async fn test_new_memory_catalog_list() {
        let manager = new_memory_catalog_manager().unwrap();
        manager
            .register_table_sync(numbers_request(NUMBERS_TABLE_NAME, NUMBERS_TABLE_ID))
            .unwrap();

        let registered = manager
            .table(
                DEFAULT_CATALOG_NAME,
                DEFAULT_SCHEMA_NAME,
                NUMBERS_TABLE_NAME,
                None,
            )
            .await
            .unwrap()
            .unwrap();

        // Listing the default schema must yield exactly the table registered above.
        let listed: Vec<TableRef> = manager
            .tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, None)
            .try_collect()
            .await
            .unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(
            registered.table_info().table_id(),
            listed[0].table_info().table_id()
        );

        // Looking up an unknown table in a known schema is not an error.
        let missing = manager
            .table(
                DEFAULT_CATALOG_NAME,
                DEFAULT_SCHEMA_NAME,
                "not_exists",
                None,
            )
            .await
            .unwrap();
        assert!(missing.is_none());
    }

    #[test]
    pub fn test_register_catalog_sync() {
        let manager = MemoryCatalogManager::with_default_setup();
        let first_attempt = manager.register_catalog_sync("test_catalog").unwrap();
        let second_attempt = manager.register_catalog_sync("test_catalog").unwrap();
        assert!(first_attempt);
        assert!(!second_attempt);
    }

    #[tokio::test]
    pub async fn test_catalog_deregister_table() {
        let manager = MemoryCatalogManager::with_default_setup();
        let table_name = "foo_table";

        manager
            .register_table_sync(numbers_request(table_name, 2333))
            .unwrap();
        let before = manager
            .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
            .await
            .unwrap();
        assert!(before.is_some());

        manager
            .deregister_table_sync(DeregisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: table_name.to_string(),
            })
            .unwrap();
        let after = manager
            .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
            .await
            .unwrap();
        assert!(after.is_none());
    }
}