1use std::any::Any;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, RwLock, Weak};
19
20use async_stream::{stream, try_stream};
21use common_catalog::build_db_string;
22use common_catalog::consts::{
23 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
24 INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
25};
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use futures_util::stream::BoxStream;
29use session::context::QueryContext;
30use snafu::OptionExt;
31use table::metadata::TableId;
32use table::TableRef;
33
34use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
35use crate::information_schema::InformationSchemaProvider;
36use crate::system_schema::SystemSchemaProvider;
37use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest};
38
39type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
40
41#[derive(Clone)]
43pub struct MemoryCatalogManager {
44 catalogs: Arc<RwLock<HashMap<String, SchemaEntries>>>,
46}
47
48#[async_trait::async_trait]
49impl CatalogManager for MemoryCatalogManager {
50 fn as_any(&self) -> &dyn Any {
51 self
52 }
53
54 async fn catalog_names(&self) -> Result<Vec<String>> {
55 Ok(self.catalogs.read().unwrap().keys().cloned().collect())
56 }
57
58 async fn schema_names(
59 &self,
60 catalog: &str,
61 _query_ctx: Option<&QueryContext>,
62 ) -> Result<Vec<String>> {
63 Ok(self
64 .catalogs
65 .read()
66 .unwrap()
67 .get(catalog)
68 .with_context(|| CatalogNotFoundSnafu {
69 catalog_name: catalog,
70 })?
71 .keys()
72 .cloned()
73 .collect())
74 }
75
76 async fn table_names(
77 &self,
78 catalog: &str,
79 schema: &str,
80 _query_ctx: Option<&QueryContext>,
81 ) -> Result<Vec<String>> {
82 Ok(self
83 .catalogs
84 .read()
85 .unwrap()
86 .get(catalog)
87 .with_context(|| CatalogNotFoundSnafu {
88 catalog_name: catalog,
89 })?
90 .get(schema)
91 .with_context(|| SchemaNotFoundSnafu { catalog, schema })?
92 .keys()
93 .cloned()
94 .collect())
95 }
96
97 async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
98 self.catalog_exist_sync(catalog)
99 }
100
101 async fn schema_exists(
102 &self,
103 catalog: &str,
104 schema: &str,
105 _query_ctx: Option<&QueryContext>,
106 ) -> Result<bool> {
107 self.schema_exist_sync(catalog, schema)
108 }
109
110 async fn table_exists(
111 &self,
112 catalog: &str,
113 schema: &str,
114 table: &str,
115 _query_ctx: Option<&QueryContext>,
116 ) -> Result<bool> {
117 let catalogs = self.catalogs.read().unwrap();
118 Ok(catalogs
119 .get(catalog)
120 .with_context(|| CatalogNotFoundSnafu {
121 catalog_name: catalog,
122 })?
123 .get(schema)
124 .with_context(|| SchemaNotFoundSnafu { catalog, schema })?
125 .contains_key(table))
126 }
127
128 async fn table(
129 &self,
130 catalog: &str,
131 schema: &str,
132 table_name: &str,
133 _query_ctx: Option<&QueryContext>,
134 ) -> Result<Option<TableRef>> {
135 let result = try {
136 self.catalogs
137 .read()
138 .unwrap()
139 .get(catalog)?
140 .get(schema)?
141 .get(table_name)
142 .cloned()?
143 };
144 Ok(result)
145 }
146
147 async fn tables_by_ids(
148 &self,
149 catalog: &str,
150 schema: &str,
151 table_ids: &[TableId],
152 ) -> Result<Vec<TableRef>> {
153 let catalogs = self.catalogs.read().unwrap();
154
155 let schemas = catalogs.get(catalog).context(CatalogNotFoundSnafu {
156 catalog_name: catalog,
157 })?;
158
159 let tables = schemas
160 .get(schema)
161 .context(SchemaNotFoundSnafu { catalog, schema })?;
162
163 let filter_ids: HashSet<_> = table_ids.iter().collect();
164 let tables = tables
166 .values()
167 .filter(|t| filter_ids.contains(&t.table_info().table_id()))
168 .cloned()
169 .collect::<Vec<_>>();
170
171 Ok(tables)
172 }
173
174 fn tables<'a>(
175 &'a self,
176 catalog: &'a str,
177 schema: &'a str,
178 _query_ctx: Option<&QueryContext>,
179 ) -> BoxStream<'a, Result<TableRef>> {
180 let catalogs = self.catalogs.read().unwrap();
181
182 let Some(schemas) = catalogs.get(catalog) else {
183 return Box::pin(stream!({
184 yield CatalogNotFoundSnafu {
185 catalog_name: catalog,
186 }
187 .fail();
188 }));
189 };
190
191 let Some(tables) = schemas.get(schema) else {
192 return Box::pin(stream!({
193 yield SchemaNotFoundSnafu { catalog, schema }.fail();
194 }));
195 };
196
197 let tables = tables.values().cloned().collect::<Vec<_>>();
198
199 Box::pin(try_stream!({
200 for table in tables {
201 yield table;
202 }
203 }))
204 }
205}
206
207impl MemoryCatalogManager {
208 pub fn new() -> Arc<Self> {
209 Arc::new(Self {
210 catalogs: Default::default(),
211 })
212 }
213
214 pub fn with_default_setup() -> Arc<Self> {
217 let manager = Arc::new(Self {
218 catalogs: Default::default(),
219 });
220
221 manager.register_catalog_sync(DEFAULT_CATALOG_NAME).unwrap();
223 manager
224 .register_schema_sync(RegisterSchemaRequest {
225 catalog: DEFAULT_CATALOG_NAME.to_string(),
226 schema: DEFAULT_SCHEMA_NAME.to_string(),
227 })
228 .unwrap();
229 manager
230 .register_schema_sync(RegisterSchemaRequest {
231 catalog: DEFAULT_CATALOG_NAME.to_string(),
232 schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
233 })
234 .unwrap();
235 manager
236 .register_schema_sync(RegisterSchemaRequest {
237 catalog: DEFAULT_CATALOG_NAME.to_string(),
238 schema: PG_CATALOG_NAME.to_string(),
239 })
240 .unwrap();
241 manager
242 .register_schema_sync(RegisterSchemaRequest {
243 catalog: DEFAULT_CATALOG_NAME.to_string(),
244 schema: INFORMATION_SCHEMA_NAME.to_string(),
245 })
246 .unwrap();
247
248 manager
249 }
250
251 fn schema_exist_sync(&self, catalog: &str, schema: &str) -> Result<bool> {
252 Ok(self
253 .catalogs
254 .read()
255 .unwrap()
256 .get(catalog)
257 .with_context(|| CatalogNotFoundSnafu {
258 catalog_name: catalog,
259 })?
260 .contains_key(schema))
261 }
262
263 fn catalog_exist_sync(&self, catalog: &str) -> Result<bool> {
264 Ok(self.catalogs.read().unwrap().contains_key(catalog))
265 }
266
267 pub fn register_catalog_sync(&self, name: &str) -> Result<bool> {
269 let name = name.to_string();
270
271 let mut catalogs = self.catalogs.write().unwrap();
272
273 match catalogs.entry(name.clone()) {
274 Entry::Vacant(e) => {
275 let arc_self = Arc::new(self.clone());
276 let catalog = arc_self.create_catalog_entry(name);
277 e.insert(catalog);
278 crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT.inc();
279 Ok(true)
280 }
281 Entry::Occupied(_) => Ok(false),
282 }
283 }
284
285 pub fn deregister_table_sync(&self, request: DeregisterTableRequest) -> Result<()> {
286 let mut catalogs = self.catalogs.write().unwrap();
287 let schema = catalogs
288 .get_mut(&request.catalog)
289 .with_context(|| CatalogNotFoundSnafu {
290 catalog_name: &request.catalog,
291 })?
292 .get_mut(&request.schema)
293 .with_context(|| SchemaNotFoundSnafu {
294 catalog: &request.catalog,
295 schema: &request.schema,
296 })?;
297 let result = schema.remove(&request.table_name);
298 if result.is_some() {
299 crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
300 .with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
301 .dec();
302 }
303 Ok(())
304 }
305
306 pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result<bool> {
310 let mut catalogs = self.catalogs.write().unwrap();
311 let catalog = catalogs
312 .get_mut(&request.catalog)
313 .with_context(|| CatalogNotFoundSnafu {
314 catalog_name: &request.catalog,
315 })?;
316
317 match catalog.entry(request.schema) {
318 Entry::Vacant(e) => {
319 e.insert(HashMap::new());
320 crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT.inc();
321 Ok(true)
322 }
323 Entry::Occupied(_) => Ok(false),
324 }
325 }
326
327 pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
329 let mut catalogs = self.catalogs.write().unwrap();
330 let schema = catalogs
331 .get_mut(&request.catalog)
332 .with_context(|| CatalogNotFoundSnafu {
333 catalog_name: &request.catalog,
334 })?
335 .get_mut(&request.schema)
336 .with_context(|| SchemaNotFoundSnafu {
337 catalog: &request.catalog,
338 schema: &request.schema,
339 })?;
340
341 if schema.contains_key(&request.table_name) {
342 return TableExistsSnafu {
343 table: &request.table_name,
344 }
345 .fail();
346 }
347 schema.insert(request.table_name, request.table);
348 crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
349 .with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
350 .inc();
351 Ok(true)
352 }
353
354 fn create_catalog_entry(self: &Arc<Self>, catalog: String) -> SchemaEntries {
355 let information_schema_provider = InformationSchemaProvider::new(
356 catalog,
357 Arc::downgrade(self) as Weak<dyn CatalogManager>,
358 Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
359 );
360 let information_schema = information_schema_provider.tables().clone();
361
362 let mut catalog = HashMap::new();
363 catalog.insert(INFORMATION_SCHEMA_NAME.to_string(), information_schema);
364 catalog
365 }
366
367 #[cfg(any(test, feature = "testing"))]
368 pub fn new_with_table(table: TableRef) -> Arc<Self> {
369 let manager = Self::with_default_setup();
370 let catalog = &table.table_info().catalog_name;
371 let schema = &table.table_info().schema_name;
372
373 if !manager.catalog_exist_sync(catalog).unwrap() {
374 manager.register_catalog_sync(catalog).unwrap();
375 }
376
377 if !manager.schema_exist_sync(catalog, schema).unwrap() {
378 manager
379 .register_schema_sync(RegisterSchemaRequest {
380 catalog: catalog.to_string(),
381 schema: schema.to_string(),
382 })
383 .unwrap();
384 }
385
386 let request = RegisterTableRequest {
387 catalog: catalog.to_string(),
388 schema: schema.to_string(),
389 table_name: table.table_info().name.clone(),
390 table_id: table.table_info().ident.table_id,
391 table,
392 };
393 let _ = manager.register_table_sync(request).unwrap();
394 manager
395 }
396}
397
398pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
400 Ok(MemoryCatalogManager::with_default_setup())
401}
402
403#[cfg(test)]
404mod tests {
405 use common_catalog::consts::*;
406 use futures_util::TryStreamExt;
407 use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
408
409 use super::*;
410
411 #[tokio::test]
412 async fn test_new_memory_catalog_list() {
413 let catalog_list = new_memory_catalog_manager().unwrap();
414
415 let register_request = RegisterTableRequest {
416 catalog: DEFAULT_CATALOG_NAME.to_string(),
417 schema: DEFAULT_SCHEMA_NAME.to_string(),
418 table_name: NUMBERS_TABLE_NAME.to_string(),
419 table_id: NUMBERS_TABLE_ID,
420 table: NumbersTable::table(NUMBERS_TABLE_ID),
421 };
422
423 catalog_list.register_table_sync(register_request).unwrap();
424 let table = catalog_list
425 .table(
426 DEFAULT_CATALOG_NAME,
427 DEFAULT_SCHEMA_NAME,
428 NUMBERS_TABLE_NAME,
429 None,
430 )
431 .await
432 .unwrap()
433 .unwrap();
434 let stream = catalog_list.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, None);
435 let tables = stream.try_collect::<Vec<_>>().await.unwrap();
436 assert_eq!(tables.len(), 1);
437 assert_eq!(
438 table.table_info().table_id(),
439 tables[0].table_info().table_id()
440 );
441
442 assert!(catalog_list
443 .table(
444 DEFAULT_CATALOG_NAME,
445 DEFAULT_SCHEMA_NAME,
446 "not_exists",
447 None
448 )
449 .await
450 .unwrap()
451 .is_none());
452 }
453
454 #[test]
455 pub fn test_register_catalog_sync() {
456 let list = MemoryCatalogManager::with_default_setup();
457 assert!(list.register_catalog_sync("test_catalog").unwrap());
458 assert!(!list.register_catalog_sync("test_catalog").unwrap());
459 }
460
461 #[tokio::test]
462 pub async fn test_catalog_deregister_table() {
463 let catalog = MemoryCatalogManager::with_default_setup();
464 let table_name = "foo_table";
465
466 let register_table_req = RegisterTableRequest {
467 catalog: DEFAULT_CATALOG_NAME.to_string(),
468 schema: DEFAULT_SCHEMA_NAME.to_string(),
469 table_name: table_name.to_string(),
470 table_id: 2333,
471 table: NumbersTable::table(2333),
472 };
473 catalog.register_table_sync(register_table_req).unwrap();
474 assert!(catalog
475 .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
476 .await
477 .unwrap()
478 .is_some());
479
480 let deregister_table_req = DeregisterTableRequest {
481 catalog: DEFAULT_CATALOG_NAME.to_string(),
482 schema: DEFAULT_SCHEMA_NAME.to_string(),
483 table_name: table_name.to_string(),
484 };
485 catalog.deregister_table_sync(deregister_table_req).unwrap();
486 assert!(catalog
487 .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
488 .await
489 .unwrap()
490 .is_none());
491 }
492}