1mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23pub mod region_peers;
24mod region_statistics;
25pub mod schemata;
26mod ssts;
27mod table_constraints;
28mod table_names;
29pub mod tables;
30mod views;
31
32use std::collections::HashMap;
33use std::sync::{Arc, Weak};
34
35use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
36use common_error::ext::ErrorExt;
37use common_meta::cluster::NodeInfo;
38use common_meta::datanode::RegionStat;
39use common_meta::key::flow::FlowMetadataManager;
40use common_meta::key::flow::flow_state::FlowStat;
41use common_meta::kv_backend::KvBackendRef;
42use common_procedure::ProcedureInfo;
43use common_recordbatch::SendableRecordBatchStream;
44use datafusion::error::DataFusionError;
45use datafusion::logical_expr::LogicalPlan;
46use datatypes::schema::SchemaRef;
47use lazy_static::lazy_static;
48use paste::paste;
49use process_list::InformationSchemaProcessList;
50use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
51use store_api::storage::{ScanRequest, TableId};
52use table::TableRef;
53use table::metadata::TableType;
54pub use table_names::*;
55use views::InformationSchemaViews;
56
57use self::columns::InformationSchemaColumns;
58use crate::CatalogManager;
59use crate::error::{Error, Result};
60use crate::process_manager::ProcessManagerRef;
61use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
62use crate::system_schema::information_schema::flows::InformationSchemaFlows;
63use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
64use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
65use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
66use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
67use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
68use crate::system_schema::information_schema::ssts::{
69 InformationSchemaSstsIndexMeta, InformationSchemaSstsManifest, InformationSchemaSstsStorage,
70};
71use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
72use crate::system_schema::information_schema::tables::InformationSchemaTables;
73use crate::system_schema::memory_table::MemoryTable;
74pub(crate) use crate::system_schema::predicate::Predicates;
75use crate::system_schema::{
76 SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
77};
78
// Names of the `information_schema` tables whose content is static and served
// from memory (schemas/rows come from `information_memory_table::get_schema_columns`).
// `build_tables` registers every entry here for every catalog.
//
// NOTE(review): `PARTITIONS` appears in this list, but `system_table` resolves
// it to `InformationSchemaPartitions` (a dynamic table), not a memory table —
// confirm its presence here is intended.
lazy_static! {
    static ref MEMORY_TABLES: &'static [&'static str] = &[
        ENGINES,
        COLUMN_PRIVILEGES,
        COLUMN_STATISTICS,
        CHARACTER_SETS,
        COLLATIONS,
        COLLATION_CHARACTER_SET_APPLICABILITY,
        CHECK_CONSTRAINTS,
        EVENTS,
        FILES,
        OPTIMIZER_TRACE,
        PARAMETERS,
        PROFILING,
        REFERENTIAL_CONSTRAINTS,
        ROUTINES,
        SCHEMA_PRIVILEGES,
        TABLE_PRIVILEGES,
        GLOBAL_STATUS,
        SESSION_STATUS,
        PARTITIONS,
    ];
}
103
/// Builds a memory-backed `information_schema` table for the given table-name
/// constant.
///
/// `$name` must be one of the table-name constants (e.g. `ENGINES`); `paste!`
/// concatenates it into the matching
/// `consts::INFORMATION_SCHEMA_<NAME>_TABLE_ID` identifier, so an arm like
/// `ENGINES => setup_memory_table!(ENGINES)` wires name, id, schema and rows
/// together from a single token.
macro_rules! setup_memory_table {
    ($name: expr) => {
        paste! {
            {
                // Static schema and (possibly empty) column data for `$name`.
                let (schema, columns) = get_schema_columns($name);
                Some(Arc::new(MemoryTable::new(
                    consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
                    $name,
                    schema,
                    columns
                )) as _)
            }
        }
    };
}
119
/// Everything an enterprise table factory receives to construct an
/// `information_schema` table for one catalog.
#[cfg(feature = "enterprise")]
pub struct MakeInformationTableRequest {
    /// The catalog the table is being built for.
    pub catalog_name: String,
    /// Weak handle back to the owning catalog manager.
    pub catalog_manager: Weak<dyn CatalogManager>,
    /// KV backend the factory may read metadata from.
    pub kv_backend: KvBackendRef,
}

/// Factory for `information_schema` tables that only exist in the enterprise
/// build; instances are registered via
/// `InformationSchemaProvider::with_extra_table_factories`.
#[cfg(feature = "enterprise")]
pub trait InformationSchemaTableFactory {
    /// Builds the system table described by `req`.
    fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
}

