1use std::any::Any;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, RwLock, Weak};
19
20use async_stream::{stream, try_stream};
21use common_catalog::build_db_string;
22use common_catalog::consts::{
23 DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
24 INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
25};
26use common_meta::key::flow::FlowMetadataManager;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use futures_util::stream::BoxStream;
29use session::context::QueryContext;
30use snafu::OptionExt;
31use table::metadata::TableId;
32use table::TableRef;
33
34use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
35use crate::information_schema::InformationSchemaProvider;
36use crate::system_schema::SystemSchemaProvider;
37use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest};
38
/// Schemas of a single catalog: schema name -> (table name -> table).
type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
40
/// In-memory [`CatalogManager`] implementation.
///
/// The whole state lives behind one shared `Arc<RwLock<..>>`, so every clone
/// of a manager reads and modifies the same set of catalogs.
#[derive(Clone)]
pub struct MemoryCatalogManager {
    /// Registered catalogs keyed by catalog name; each value holds the
    /// schemas (and their tables) of that catalog.
    catalogs: Arc<RwLock<HashMap<String, SchemaEntries>>>,
}
47
48#[async_trait::async_trait]
49impl CatalogManager for MemoryCatalogManager {
50 fn as_any(&self) -> &dyn Any {
51 self
52 }
53
54 async fn catalog_names(&self) -> Result<Vec<String>> {
55 Ok(self.catalogs.read().unwrap().keys().cloned().collect())
56 }
57
58 async fn schema_names(
59 &self,
60 catalog: &str,
61 _query_ctx: Option<&QueryContext>,
62 ) -> Result<Vec<String>> {
63 Ok(self
64 .catalogs
65 .read()
66 .unwrap()
67 .get(catalog)
68 .with_context(|| CatalogNotFoundSnafu {
69 catalog_name: catalog,
70 })?
71 .keys()
72 .cloned()
73 .collect())
74 }
75
76 async fn table_names(
77 &self,
78 catalog: &str,
79 schema: &str,
80 _query_ctx: Option<&QueryContext>,
81 ) -> Result<Vec<String>> {
82 Ok(self
83 .catalogs
84 .read()
85 .unwrap()
86 .get(catalog)
87 .with_context(|| CatalogNotFoundSnafu {
88 catalog_name: catalog,
89 })?
90 .get(schema)
91 .with_context(|| SchemaNotFoundSnafu { catalog, schema })?
92 .keys()
93 .cloned()
94 .collect())
95 }
96
97 async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
98 self.catalog_exist_sync(catalog)
99 }
100
101 async fn schema_exists(
102 &self,
103 catalog: &str,
104 schema: &str,
105 _query_ctx: Option<&QueryContext>,
106 ) -> Result<bool> {
107 self.schema_exist_sync(catalog, schema)
108 }
109
110 async fn table_exists(
111 &self,
112 catalog: &str,
113 schema: &str,
114 table: &str,
115 _query_ctx: Option<&QueryContext>,
116 ) -> Result<bool> {
117 let catalogs = self.catalogs.read().unwrap();
118 Ok(catalogs
119 .get(catalog)
120 .with_context(|| CatalogNotFoundSnafu {
121 catalog_name: catalog,
122 })?
123 .get(schema)
124 .with_context(|| SchemaNotFoundSnafu { catalog, schema })?
125 .contains_key(table))
126 }
127
128 async fn table(
129 &self,
130 catalog: &str,
131 schema: &str,
132 table_name: &str,
133 _query_ctx: Option<&QueryContext>,
134 ) -> Result<Option<TableRef>> {
135 let result = try {
136 self.catalogs
137 .read()
138 .unwrap()
139 .get(catalog)?
140 .get(schema)?
141 .get(table_name)
142 .cloned()?
143 };
144 Ok(result)
145 }
146
147 async fn tables_by_ids(
148 &self,
149 catalog: &str,
150 schema: &str,
151 table_ids: &[TableId],
152 ) -> Result<Vec<TableRef>> {
153 let catalogs = self.catalogs.read().unwrap();
154
155 let schemas = catalogs.get(catalog).context(CatalogNotFoundSnafu {
156 catalog_name: catalog,
157 })?;
158
159 let tables = schemas
160 .get(schema)
161 .context(SchemaNotFoundSnafu { catalog, schema })?;
162
163 let filter_ids: HashSet<_> = table_ids.iter().collect();
164 let tables = tables
166 .values()
167 .filter(|t| filter_ids.contains(&t.table_info().table_id()))
168 .cloned()
169 .collect::<Vec<_>>();
170
171 Ok(tables)
172 }
173
174 fn tables<'a>(
175 &'a self,
176 catalog: &'a str,
177 schema: &'a str,
178 _query_ctx: Option<&QueryContext>,
179 ) -> BoxStream<'a, Result<TableRef>> {
180 let catalogs = self.catalogs.read().unwrap();
181
182 let Some(schemas) = catalogs.get(catalog) else {
183 return Box::pin(stream!({
184 yield CatalogNotFoundSnafu {
185 catalog_name: catalog,
186 }
187 .fail();
188 }));
189 };
190
191 let Some(tables) = schemas.get(schema) else {
192 return Box::pin(stream!({
193 yield SchemaNotFoundSnafu { catalog, schema }.fail();
194 }));
195 };
196
197 let tables = tables.values().cloned().collect::<Vec<_>>();
198
199 Box::pin(try_stream!({
200 for table in tables {
201 yield table;
202 }
203 }))
204 }
205}
206
207impl MemoryCatalogManager {
208 pub fn new() -> Arc<Self> {
209 Arc::new(Self {
210 catalogs: Default::default(),
211 })
212 }
213
214 pub fn with_default_setup() -> Arc<Self> {
217 let manager = Arc::new(Self {
218 catalogs: Default::default(),
219 });
220
221 manager.register_catalog_sync(DEFAULT_CATALOG_NAME).unwrap();
223 manager
224 .register_schema_sync(RegisterSchemaRequest {
225 catalog: DEFAULT_CATALOG_NAME.to_string(),
226 schema: DEFAULT_SCHEMA_NAME.to_string(),
227 })
228 .unwrap();
229 manager
230 .register_schema_sync(RegisterSchemaRequest {
231 catalog: DEFAULT_CATALOG_NAME.to_string(),
232 schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
233 })
234 .unwrap();
235 manager
236 .register_schema_sync(RegisterSchemaRequest {
237 catalog: DEFAULT_CATALOG_NAME.to_string(),
238 schema: PG_CATALOG_NAME.to_string(),
239 })
240 .unwrap();
241 manager
242 .register_schema_sync(RegisterSchemaRequest {
243 catalog: DEFAULT_CATALOG_NAME.to_string(),
244 schema: INFORMATION_SCHEMA_NAME.to_string(),
245 })
246 .unwrap();
247
248 manager
249 }
250
251 fn schema_exist_sync(&self, catalog: &str, schema: &str) -> Result<bool> {
252 Ok(self
253 .catalogs
254 .read()
255 .unwrap()
256 .get(catalog)
257 .with_context(|| CatalogNotFoundSnafu {
258 catalog_name: catalog,
259 })?
260 .contains_key(schema))
261 }
262
263 fn catalog_exist_sync(&self, catalog: &str) -> Result<bool> {
264 Ok(self.catalogs.read().unwrap().contains_key(catalog))
265 }
266
267 pub fn register_catalog_sync(&self, name: &str) -> Result<bool> {
269 let name = name.to_string();
270
271 let mut catalogs = self.catalogs.write().unwrap();
272
273 match catalogs.entry(name.clone()) {
274 Entry::Vacant(e) => {
275 let arc_self = Arc::new(self.clone());
276 let catalog = arc_self.create_catalog_entry(name);
277 e.insert(catalog);
278 crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT.inc();
279 Ok(true)
280 }
281 Entry::Occupied(_) => Ok(false),
282 }
283 }
284
285 pub fn deregister_table_sync(&self, request: DeregisterTableRequest) -> Result<()> {
286 let mut catalogs = self.catalogs.write().unwrap();
287 let schema = catalogs
288 .get_mut(&request.catalog)
289 .with_context(|| CatalogNotFoundSnafu {
290 catalog_name: &request.catalog,
291 })?
292 .get_mut(&request.schema)
293 .with_context(|| SchemaNotFoundSnafu {
294 catalog: &request.catalog,
295 schema: &request.schema,
296 })?;
297 let result = schema.remove(&request.table_name);
298 if result.is_some() {
299 crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
300 .with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
301 .dec();
302 }
303 Ok(())
304 }
305
306 pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result<bool> {
310 let mut catalogs = self.catalogs.write().unwrap();
311 let catalog = catalogs
312 .get_mut(&request.catalog)
313 .with_context(|| CatalogNotFoundSnafu {
314 catalog_name: &request.catalog,
315 })?;
316
317 match catalog.entry(request.schema) {
318 Entry::Vacant(e) => {
319 e.insert(HashMap::new());
320 crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT.inc();
321 Ok(true)
322 }
323 Entry::Occupied(_) => Ok(false),
324 }
325 }
326
327 pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
329 let mut catalogs = self.catalogs.write().unwrap();
330 let schema = catalogs
331 .get_mut(&request.catalog)
332 .with_context(|| CatalogNotFoundSnafu {
333 catalog_name: &request.catalog,
334 })?
335 .get_mut(&request.schema)
336 .with_context(|| SchemaNotFoundSnafu {
337 catalog: &request.catalog,
338 schema: &request.schema,
339 })?;
340
341 if schema.contains_key(&request.table_name) {
342 return TableExistsSnafu {
343 table: &request.table_name,
344 }
345 .fail();
346 }
347 schema.insert(request.table_name, request.table);
348 crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT
349 .with_label_values(&[build_db_string(&request.catalog, &request.schema).as_str()])
350 .inc();
351 Ok(true)
352 }
353
354 fn create_catalog_entry(self: &Arc<Self>, catalog: String) -> SchemaEntries {
355 let backend = Arc::new(MemoryKvBackend::new());
356 let information_schema_provider = InformationSchemaProvider::new(
357 catalog,
358 Arc::downgrade(self) as Weak<dyn CatalogManager>,
359 Arc::new(FlowMetadataManager::new(backend.clone())),
360 None, backend,
362 );
363 let information_schema = information_schema_provider.tables().clone();
364
365 let mut catalog = HashMap::new();
366 catalog.insert(INFORMATION_SCHEMA_NAME.to_string(), information_schema);
367 catalog
368 }
369
370 #[cfg(any(test, feature = "testing"))]
371 pub fn new_with_table(table: TableRef) -> Arc<Self> {
372 let manager = Self::with_default_setup();
373 let catalog = &table.table_info().catalog_name;
374 let schema = &table.table_info().schema_name;
375
376 if !manager.catalog_exist_sync(catalog).unwrap() {
377 manager.register_catalog_sync(catalog).unwrap();
378 }
379
380 if !manager.schema_exist_sync(catalog, schema).unwrap() {
381 manager
382 .register_schema_sync(RegisterSchemaRequest {
383 catalog: catalog.to_string(),
384 schema: schema.to_string(),
385 })
386 .unwrap();
387 }
388
389 let request = RegisterTableRequest {
390 catalog: catalog.to_string(),
391 schema: schema.to_string(),
392 table_name: table.table_info().name.clone(),
393 table_id: table.table_info().ident.table_id,
394 table,
395 };
396 let _ = manager.register_table_sync(request).unwrap();
397 manager
398 }
399}
400
401pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
403 Ok(MemoryCatalogManager::with_default_setup())
404}
405
#[cfg(test)]
mod tests {
    use common_catalog::consts::*;
    use futures_util::TryStreamExt;
    use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};

    use super::*;

    /// Registers the numbers table, then checks point lookup, listing and a
    /// lookup miss.
    #[tokio::test]
    async fn test_new_memory_catalog_list() {
        let manager = new_memory_catalog_manager().unwrap();

        manager
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: NUMBERS_TABLE_NAME.to_string(),
                table_id: NUMBERS_TABLE_ID,
                table: NumbersTable::table(NUMBERS_TABLE_ID),
            })
            .unwrap();

        let looked_up = manager
            .table(
                DEFAULT_CATALOG_NAME,
                DEFAULT_SCHEMA_NAME,
                NUMBERS_TABLE_NAME,
                None,
            )
            .await
            .unwrap()
            .unwrap();

        let listed = manager
            .tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, None)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(
            looked_up.table_info().table_id(),
            listed[0].table_info().table_id()
        );

        let missing = manager
            .table(
                DEFAULT_CATALOG_NAME,
                DEFAULT_SCHEMA_NAME,
                "not_exists",
                None,
            )
            .await
            .unwrap();
        assert!(missing.is_none());
    }

    /// A catalog can be registered once; the second attempt reports `false`.
    #[test]
    pub fn test_register_catalog_sync() {
        let manager = MemoryCatalogManager::with_default_setup();

        let first_attempt = manager.register_catalog_sync("test_catalog").unwrap();
        assert!(first_attempt);

        let second_attempt = manager.register_catalog_sync("test_catalog").unwrap();
        assert!(!second_attempt);
    }

    /// A registered table is visible until it is deregistered.
    #[tokio::test]
    pub async fn test_catalog_deregister_table() {
        let manager = MemoryCatalogManager::with_default_setup();
        let table_name = "foo_table";

        let register = RegisterTableRequest {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: table_name.to_string(),
            table_id: 2333,
            table: NumbersTable::table(2333),
        };
        let deregister = DeregisterTableRequest {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: table_name.to_string(),
        };

        manager.register_table_sync(register).unwrap();
        let after_register = manager
            .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
            .await
            .unwrap();
        assert!(after_register.is_some());

        manager.deregister_table_sync(deregister).unwrap();
        let after_deregister = manager
            .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
            .await
            .unwrap();
        assert!(after_deregister.is_none());
    }
}