1mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23mod region_info;
24pub mod region_peers;
25mod region_statistics;
26pub mod schemata;
27mod ssts;
28mod table_constraints;
29mod table_names;
30pub mod tables;
31mod views;
32
33use std::collections::HashMap;
34use std::sync::{Arc, Weak};
35
36use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
37use common_error::ext::ErrorExt;
38use common_meta::cluster::NodeInfo;
39use common_meta::datanode::RegionStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_meta::key::flow::flow_state::FlowStat;
42use common_meta::kv_backend::KvBackendRef;
43use common_procedure::ProcedureInfo;
44use common_recordbatch::SendableRecordBatchStream;
45use datafusion::error::DataFusionError;
46use datafusion::logical_expr::LogicalPlan;
47use datatypes::schema::SchemaRef;
48use lazy_static::lazy_static;
49use paste::paste;
50use process_list::InformationSchemaProcessList;
51use region_info::InformationSchemaRegionInfo;
52use store_api::region_info::RegionInfoEntry;
53use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
54use store_api::storage::{ScanRequest, TableId};
55use table::TableRef;
56use table::metadata::TableType;
57pub use table_names::*;
58use views::InformationSchemaViews;
59
60use self::columns::InformationSchemaColumns;
61use crate::CatalogManager;
62use crate::error::{Error, Result};
63use crate::process_manager::ProcessManagerRef;
64use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
65use crate::system_schema::information_schema::flows::InformationSchemaFlows;
66use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
67use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
68use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
69use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
70use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
71use crate::system_schema::information_schema::ssts::{
72 InformationSchemaSstsIndexMeta, InformationSchemaSstsManifest, InformationSchemaSstsStorage,
73};
74use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
75use crate::system_schema::information_schema::tables::InformationSchemaTables;
76use crate::system_schema::memory_table::MemoryTable;
77pub(crate) use crate::system_schema::predicate::Predicates;
78use crate::system_schema::{
79 SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
80};
81
82lazy_static! {
83 static ref MEMORY_TABLES: &'static [&'static str] = &[
85 ENGINES,
86 COLUMN_PRIVILEGES,
87 COLUMN_STATISTICS,
88 CHARACTER_SETS,
89 COLLATIONS,
90 COLLATION_CHARACTER_SET_APPLICABILITY,
91 CHECK_CONSTRAINTS,
92 EVENTS,
93 FILES,
94 OPTIMIZER_TRACE,
95 PARAMETERS,
96 PROFILING,
97 REFERENTIAL_CONSTRAINTS,
98 ROUTINES,
99 SCHEMA_PRIVILEGES,
100 TABLE_PRIVILEGES,
101 GLOBAL_STATUS,
102 SESSION_STATUS,
103 PARTITIONS,
104 ];
105}
106
/// Expands to `Some(table)`, where `table` is an `Arc`-wrapped [`MemoryTable`] for `$name`.
///
/// `$name` is used three ways: it is passed to `get_schema_columns` to obtain the schema and
/// column data, it is pasted into `consts::INFORMATION_SCHEMA_<$name>_TABLE_ID` to select the
/// table id, and it is handed to `MemoryTable::new` as the table name.
macro_rules! setup_memory_table {
    ($name: expr) => {
        paste! {
            {
                let (table_schema, table_columns) = get_schema_columns($name);
                let memory_table = MemoryTable::new(
                    consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
                    $name,
                    table_schema,
                    table_columns,
                );
                // The cast target is inferred from the surrounding expression.
                Some(Arc::new(memory_table) as _)
            }
        }
    };
}
122
123pub struct MakeInformationTableRequest {
124 pub catalog_name: String,
125 pub catalog_manager: Weak<dyn CatalogManager>,
126 pub kv_backend: KvBackendRef,
127}
128
129pub trait InformationSchemaTableFactory {
134 fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
135}
136
137pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
138
139pub struct InformationSchemaProvider {
141 catalog_name: String,
142 catalog_manager: Weak<dyn CatalogManager>,
143 process_manager: Option<ProcessManagerRef>,
144 flow_metadata_manager: Arc<FlowMetadataManager>,
145 tables: HashMap<String, TableRef>,
146 kv_backend: KvBackendRef,
147 extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
148}
149
150impl SystemSchemaProvider for InformationSchemaProvider {
151 fn tables(&self) -> &HashMap<String, TableRef> {
152 assert!(!self.tables.is_empty());
153
154 &self.tables
155 }
156}
157
158impl SystemSchemaProviderInner for InformationSchemaProvider {
159 fn catalog_name(&self) -> &str {
160 &self.catalog_name
161 }
162 fn schema_name() -> &'static str {
163 INFORMATION_SCHEMA_NAME
164 }
165
166 fn system_table(&self, name: &str) -> Option<SystemTableRef> {
167 if let Some(factory) = self.extra_table_factories.get(name) {
168 let req = MakeInformationTableRequest {
169 catalog_name: self.catalog_name.clone(),
170 catalog_manager: self.catalog_manager.clone(),
171 kv_backend: self.kv_backend.clone(),
172 };
173 return Some(factory.make_information_table(req));
174 }
175
176 match name.to_ascii_lowercase().as_str() {
177 TABLES => Some(Arc::new(InformationSchemaTables::new(
178 self.catalog_name.clone(),
179 self.catalog_manager.clone(),
180 )) as _),
181 COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
182 self.catalog_name.clone(),
183 self.catalog_manager.clone(),
184 )) as _),
185 ENGINES => setup_memory_table!(ENGINES),
186 COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
187 COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
188 BUILD_INFO => setup_memory_table!(BUILD_INFO),
189 CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
190 COLLATIONS => setup_memory_table!(COLLATIONS),
191 COLLATION_CHARACTER_SET_APPLICABILITY => {
192 setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
193 }
194 CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
195 EVENTS => setup_memory_table!(EVENTS),
196 FILES => setup_memory_table!(FILES),
197 OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
198 PARAMETERS => setup_memory_table!(PARAMETERS),
199 PROFILING => setup_memory_table!(PROFILING),
200 REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
201 ROUTINES => setup_memory_table!(ROUTINES),
202 SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
203 TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
204 GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
205 SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
206 KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
207 self.catalog_name.clone(),
208 self.catalog_manager.clone(),
209 )) as _),
210 SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
211 self.catalog_name.clone(),
212 self.catalog_manager.clone(),
213 )) as _),
214 PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
215 self.catalog_name.clone(),
216 self.catalog_manager.clone(),
217 )) as _),
218 REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
219 self.catalog_name.clone(),
220 self.catalog_manager.clone(),
221 )) as _),
222 TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
223 self.catalog_name.clone(),
224 self.catalog_manager.clone(),
225 )) as _),
226 CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
227 self.catalog_manager.clone(),
228 )) as _),
229 VIEWS => Some(Arc::new(InformationSchemaViews::new(
230 self.catalog_name.clone(),
231 self.catalog_manager.clone(),
232 )) as _),
233 FLOWS => Some(Arc::new(InformationSchemaFlows::new(
234 self.catalog_name.clone(),
235 self.catalog_manager.clone(),
236 self.flow_metadata_manager.clone(),
237 )) as _),
238 PROCEDURE_INFO => Some(
239 Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
240 self.catalog_manager.clone(),
241 )) as _,
242 ),
243 REGION_STATISTICS => Some(Arc::new(
244 region_statistics::InformationSchemaRegionStatistics::new(
245 self.catalog_manager.clone(),
246 ),
247 ) as _),
248 REGION_INFO => Some(Arc::new(InformationSchemaRegionInfo::new(
249 self.catalog_manager.clone(),
250 )) as _),
251 PROCESS_LIST => self
252 .process_manager
253 .as_ref()
254 .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
255 SSTS_MANIFEST => Some(Arc::new(InformationSchemaSstsManifest::new(
256 self.catalog_manager.clone(),
257 )) as _),
258 SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
259 self.catalog_manager.clone(),
260 )) as _),
261 SSTS_INDEX_META => Some(Arc::new(InformationSchemaSstsIndexMeta::new(
262 self.catalog_manager.clone(),
263 )) as _),
264 _ => None,
265 }
266 }
267}
268
269impl InformationSchemaProvider {
270 pub fn new(
271 catalog_name: String,
272 catalog_manager: Weak<dyn CatalogManager>,
273 flow_metadata_manager: Arc<FlowMetadataManager>,
274 process_manager: Option<ProcessManagerRef>,
275 kv_backend: KvBackendRef,
276 ) -> Self {
277 let mut provider = Self {
278 catalog_name,
279 catalog_manager,
280 flow_metadata_manager,
281 process_manager,
282 tables: HashMap::new(),
283 kv_backend,
284 extra_table_factories: HashMap::new(),
285 };
286
287 provider.build_tables();
288
289 provider
290 }
291
292 pub(crate) fn with_extra_table_factories(
293 mut self,
294 factories: HashMap<String, InformationSchemaTableFactoryRef>,
295 ) -> Self {
296 self.extra_table_factories = factories;
297 self.build_tables();
298 self
299 }
300
301 fn build_tables(&mut self) {
302 let mut tables = HashMap::new();
303
304 if self.catalog_name == DEFAULT_CATALOG_NAME {
309 tables.insert(
310 BUILD_INFO.to_string(),
311 self.build_table(BUILD_INFO).unwrap(),
312 );
313 tables.insert(
314 REGION_PEERS.to_string(),
315 self.build_table(REGION_PEERS).unwrap(),
316 );
317 tables.insert(
318 CLUSTER_INFO.to_string(),
319 self.build_table(CLUSTER_INFO).unwrap(),
320 );
321 tables.insert(
322 PROCEDURE_INFO.to_string(),
323 self.build_table(PROCEDURE_INFO).unwrap(),
324 );
325 tables.insert(
326 REGION_STATISTICS.to_string(),
327 self.build_table(REGION_STATISTICS).unwrap(),
328 );
329 tables.insert(
330 REGION_INFO.to_string(),
331 self.build_table(REGION_INFO).unwrap(),
332 );
333 tables.insert(
334 SSTS_MANIFEST.to_string(),
335 self.build_table(SSTS_MANIFEST).unwrap(),
336 );
337 tables.insert(
338 SSTS_STORAGE.to_string(),
339 self.build_table(SSTS_STORAGE).unwrap(),
340 );
341 tables.insert(
342 SSTS_INDEX_META.to_string(),
343 self.build_table(SSTS_INDEX_META).unwrap(),
344 );
345 }
346
347 tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
348 tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
349 tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
350 tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
351 tables.insert(
352 KEY_COLUMN_USAGE.to_string(),
353 self.build_table(KEY_COLUMN_USAGE).unwrap(),
354 );
355 tables.insert(
356 TABLE_CONSTRAINTS.to_string(),
357 self.build_table(TABLE_CONSTRAINTS).unwrap(),
358 );
359 tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
360 if let Some(process_list) = self.build_table(PROCESS_LIST) {
361 tables.insert(PROCESS_LIST.to_string(), process_list);
362 }
363 for name in self.extra_table_factories.keys() {
364 tables.insert(name.clone(), self.build_table(name).expect(name));
365 }
366 for name in MEMORY_TABLES.iter() {
368 tables.insert((*name).to_string(), self.build_table(name).expect(name));
369 }
370 self.tables = tables;
371 }
372}
373
374pub trait InformationTable {
375 fn table_id(&self) -> TableId;
376
377 fn table_name(&self) -> &'static str;
378
379 fn schema(&self) -> SchemaRef;
380
381 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
382
383 fn table_type(&self) -> TableType {
384 TableType::Temporary
385 }
386}
387
388impl<T> SystemTable for T
390where
391 T: InformationTable,
392{
393 fn table_id(&self) -> TableId {
394 InformationTable::table_id(self)
395 }
396
397 fn table_name(&self) -> &'static str {
398 InformationTable::table_name(self)
399 }
400
401 fn schema(&self) -> SchemaRef {
402 InformationTable::schema(self)
403 }
404
405 fn table_type(&self) -> TableType {
406 InformationTable::table_type(self)
407 }
408
409 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
410 InformationTable::to_stream(self, request)
411 }
412}
413
414pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
415
416#[async_trait::async_trait]
418pub trait InformationExtension {
419 type Error: ErrorExt;
420
421 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
423
424 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;
426
427 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
429
430 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
432
433 async fn inspect_datanode(
435 &self,
436 request: DatanodeInspectRequest,
437 ) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
438}
439
440#[derive(Debug, Clone, PartialEq)]
442pub struct DatanodeInspectRequest {
443 pub kind: DatanodeInspectKind,
445
446 pub scan: ScanRequest,
449}
450
/// Selects which entry type `DatanodeInspectRequest::build_plan` delegates to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatanodeInspectKind {
    /// Plan built by `ManifestSstEntry`.
    SstManifest,
    /// Plan built by `StorageSstEntry`.
    SstStorage,
    /// Plan built by `PuffinIndexMetaEntry`.
    SstIndexMeta,
    /// Plan built by `RegionInfoEntry`.
    RegionInfo,
}
463
464impl DatanodeInspectRequest {
465 pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
467 match self.kind {
468 DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
469 DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
470 DatanodeInspectKind::SstIndexMeta => PuffinIndexMetaEntry::build_plan(self.scan),
471 DatanodeInspectKind::RegionInfo => RegionInfoEntry::build_plan(self.scan),
472 }
473 }
474}
/// An [`InformationExtension`] that holds no state and reports no data.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopInformationExtension;
476
477#[async_trait::async_trait]
478impl InformationExtension for NoopInformationExtension {
479 type Error = Error;
480
481 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
482 Ok(vec![])
483 }
484
485 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
486 Ok(vec![])
487 }
488
489 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
490 Ok(vec![])
491 }
492
493 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
494 Ok(None)
495 }
496
497 async fn inspect_datanode(
498 &self,
499 _request: DatanodeInspectRequest,
500 ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
501 Ok(common_recordbatch::RecordBatches::empty().as_stream())
502 }
503}
504
#[cfg(test)]
mod tests {
    use store_api::region_info::RegionInfoEntry;

    use super::*;

    /// A `RegionInfo` request must plan as a table scan over the table name reserved by
    /// `RegionInfoEntry` for inspection.
    #[test]
    fn test_datanode_inspect_region_info_build_plan() {
        let request = DatanodeInspectRequest {
            kind: DatanodeInspectKind::RegionInfo,
            scan: ScanRequest::default(),
        };

        match request.build_plan().unwrap() {
            LogicalPlan::TableScan(scan) => assert_eq!(
                scan.table_name.to_string(),
                RegionInfoEntry::reserved_table_name_for_inspection()
            ),
            _ => panic!("expected table scan"),
        }
    }
}