1mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23pub mod region_peers;
24mod region_statistics;
25mod runtime_metrics;
26pub mod schemata;
27mod table_constraints;
28mod table_names;
29pub mod tables;
30mod views;
31
32use std::collections::HashMap;
33use std::sync::{Arc, Weak};
34
35use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
36use common_error::ext::ErrorExt;
37use common_meta::cluster::NodeInfo;
38use common_meta::datanode::RegionStat;
39use common_meta::key::flow::FlowMetadataManager;
40use common_meta::key::flow::flow_state::FlowStat;
41use common_meta::kv_backend::KvBackendRef;
42use common_procedure::ProcedureInfo;
43use common_recordbatch::SendableRecordBatchStream;
44use datafusion::error::DataFusionError;
45use datafusion::logical_expr::LogicalPlan;
46use datatypes::schema::SchemaRef;
47use lazy_static::lazy_static;
48use paste::paste;
49use process_list::InformationSchemaProcessList;
50use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
51use store_api::storage::{ScanRequest, TableId};
52use table::TableRef;
53use table::metadata::TableType;
54pub use table_names::*;
55use views::InformationSchemaViews;
56
57use self::columns::InformationSchemaColumns;
58use crate::CatalogManager;
59use crate::error::{Error, Result};
60use crate::process_manager::ProcessManagerRef;
61use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
62use crate::system_schema::information_schema::flows::InformationSchemaFlows;
63use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
64use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
65use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
66use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
67use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
68use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
69use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
70use crate::system_schema::information_schema::tables::InformationSchemaTables;
71use crate::system_schema::memory_table::MemoryTable;
72pub(crate) use crate::system_schema::predicate::Predicates;
73use crate::system_schema::{
74 SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
75};
76
77lazy_static! {
78 static ref MEMORY_TABLES: &'static [&'static str] = &[
80 ENGINES,
81 COLUMN_PRIVILEGES,
82 COLUMN_STATISTICS,
83 CHARACTER_SETS,
84 COLLATIONS,
85 COLLATION_CHARACTER_SET_APPLICABILITY,
86 CHECK_CONSTRAINTS,
87 EVENTS,
88 FILES,
89 OPTIMIZER_TRACE,
90 PARAMETERS,
91 PROFILING,
92 REFERENTIAL_CONSTRAINTS,
93 ROUTINES,
94 SCHEMA_PRIVILEGES,
95 TABLE_PRIVILEGES,
96 TRIGGERS,
97 GLOBAL_STATUS,
98 SESSION_STATUS,
99 PARTITIONS,
100 ];
101}
102
103macro_rules! setup_memory_table {
104 ($name: expr) => {
105 paste! {
106 {
107 let (schema, columns) = get_schema_columns($name);
108 Some(Arc::new(MemoryTable::new(
109 consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
110 $name,
111 schema,
112 columns
113 )) as _)
114 }
115 }
116 };
117}
118
119#[cfg(feature = "enterprise")]
120pub struct MakeInformationTableRequest {
121 pub catalog_name: String,
122 pub catalog_manager: Weak<dyn CatalogManager>,
123 pub kv_backend: KvBackendRef,
124}
125
126#[cfg(feature = "enterprise")]
131pub trait InformationSchemaTableFactory {
132 fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
133}
134
135#[cfg(feature = "enterprise")]
136pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
137
138pub struct InformationSchemaProvider {
140 catalog_name: String,
141 catalog_manager: Weak<dyn CatalogManager>,
142 process_manager: Option<ProcessManagerRef>,
143 flow_metadata_manager: Arc<FlowMetadataManager>,
144 tables: HashMap<String, TableRef>,
145 #[allow(dead_code)]
146 kv_backend: KvBackendRef,
147 #[cfg(feature = "enterprise")]
148 extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
149}
150
151impl SystemSchemaProvider for InformationSchemaProvider {
152 fn tables(&self) -> &HashMap<String, TableRef> {
153 assert!(!self.tables.is_empty());
154
155 &self.tables
156 }
157}
158
159impl SystemSchemaProviderInner for InformationSchemaProvider {
160 fn catalog_name(&self) -> &str {
161 &self.catalog_name
162 }
163 fn schema_name() -> &'static str {
164 INFORMATION_SCHEMA_NAME
165 }
166
167 fn system_table(&self, name: &str) -> Option<SystemTableRef> {
168 #[cfg(feature = "enterprise")]
169 if let Some(factory) = self.extra_table_factories.get(name) {
170 let req = MakeInformationTableRequest {
171 catalog_name: self.catalog_name.clone(),
172 catalog_manager: self.catalog_manager.clone(),
173 kv_backend: self.kv_backend.clone(),
174 };
175 return Some(factory.make_information_table(req));
176 }
177
178 match name.to_ascii_lowercase().as_str() {
179 TABLES => Some(Arc::new(InformationSchemaTables::new(
180 self.catalog_name.clone(),
181 self.catalog_manager.clone(),
182 )) as _),
183 COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
184 self.catalog_name.clone(),
185 self.catalog_manager.clone(),
186 )) as _),
187 ENGINES => setup_memory_table!(ENGINES),
188 COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
189 COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
190 BUILD_INFO => setup_memory_table!(BUILD_INFO),
191 CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
192 COLLATIONS => setup_memory_table!(COLLATIONS),
193 COLLATION_CHARACTER_SET_APPLICABILITY => {
194 setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
195 }
196 CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
197 EVENTS => setup_memory_table!(EVENTS),
198 FILES => setup_memory_table!(FILES),
199 OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
200 PARAMETERS => setup_memory_table!(PARAMETERS),
201 PROFILING => setup_memory_table!(PROFILING),
202 REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
203 ROUTINES => setup_memory_table!(ROUTINES),
204 SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
205 TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
206 TRIGGERS => setup_memory_table!(TRIGGERS),
207 GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
208 SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
209 KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
210 self.catalog_name.clone(),
211 self.catalog_manager.clone(),
212 )) as _),
213 SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
214 self.catalog_name.clone(),
215 self.catalog_manager.clone(),
216 )) as _),
217 RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
218 PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
219 self.catalog_name.clone(),
220 self.catalog_manager.clone(),
221 )) as _),
222 REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
223 self.catalog_name.clone(),
224 self.catalog_manager.clone(),
225 )) as _),
226 TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
227 self.catalog_name.clone(),
228 self.catalog_manager.clone(),
229 )) as _),
230 CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
231 self.catalog_manager.clone(),
232 )) as _),
233 VIEWS => Some(Arc::new(InformationSchemaViews::new(
234 self.catalog_name.clone(),
235 self.catalog_manager.clone(),
236 )) as _),
237 FLOWS => Some(Arc::new(InformationSchemaFlows::new(
238 self.catalog_name.clone(),
239 self.catalog_manager.clone(),
240 self.flow_metadata_manager.clone(),
241 )) as _),
242 PROCEDURE_INFO => Some(
243 Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
244 self.catalog_manager.clone(),
245 )) as _,
246 ),
247 REGION_STATISTICS => Some(Arc::new(
248 region_statistics::InformationSchemaRegionStatistics::new(
249 self.catalog_manager.clone(),
250 ),
251 ) as _),
252 PROCESS_LIST => self
253 .process_manager
254 .as_ref()
255 .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
256 _ => None,
257 }
258 }
259}
260
261impl InformationSchemaProvider {
262 pub fn new(
263 catalog_name: String,
264 catalog_manager: Weak<dyn CatalogManager>,
265 flow_metadata_manager: Arc<FlowMetadataManager>,
266 process_manager: Option<ProcessManagerRef>,
267 kv_backend: KvBackendRef,
268 ) -> Self {
269 let mut provider = Self {
270 catalog_name,
271 catalog_manager,
272 flow_metadata_manager,
273 process_manager,
274 tables: HashMap::new(),
275 kv_backend,
276 #[cfg(feature = "enterprise")]
277 extra_table_factories: HashMap::new(),
278 };
279
280 provider.build_tables();
281
282 provider
283 }
284
285 #[cfg(feature = "enterprise")]
286 pub(crate) fn with_extra_table_factories(
287 mut self,
288 factories: HashMap<String, InformationSchemaTableFactoryRef>,
289 ) -> Self {
290 self.extra_table_factories = factories;
291 self.build_tables();
292 self
293 }
294
295 fn build_tables(&mut self) {
296 let mut tables = HashMap::new();
297
298 if self.catalog_name == DEFAULT_CATALOG_NAME {
303 tables.insert(
304 RUNTIME_METRICS.to_string(),
305 self.build_table(RUNTIME_METRICS).unwrap(),
306 );
307 tables.insert(
308 BUILD_INFO.to_string(),
309 self.build_table(BUILD_INFO).unwrap(),
310 );
311 tables.insert(
312 REGION_PEERS.to_string(),
313 self.build_table(REGION_PEERS).unwrap(),
314 );
315 tables.insert(
316 CLUSTER_INFO.to_string(),
317 self.build_table(CLUSTER_INFO).unwrap(),
318 );
319 tables.insert(
320 PROCEDURE_INFO.to_string(),
321 self.build_table(PROCEDURE_INFO).unwrap(),
322 );
323 tables.insert(
324 REGION_STATISTICS.to_string(),
325 self.build_table(REGION_STATISTICS).unwrap(),
326 );
327 }
328
329 tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
330 tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
331 tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
332 tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
333 tables.insert(
334 KEY_COLUMN_USAGE.to_string(),
335 self.build_table(KEY_COLUMN_USAGE).unwrap(),
336 );
337 tables.insert(
338 TABLE_CONSTRAINTS.to_string(),
339 self.build_table(TABLE_CONSTRAINTS).unwrap(),
340 );
341 tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
342 if let Some(process_list) = self.build_table(PROCESS_LIST) {
343 tables.insert(PROCESS_LIST.to_string(), process_list);
344 }
345 #[cfg(feature = "enterprise")]
346 for name in self.extra_table_factories.keys() {
347 tables.insert(name.to_string(), self.build_table(name).expect(name));
348 }
349 for name in MEMORY_TABLES.iter() {
351 tables.insert((*name).to_string(), self.build_table(name).expect(name));
352 }
353 self.tables = tables;
354 }
355}
356
357pub trait InformationTable {
358 fn table_id(&self) -> TableId;
359
360 fn table_name(&self) -> &'static str;
361
362 fn schema(&self) -> SchemaRef;
363
364 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
365
366 fn table_type(&self) -> TableType {
367 TableType::Temporary
368 }
369}
370
371impl<T> SystemTable for T
373where
374 T: InformationTable,
375{
376 fn table_id(&self) -> TableId {
377 InformationTable::table_id(self)
378 }
379
380 fn table_name(&self) -> &'static str {
381 InformationTable::table_name(self)
382 }
383
384 fn schema(&self) -> SchemaRef {
385 InformationTable::schema(self)
386 }
387
388 fn table_type(&self) -> TableType {
389 InformationTable::table_type(self)
390 }
391
392 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
393 InformationTable::to_stream(self, request)
394 }
395}
396
397pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
398
399#[async_trait::async_trait]
401pub trait InformationExtension {
402 type Error: ErrorExt;
403
404 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
406
407 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;
409
410 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
412
413 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
415
416 async fn inspect_datanode(
418 &self,
419 request: DatanodeInspectRequest,
420 ) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
421}
422
423#[derive(Debug, Clone, PartialEq, Eq)]
425pub struct DatanodeInspectRequest {
426 pub kind: DatanodeInspectKind,
428
429 pub scan: ScanRequest,
432}
433
434#[derive(Debug, Clone, Copy, PartialEq, Eq)]
436pub enum DatanodeInspectKind {
437 SstManifest,
439 SstStorage,
441}
442
443impl DatanodeInspectRequest {
444 pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
446 match self.kind {
447 DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
448 DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
449 }
450 }
451}
452pub struct NoopInformationExtension;
453
454#[async_trait::async_trait]
455impl InformationExtension for NoopInformationExtension {
456 type Error = Error;
457
458 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
459 Ok(vec![])
460 }
461
462 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
463 Ok(vec![])
464 }
465
466 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
467 Ok(vec![])
468 }
469
470 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
471 Ok(None)
472 }
473
474 async fn inspect_datanode(
475 &self,
476 _request: DatanodeInspectRequest,
477 ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
478 Ok(common_recordbatch::RecordBatches::empty().as_stream())
479 }
480}