/// Shared, thread-safe handle to an [`InformationSchemaTableFactory`].
#[cfg(feature = "enterprise")]
pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
138
/// The `information_schema` provider for a single catalog.
///
/// All tables are built eagerly in the constructor (see `build_tables`) and
/// cached in `tables`.
pub struct InformationSchemaProvider {
    // Catalog this provider serves; also decides whether cluster-wide tables
    // (e.g. CLUSTER_INFO) are exposed.
    catalog_name: String,
    // Weak to avoid a reference cycle with the catalog manager that owns us.
    catalog_manager: Weak<dyn CatalogManager>,
    // When `None`, the PROCESS_LIST table is not registered.
    process_manager: Option<ProcessManagerRef>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
    // Table name -> table, populated by `build_tables`.
    tables: HashMap<String, TableRef>,
    // Only consumed by enterprise table factories, hence dead in OSS builds.
    #[allow(dead_code)]
    kv_backend: KvBackendRef,
    #[cfg(feature = "enterprise")]
    extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
}
151
impl SystemSchemaProvider for InformationSchemaProvider {
    fn tables(&self) -> &HashMap<String, TableRef> {
        // `build_tables` runs in the constructor, so an empty map here is a
        // construction bug, not a runtime condition.
        assert!(!self.tables.is_empty());

        &self.tables
    }
}
159
impl SystemSchemaProviderInner for InformationSchemaProvider {
    fn catalog_name(&self) -> &str {
        &self.catalog_name
    }
    fn schema_name() -> &'static str {
        INFORMATION_SCHEMA_NAME
    }

    /// Resolves `name` to a system table implementation, or `None` when the
    /// name is unknown (or when `PROCESS_LIST` is requested without a process
    /// manager configured).
    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
        // Enterprise-registered factories take precedence over built-ins.
        // NOTE(review): this lookup uses `name` verbatim while the match below
        // lowercases it first — confirm factory keys are stored lowercase.
        #[cfg(feature = "enterprise")]
        if let Some(factory) = self.extra_table_factories.get(name) {
            let req = MakeInformationTableRequest {
                catalog_name: self.catalog_name.clone(),
                catalog_manager: self.catalog_manager.clone(),
                kv_backend: self.kv_backend.clone(),
            };
            return Some(factory.make_information_table(req));
        }

        // Built-in tables; names are matched case-insensitively.
        match name.to_ascii_lowercase().as_str() {
            TABLES => Some(Arc::new(InformationSchemaTables::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            // Static tables served from memory (see `setup_memory_table!`).
            ENGINES => setup_memory_table!(ENGINES),
            COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
            COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
            BUILD_INFO => setup_memory_table!(BUILD_INFO),
            CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
            COLLATIONS => setup_memory_table!(COLLATIONS),
            COLLATION_CHARACTER_SET_APPLICABILITY => {
                setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
            }
            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
            EVENTS => setup_memory_table!(EVENTS),
            FILES => setup_memory_table!(FILES),
            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
            PARAMETERS => setup_memory_table!(PARAMETERS),
            PROFILING => setup_memory_table!(PROFILING),
            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
            ROUTINES => setup_memory_table!(ROUTINES),
            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
            // Dynamic tables backed by catalog / cluster metadata.
            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
                self.catalog_manager.clone(),
            )) as _),
            VIEWS => Some(Arc::new(InformationSchemaViews::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
                self.flow_metadata_manager.clone(),
            )) as _),
            PROCEDURE_INFO => Some(
                Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
                    self.catalog_manager.clone(),
                )) as _,
            ),
            REGION_STATISTICS => Some(Arc::new(
                region_statistics::InformationSchemaRegionStatistics::new(
                    self.catalog_manager.clone(),
                ),
            ) as _),
            // Only available when a process manager was supplied.
            PROCESS_LIST => self
                .process_manager
                .as_ref()
                .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
            SSTS_MANIFEST => Some(Arc::new(InformationSchemaSstsManifest::new(
                self.catalog_manager.clone(),
            )) as _),
            SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
                self.catalog_manager.clone(),
            )) as _),
            SSTS_INDEX_META => Some(Arc::new(InformationSchemaSstsIndexMeta::new(
                self.catalog_manager.clone(),
            )) as _),
            _ => None,
        }
    }
}
268
269impl InformationSchemaProvider {
270 pub fn new(
271 catalog_name: String,
272 catalog_manager: Weak<dyn CatalogManager>,
273 flow_metadata_manager: Arc<FlowMetadataManager>,
274 process_manager: Option<ProcessManagerRef>,
275 kv_backend: KvBackendRef,
276 ) -> Self {
277 let mut provider = Self {
278 catalog_name,
279 catalog_manager,
280 flow_metadata_manager,
281 process_manager,
282 tables: HashMap::new(),
283 kv_backend,
284 #[cfg(feature = "enterprise")]
285 extra_table_factories: HashMap::new(),
286 };
287
288 provider.build_tables();
289
290 provider
291 }
292
293 #[cfg(feature = "enterprise")]
294 pub(crate) fn with_extra_table_factories(
295 mut self,
296 factories: HashMap<String, InformationSchemaTableFactoryRef>,
297 ) -> Self {
298 self.extra_table_factories = factories;
299 self.build_tables();
300 self
301 }
302
303 fn build_tables(&mut self) {
304 let mut tables = HashMap::new();
305
306 if self.catalog_name == DEFAULT_CATALOG_NAME {
311 tables.insert(
312 BUILD_INFO.to_string(),
313 self.build_table(BUILD_INFO).unwrap(),
314 );
315 tables.insert(
316 REGION_PEERS.to_string(),
317 self.build_table(REGION_PEERS).unwrap(),
318 );
319 tables.insert(
320 CLUSTER_INFO.to_string(),
321 self.build_table(CLUSTER_INFO).unwrap(),
322 );
323 tables.insert(
324 PROCEDURE_INFO.to_string(),
325 self.build_table(PROCEDURE_INFO).unwrap(),
326 );
327 tables.insert(
328 REGION_STATISTICS.to_string(),
329 self.build_table(REGION_STATISTICS).unwrap(),
330 );
331 tables.insert(
332 SSTS_MANIFEST.to_string(),
333 self.build_table(SSTS_MANIFEST).unwrap(),
334 );
335 tables.insert(
336 SSTS_STORAGE.to_string(),
337 self.build_table(SSTS_STORAGE).unwrap(),
338 );
339 tables.insert(
340 SSTS_INDEX_META.to_string(),
341 self.build_table(SSTS_INDEX_META).unwrap(),
342 );
343 }
344
345 tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
346 tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
347 tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
348 tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
349 tables.insert(
350 KEY_COLUMN_USAGE.to_string(),
351 self.build_table(KEY_COLUMN_USAGE).unwrap(),
352 );
353 tables.insert(
354 TABLE_CONSTRAINTS.to_string(),
355 self.build_table(TABLE_CONSTRAINTS).unwrap(),
356 );
357 tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
358 if let Some(process_list) = self.build_table(PROCESS_LIST) {
359 tables.insert(PROCESS_LIST.to_string(), process_list);
360 }
361 #[cfg(feature = "enterprise")]
362 for name in self.extra_table_factories.keys() {
363 tables.insert(name.clone(), self.build_table(name).expect(name));
364 }
365 for name in MEMORY_TABLES.iter() {
367 tables.insert((*name).to_string(), self.build_table(name).expect(name));
368 }
369 self.tables = tables;
370 }
371}
372
/// A table in `information_schema`: it can identify itself and turn a scan
/// request into a stream of record batches.
pub trait InformationTable {
    /// Returns the unique table id.
    fn table_id(&self) -> TableId;

