1use std::any::Any;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, RwLock, Weak};
19
20use async_stream::{stream, try_stream};
21use common_catalog::build_db_string;
22use common_catalog::consts::{
23 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
24 INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
25};
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use futures_util::stream::BoxStream;
29use session::context::QueryContext;
30use snafu::OptionExt;
31use table::metadata::{TableId, TableInfoRef};
32use table::TableRef;
33
34use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
35use crate::information_schema::InformationSchemaProvider;
36use crate::system_schema::SystemSchemaProvider;
37use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest};
38
39type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
40
41#[derive(Clone)]
43pub struct MemoryCatalogManager {
44 catalogs: Arc<RwLock<HashMap<String, SchemaEntries>>>,
46}
47
48#[async_trait::async_trait]
49impl CatalogManager for MemoryCatalogManager {
50 fn as_any(&self) -> &dyn Any {
51 self
52 }
53
54 async fn catalog_names(&self) -> Result<Vec<String>> {
55 Ok(self.catalogs.read().unwrap().keys().cloned().collect())
56 }
57
58 async fn schema_names(
59 &self,
60 catalog: &str,
61 _query_ctx: Option<&QueryContext>,
62 ) -> Result<Vec<String>> {
63 Ok(self
64 .catalogs
65 .read()
66 .unwrap()
67 .get(catalog)
68 .with_context(|| CatalogNotFoundSnafu {
69 catalog_name: catalog,
70 })?
71 .keys()
72 .cloned()
73 .collect())
74 }
75
76 async fn table_names(
77 &self,
78 catalog: &str,
79 schema: &str,
80 _query_ctx: Option<&QueryContext>,
81 ) -> Result<Vec<String>> {
82 Ok(self
83 .catalogs
84 .read()
85 .unwrap()
86 .get(catalog)
87 .with_context(|| CatalogNotFoundSnafu {
88 catalog_name: catalog,
89 })?
90 .get(schema)
91 .with_context(|| SchemaNotFoundSnafu { catalog, schema })?
92 .keys()
93 .cloned()
94 .collect())
95 }
96
97 async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
98 self.catalog_exist_sync(catalog)
99 }
100
101 async fn schema_exists(
102 &self,
103 catalog: &str,
104 schema: &str,
105 _query_ctx: Option<&QueryContext>,
106 ) -> Result<bool> {
107 self.schema_exist_sync(catalog, schema)
108 }
109
110 async fn table_exists(
111 &self,
112 catalog: &str,
113 schema: &str,
114 table: &str,
115 _query_ctx: Option<&QueryContext>,
116 ) -> Result<bool> {
117 let catalogs = self.catalogs.read().unwrap();
118 Ok(catalogs
119 .get(catalog)
120 .with_context(|| CatalogNotFoundSnafu {
121 catalog_name: catalog,
122 })?
123 .get(schema)
124 .with_context(|| SchemaNotFoundSnafu { catalog, schema })?
125 .contains_key(table))
126 }
127
128 async fn table(
129 &self,
130 catalog: &str,
131 schema: &str,
132 table_name: &str,
133 _query_ctx: Option<&QueryContext>,
134 ) -> Result<Option<TableRef>> {
135 let result = try {
136 self.catalogs
137 .read()
138 .unwrap()
139 .get(catalog)?
140 .get(schema)?
141 .get(table_name)
142 .cloned()?
143 };
144 Ok(result)
145 }
146
147 async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>> {
148 Ok(self
149 .catalogs
150 .read()
151 .unwrap()
152 .iter()
153 .flat_map(|(_, schema_entries)| schema_entries.values())
154 .flat_map(|tables| tables.values())
155 .find(|t| t.table_info().ident.table_id == table_id)
156 .map(|t| t.table_info()))
157 }
158
159 async fn tables_by_ids(
160 &self,
161 catalog: &str,
162 schema: &str,
163 table_ids: &[TableId],
164 ) -> Result<Vec<TableRef>> {
165 let catalogs = self.catalogs.read().unwrap();
166
167 let schemas = catalogs.get(catalog).context(CatalogNotFoundSnafu {
168 catalog_name: catalog,
169 })?;
170
171 let tables = schemas
172 .get(schema)
173 .context(SchemaNotFoundSnafu { catalog, schema })?;
174
175 let filter_ids: HashSet<_> = table_ids.iter().collect();
176 let tables = tables
178 .values()
179 .filter(|t| filter_ids.contains(&t.table_info().table_id()))
180 .cloned()
181 .collect::<Vec<_>>();
182
183 Ok(tables)
184 }
185
186 fn tables<'a>(
187 &'a self,
188 catalog: &'a str,
189 schema: &'a str,
190 _query_ctx: Option<&QueryContext>,
191 ) -> BoxStream<'a, Result<TableRef>> {
192 let catalogs = self.catalogs.read().unwrap();
193
194 let Some(schemas) = catalogs.get(catalog) else {
195 return Box::pin(stream!({
196 yield CatalogNotFoundSnafu {
197 catalog_name: catalog,
198 }
199 .fail();
200 }));
201 };
202
203 let Some(tables) = schemas.get(schema) else {
204 return Box::pin(stream!({
205 yield SchemaNotFoundSnafu { catalog, schema }.fail();
206 }));
207 };
208
209 let tables = tables.values().cloned().collect::<Vec<_>>();
210
211 Box::pin(try_stream!({
212 for table in tables {
213 yield table;
214 }
215 }))
216 }
217}
218
219impl MemoryCatalogManager {
220 pub fn new() -> Arc<Self> {
221 Arc::new(Self {
222 catalogs: Default::default(),
223 })
224 }
225
226 pub fn with_default_setup() -> Arc<Self> {
229 let manager = Arc::new(Self {
230 catalogs: Default::default(),
231 });
232
233 manager.register_catalog_sync(DEFAULT_CATALOG_NAME).unwrap();
235 manager
236 .register_schema_sync(RegisterSchemaRequest {
237 catalog: DEFAULT_CATALOG_NAME.to_string(),
238 schema: DEFAULT_SCHEMA_NAME.to_string(),
239 })
240 .unwrap();
241 manager
242 .register_schema_sync(RegisterSchemaRequest {
243 catalog: DEFAULT_CATALOG_NAME.to_string(),
244 schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
245 })
246 .unwrap();
247 manager
248 .register_schema_sync(RegisterSchemaRequest {
249 catalog: DEFAULT_CATALOG_NAME.to_string(),
250 schema: PG_CATALOG_NAME.to_string(),
251 })
252 .unwrap();
253 manager
254 .register_schema_sync(RegisterSchemaRequest {
255 catalog: DEFAULT_CATALOG_NAME.to_string(),
256 schema: INFORMATION_SCHEMA_NAME.to_string(),
257 })
258 .unwrap();
259
260 manager
261 }
262
263 fn schema_exist_sync(&self, catalog: &str, schema: &str) -> Result<bool> {
264 Ok(self
265 .catalogs
266 .read()
267 .unwrap()
268 .get(catalog)
269 .with_context(|| CatalogNotFoundSnafu {
270 catalog_name: catalog,
271 })?
272 .contains_key(schema))
273 }
274
275 fn catalog_exist_sync(&self, catalog: &str) -> Result<bool> {
276 Ok(self.catalogs.read().unwrap().contains_key(catalog))
277 }
278
279 pub fn register_catalog_sync(&self, name: &str) -> Result<bool> {
281 let name = name.to_string();
282
283 let mut catalogs = self.catalogs.write().unwrap();
284
285 match catalogs.entry(name.clone()) {
286 Entry::Vacant(e) => {
287 let arc_self = Arc::new(self.clone());
288 let catalog = arc_self.create_catalog_entry(name);
289 e.insert(catalog);
290 crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT.inc();
291 Ok(true)
292 }
293 Entry::Occupied(_) => Ok(false),
294 }
295 }
296
297 pub fn deregister_table_sync(&self, request: DeregisterTableRequest) -> Result<()> {
298 let mut catalogs = self.catalogs.write().unwrap();
299 let schema = catalogs
300 .get_mut(&request.catalog)
301 .with_context(|| CatalogNotFoundSnafu {
302 catalog_name: &request.catalog,
303 })?
304 .get_mut(&request.schema)
305 .with_context(|| SchemaNotFoundSnafu {
306 catalog: &request.catalog,
307 schema: &request.schema,
308 })?;
309 let result = schema.remove(&request.table_name);
310 if result.is_some() {
311 crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
312 .with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
313 .dec();
314 }
315 Ok(())
316 }
317
318 pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result<bool> {
322 let mut catalogs = self.catalogs.write().unwrap();
323 let catalog = catalogs
324 .get_mut(&request.catalog)
325 .with_context(|| CatalogNotFoundSnafu {
326 catalog_name: &request.catalog,
327 })?;
328
329 match catalog.entry(request.schema) {
330 Entry::Vacant(e) => {
331 e.insert(HashMap::new());
332 crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT.inc();
333 Ok(true)
334 }
335 Entry::Occupied(_) => Ok(false),
336 }
337 }
338
339 pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
341 let mut catalogs = self.catalogs.write().unwrap();
342 let schema = catalogs
343 .get_mut(&request.catalog)
344 .with_context(|| CatalogNotFoundSnafu {
345 catalog_name: &request.catalog,
346 })?
347 .get_mut(&request.schema)
348 .with_context(|| SchemaNotFoundSnafu {
349 catalog: &request.catalog,
350 schema: &request.schema,
351 })?;
352
353 if schema.contains_key(&request.table_name) {
354 return TableExistsSnafu {
355 table: &request.table_name,
356 }
357 .fail();
358 }
359 schema.insert(request.table_name, request.table);
360 crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
361 .with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
362 .inc();
363 Ok(true)
364 }
365
366 fn create_catalog_entry(self: &Arc<Self>, catalog: String) -> SchemaEntries {
367 let backend = Arc::new(MemoryKvBackend::new());
368 let information_schema_provider = InformationSchemaProvider::new(
369 catalog,
370 Arc::downgrade(self) as Weak<dyn CatalogManager>,
371 Arc::new(FlowMetadataManager::new(backend.clone())),
372 None, backend,
374 );
375 let information_schema = information_schema_provider.tables().clone();
376
377 let mut catalog = HashMap::new();
378 catalog.insert(INFORMATION_SCHEMA_NAME.to_string(), information_schema);
379 catalog
380 }
381
382 #[cfg(any(test, feature = "testing"))]
383 pub fn new_with_table(table: TableRef) -> Arc<Self> {
384 let manager = Self::with_default_setup();
385 let catalog = &table.table_info().catalog_name;
386 let schema = &table.table_info().schema_name;
387
388 if !manager.catalog_exist_sync(catalog).unwrap() {
389 manager.register_catalog_sync(catalog).unwrap();
390 }
391
392 if !manager.schema_exist_sync(catalog, schema).unwrap() {
393 manager
394 .register_schema_sync(RegisterSchemaRequest {
395 catalog: catalog.to_string(),
396 schema: schema.to_string(),
397 })
398 .unwrap();
399 }
400
401 let request = RegisterTableRequest {
402 catalog: catalog.to_string(),
403 schema: schema.to_string(),
404 table_name: table.table_info().name.clone(),
405 table_id: table.table_info().ident.table_id,
406 table,
407 };
408 let _ = manager.register_table_sync(request).unwrap();
409 manager
410 }
411}
412
413pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
415 Ok(MemoryCatalogManager::with_default_setup())
416}
417
418#[cfg(test)]
419mod tests {
420 use common_catalog::consts::*;
421 use futures_util::TryStreamExt;
422 use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
423
424 use super::*;
425
426 #[tokio::test]
427 async fn test_new_memory_catalog_list() {
428 let catalog_list = new_memory_catalog_manager().unwrap();
429
430 let register_request = RegisterTableRequest {
431 catalog: DEFAULT_CATALOG_NAME.to_string(),
432 schema: DEFAULT_SCHEMA_NAME.to_string(),
433 table_name: NUMBERS_TABLE_NAME.to_string(),
434 table_id: NUMBERS_TABLE_ID,
435 table: NumbersTable::table(NUMBERS_TABLE_ID),
436 };
437
438 catalog_list.register_table_sync(register_request).unwrap();
439 let table = catalog_list
440 .table(
441 DEFAULT_CATALOG_NAME,
442 DEFAULT_SCHEMA_NAME,
443 NUMBERS_TABLE_NAME,
444 None,
445 )
446 .await
447 .unwrap()
448 .unwrap();
449 let stream = catalog_list.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, None);
450 let tables = stream.try_collect::<Vec<_>>().await.unwrap();
451 assert_eq!(tables.len(), 1);
452 assert_eq!(
453 table.table_info().table_id(),
454 tables[0].table_info().table_id()
455 );
456
457 assert!(catalog_list
458 .table(
459 DEFAULT_CATALOG_NAME,
460 DEFAULT_SCHEMA_NAME,
461 "not_exists",
462 None
463 )
464 .await
465 .unwrap()
466 .is_none());
467 }
468
469 #[test]
470 pub fn test_register_catalog_sync() {
471 let list = MemoryCatalogManager::with_default_setup();
472 assert!(list.register_catalog_sync("test_catalog").unwrap());
473 assert!(!list.register_catalog_sync("test_catalog").unwrap());
474 }
475
476 #[tokio::test]
477 pub async fn test_catalog_deregister_table() {
478 let catalog = MemoryCatalogManager::with_default_setup();
479 let table_name = "foo_table";
480
481 let register_table_req = RegisterTableRequest {
482 catalog: DEFAULT_CATALOG_NAME.to_string(),
483 schema: DEFAULT_SCHEMA_NAME.to_string(),
484 table_name: table_name.to_string(),
485 table_id: 2333,
486 table: NumbersTable::table(2333),
487 };
488 catalog.register_table_sync(register_table_req).unwrap();
489 assert!(catalog
490 .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
491 .await
492 .unwrap()
493 .is_some());
494
495 let deregister_table_req = DeregisterTableRequest {
496 catalog: DEFAULT_CATALOG_NAME.to_string(),
497 schema: DEFAULT_SCHEMA_NAME.to_string(),
498 table_name: table_name.to_string(),
499 };
500 catalog.deregister_table_sync(deregister_table_req).unwrap();
501 assert!(catalog
502 .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
503 .await
504 .unwrap()
505 .is_none());
506 }
507}