1mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23pub mod region_peers;
24mod region_statistics;
25mod runtime_metrics;
26pub mod schemata;
27mod ssts;
28mod table_constraints;
29mod table_names;
30pub mod tables;
31mod views;
32
33use std::collections::HashMap;
34use std::sync::{Arc, Weak};
35
36use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
37use common_error::ext::ErrorExt;
38use common_meta::cluster::NodeInfo;
39use common_meta::datanode::RegionStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_meta::key::flow::flow_state::FlowStat;
42use common_meta::kv_backend::KvBackendRef;
43use common_procedure::ProcedureInfo;
44use common_recordbatch::SendableRecordBatchStream;
45use datafusion::error::DataFusionError;
46use datafusion::logical_expr::LogicalPlan;
47use datatypes::schema::SchemaRef;
48use lazy_static::lazy_static;
49use paste::paste;
50use process_list::InformationSchemaProcessList;
51use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
52use store_api::storage::{ScanRequest, TableId};
53use table::TableRef;
54use table::metadata::TableType;
55pub use table_names::*;
56use views::InformationSchemaViews;
57
58use self::columns::InformationSchemaColumns;
59use crate::CatalogManager;
60use crate::error::{Error, Result};
61use crate::process_manager::ProcessManagerRef;
62use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
63use crate::system_schema::information_schema::flows::InformationSchemaFlows;
64use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
65use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
66use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
67use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
68use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
69use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
70use crate::system_schema::information_schema::ssts::{
71 InformationSchemaSstsManifest, InformationSchemaSstsStorage,
72};
73use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
74use crate::system_schema::information_schema::tables::InformationSchemaTables;
75use crate::system_schema::memory_table::MemoryTable;
76pub(crate) use crate::system_schema::predicate::Predicates;
77use crate::system_schema::{
78 SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
79};
80
81lazy_static! {
82 static ref MEMORY_TABLES: &'static [&'static str] = &[
84 ENGINES,
85 COLUMN_PRIVILEGES,
86 COLUMN_STATISTICS,
87 CHARACTER_SETS,
88 COLLATIONS,
89 COLLATION_CHARACTER_SET_APPLICABILITY,
90 CHECK_CONSTRAINTS,
91 EVENTS,
92 FILES,
93 OPTIMIZER_TRACE,
94 PARAMETERS,
95 PROFILING,
96 REFERENTIAL_CONSTRAINTS,
97 ROUTINES,
98 SCHEMA_PRIVILEGES,
99 TABLE_PRIVILEGES,
100 TRIGGERS,
101 GLOBAL_STATUS,
102 SESSION_STATUS,
103 PARTITIONS,
104 ];
105}
106
107macro_rules! setup_memory_table {
108 ($name: expr) => {
109 paste! {
110 {
111 let (schema, columns) = get_schema_columns($name);
112 Some(Arc::new(MemoryTable::new(
113 consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
114 $name,
115 schema,
116 columns
117 )) as _)
118 }
119 }
120 };
121}
122
123#[cfg(feature = "enterprise")]
124pub struct MakeInformationTableRequest {
125 pub catalog_name: String,
126 pub catalog_manager: Weak<dyn CatalogManager>,
127 pub kv_backend: KvBackendRef,
128}
129
130#[cfg(feature = "enterprise")]
135pub trait InformationSchemaTableFactory {
136 fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
137}
138
139#[cfg(feature = "enterprise")]
140pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
141
142pub struct InformationSchemaProvider {
144 catalog_name: String,
145 catalog_manager: Weak<dyn CatalogManager>,
146 process_manager: Option<ProcessManagerRef>,
147 flow_metadata_manager: Arc<FlowMetadataManager>,
148 tables: HashMap<String, TableRef>,
149 #[allow(dead_code)]
150 kv_backend: KvBackendRef,
151 #[cfg(feature = "enterprise")]
152 extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
153}
154
155impl SystemSchemaProvider for InformationSchemaProvider {
156 fn tables(&self) -> &HashMap<String, TableRef> {
157 assert!(!self.tables.is_empty());
158
159 &self.tables
160 }
161}
162
163impl SystemSchemaProviderInner for InformationSchemaProvider {
164 fn catalog_name(&self) -> &str {
165 &self.catalog_name
166 }
167 fn schema_name() -> &'static str {
168 INFORMATION_SCHEMA_NAME
169 }
170
171 fn system_table(&self, name: &str) -> Option<SystemTableRef> {
172 #[cfg(feature = "enterprise")]
173 if let Some(factory) = self.extra_table_factories.get(name) {
174 let req = MakeInformationTableRequest {
175 catalog_name: self.catalog_name.clone(),
176 catalog_manager: self.catalog_manager.clone(),
177 kv_backend: self.kv_backend.clone(),
178 };
179 return Some(factory.make_information_table(req));
180 }
181
182 match name.to_ascii_lowercase().as_str() {
183 TABLES => Some(Arc::new(InformationSchemaTables::new(
184 self.catalog_name.clone(),
185 self.catalog_manager.clone(),
186 )) as _),
187 COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
188 self.catalog_name.clone(),
189 self.catalog_manager.clone(),
190 )) as _),
191 ENGINES => setup_memory_table!(ENGINES),
192 COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
193 COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
194 BUILD_INFO => setup_memory_table!(BUILD_INFO),
195 CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
196 COLLATIONS => setup_memory_table!(COLLATIONS),
197 COLLATION_CHARACTER_SET_APPLICABILITY => {
198 setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
199 }
200 CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
201 EVENTS => setup_memory_table!(EVENTS),
202 FILES => setup_memory_table!(FILES),
203 OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
204 PARAMETERS => setup_memory_table!(PARAMETERS),
205 PROFILING => setup_memory_table!(PROFILING),
206 REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
207 ROUTINES => setup_memory_table!(ROUTINES),
208 SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
209 TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
210 TRIGGERS => setup_memory_table!(TRIGGERS),
211 GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
212 SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
213 KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
214 self.catalog_name.clone(),
215 self.catalog_manager.clone(),
216 )) as _),
217 SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
218 self.catalog_name.clone(),
219 self.catalog_manager.clone(),
220 )) as _),
221 RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
222 PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
223 self.catalog_name.clone(),
224 self.catalog_manager.clone(),
225 )) as _),
226 REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
227 self.catalog_name.clone(),
228 self.catalog_manager.clone(),
229 )) as _),
230 TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
231 self.catalog_name.clone(),
232 self.catalog_manager.clone(),
233 )) as _),
234 CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
235 self.catalog_manager.clone(),
236 )) as _),
237 VIEWS => Some(Arc::new(InformationSchemaViews::new(
238 self.catalog_name.clone(),
239 self.catalog_manager.clone(),
240 )) as _),
241 FLOWS => Some(Arc::new(InformationSchemaFlows::new(
242 self.catalog_name.clone(),
243 self.catalog_manager.clone(),
244 self.flow_metadata_manager.clone(),
245 )) as _),
246 PROCEDURE_INFO => Some(
247 Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
248 self.catalog_manager.clone(),
249 )) as _,
250 ),
251 REGION_STATISTICS => Some(Arc::new(
252 region_statistics::InformationSchemaRegionStatistics::new(
253 self.catalog_manager.clone(),
254 ),
255 ) as _),
256 PROCESS_LIST => self
257 .process_manager
258 .as_ref()
259 .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
260 SSTS_MANIFEST => Some(Arc::new(InformationSchemaSstsManifest::new(
261 self.catalog_manager.clone(),
262 )) as _),
263 SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
264 self.catalog_manager.clone(),
265 )) as _),
266 _ => None,
267 }
268 }
269}
270
271impl InformationSchemaProvider {
272 pub fn new(
273 catalog_name: String,
274 catalog_manager: Weak<dyn CatalogManager>,
275 flow_metadata_manager: Arc<FlowMetadataManager>,
276 process_manager: Option<ProcessManagerRef>,
277 kv_backend: KvBackendRef,
278 ) -> Self {
279 let mut provider = Self {
280 catalog_name,
281 catalog_manager,
282 flow_metadata_manager,
283 process_manager,
284 tables: HashMap::new(),
285 kv_backend,
286 #[cfg(feature = "enterprise")]
287 extra_table_factories: HashMap::new(),
288 };
289
290 provider.build_tables();
291
292 provider
293 }
294
295 #[cfg(feature = "enterprise")]
296 pub(crate) fn with_extra_table_factories(
297 mut self,
298 factories: HashMap<String, InformationSchemaTableFactoryRef>,
299 ) -> Self {
300 self.extra_table_factories = factories;
301 self.build_tables();
302 self
303 }
304
305 fn build_tables(&mut self) {
306 let mut tables = HashMap::new();
307
308 if self.catalog_name == DEFAULT_CATALOG_NAME {
313 tables.insert(
314 RUNTIME_METRICS.to_string(),
315 self.build_table(RUNTIME_METRICS).unwrap(),
316 );
317 tables.insert(
318 BUILD_INFO.to_string(),
319 self.build_table(BUILD_INFO).unwrap(),
320 );
321 tables.insert(
322 REGION_PEERS.to_string(),
323 self.build_table(REGION_PEERS).unwrap(),
324 );
325 tables.insert(
326 CLUSTER_INFO.to_string(),
327 self.build_table(CLUSTER_INFO).unwrap(),
328 );
329 tables.insert(
330 PROCEDURE_INFO.to_string(),
331 self.build_table(PROCEDURE_INFO).unwrap(),
332 );
333 tables.insert(
334 REGION_STATISTICS.to_string(),
335 self.build_table(REGION_STATISTICS).unwrap(),
336 );
337 tables.insert(
338 SSTS_MANIFEST.to_string(),
339 self.build_table(SSTS_MANIFEST).unwrap(),
340 );
341 tables.insert(
342 SSTS_STORAGE.to_string(),
343 self.build_table(SSTS_STORAGE).unwrap(),
344 );
345 }
346
347 tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
348 tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
349 tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
350 tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
351 tables.insert(
352 KEY_COLUMN_USAGE.to_string(),
353 self.build_table(KEY_COLUMN_USAGE).unwrap(),
354 );
355 tables.insert(
356 TABLE_CONSTRAINTS.to_string(),
357 self.build_table(TABLE_CONSTRAINTS).unwrap(),
358 );
359 tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
360 if let Some(process_list) = self.build_table(PROCESS_LIST) {
361 tables.insert(PROCESS_LIST.to_string(), process_list);
362 }
363 #[cfg(feature = "enterprise")]
364 for name in self.extra_table_factories.keys() {
365 tables.insert(name.clone(), self.build_table(name).expect(name));
366 }
367 for name in MEMORY_TABLES.iter() {
369 tables.insert((*name).to_string(), self.build_table(name).expect(name));
370 }
371 self.tables = tables;
372 }
373}
374
375pub trait InformationTable {
376 fn table_id(&self) -> TableId;
377
378 fn table_name(&self) -> &'static str;
379
380 fn schema(&self) -> SchemaRef;
381
382 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
383
384 fn table_type(&self) -> TableType {
385 TableType::Temporary
386 }
387}
388
389impl<T> SystemTable for T
391where
392 T: InformationTable,
393{
394 fn table_id(&self) -> TableId {
395 InformationTable::table_id(self)
396 }
397
398 fn table_name(&self) -> &'static str {
399 InformationTable::table_name(self)
400 }
401
402 fn schema(&self) -> SchemaRef {
403 InformationTable::schema(self)
404 }
405
406 fn table_type(&self) -> TableType {
407 InformationTable::table_type(self)
408 }
409
410 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
411 InformationTable::to_stream(self, request)
412 }
413}
414
415pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
416
417#[async_trait::async_trait]
419pub trait InformationExtension {
420 type Error: ErrorExt;
421
422 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
424
425 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;
427
428 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
430
431 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
433
434 async fn inspect_datanode(
436 &self,
437 request: DatanodeInspectRequest,
438 ) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
439}
440
441#[derive(Debug, Clone, PartialEq, Eq)]
443pub struct DatanodeInspectRequest {
444 pub kind: DatanodeInspectKind,
446
447 pub scan: ScanRequest,
450}
451
452#[derive(Debug, Clone, Copy, PartialEq, Eq)]
454pub enum DatanodeInspectKind {
455 SstManifest,
457 SstStorage,
459}
460
461impl DatanodeInspectRequest {
462 pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
464 match self.kind {
465 DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
466 DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
467 }
468 }
469}
470pub struct NoopInformationExtension;
471
472#[async_trait::async_trait]
473impl InformationExtension for NoopInformationExtension {
474 type Error = Error;
475
476 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
477 Ok(vec![])
478 }
479
480 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
481 Ok(vec![])
482 }
483
484 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
485 Ok(vec![])
486 }
487
488 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
489 Ok(None)
490 }
491
492 async fn inspect_datanode(
493 &self,
494 _request: DatanodeInspectRequest,
495 ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
496 Ok(common_recordbatch::RecordBatches::empty().as_stream())
497 }
498}