    /// Returns the table name.
    fn table_name(&self) -> &'static str;

    /// Returns the schema of the table.
    fn schema(&self) -> SchemaRef;

    /// Builds a record-batch stream answering `request`.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

    /// Information tables default to `Temporary` (they are computed views,
    /// not persisted base tables).
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}
386
387impl<T> SystemTable for T
389where
390 T: InformationTable,
391{
392 fn table_id(&self) -> TableId {
393 InformationTable::table_id(self)
394 }
395
396 fn table_name(&self) -> &'static str {
397 InformationTable::table_name(self)
398 }
399
400 fn schema(&self) -> SchemaRef {
401 InformationTable::schema(self)
402 }
403
404 fn table_type(&self) -> TableType {
405 InformationTable::table_type(self)
406 }
407
408 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
409 InformationTable::to_stream(self, request)
410 }
411}
412
/// Shared, thread-safe handle to an [`InformationExtension`] using this
/// crate's [`Error`] type.
pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
414
/// Deployment-specific source of cluster-level data consumed by
/// `information_schema` tables (nodes, procedures, region/flow statistics,
/// datanode inspection).
#[async_trait::async_trait]
pub trait InformationExtension {
    type Error: ErrorExt;

    /// Lists the known cluster nodes.
    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Lists procedures as `(type name, procedure info)` pairs.
    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;

    /// Collects per-region statistics.
    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Returns flow statistics, or `None` when unavailable.
    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;

    /// Streams datanode internal state described by `request`.
    async fn inspect_datanode(
        &self,
        request: DatanodeInspectRequest,
    ) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
}
438
/// A request to expose a datanode's internal state as a record-batch stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatanodeInspectRequest {
    /// Which kind of internal state to inspect.
    pub kind: DatanodeInspectKind,

    /// Scan options applied to the produced data.
    pub scan: ScanRequest,
}
449
/// The kind of datanode internal state to inspect; each variant plans against
/// the matching SST entry type (see `DatanodeInspectRequest::build_plan`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatanodeInspectKind {
    /// SST entries as described by [`ManifestSstEntry`].
    SstManifest,
    /// SST entries as described by [`StorageSstEntry`].
    SstStorage,
    /// Index metadata as described by [`PuffinIndexMetaEntry`].
    SstIndexMeta,
}
460
461impl DatanodeInspectRequest {
462 pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
464 match self.kind {
465 DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
466 DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
467 DatanodeInspectKind::SstIndexMeta => PuffinIndexMetaEntry::build_plan(self.scan),
468 }
469 }
470}
471pub struct NoopInformationExtension;
472
473#[async_trait::async_trait]
474impl InformationExtension for NoopInformationExtension {
475 type Error = Error;
476
477 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
478 Ok(vec![])
479 }
480
481 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
482 Ok(vec![])
483 }
484
485 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
486 Ok(vec![])
487 }
488
489 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
490 Ok(None)
491 }
492
493 async fn inspect_datanode(
494 &self,
495 _request: DatanodeInspectRequest,
496 ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
497 Ok(common_recordbatch::RecordBatches::empty().as_stream())
498 }
499}