1mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23mod region_info;
24pub mod region_peers;
25mod region_statistics;
26pub mod schemata;
27mod ssts;
28pub mod statistics;
29mod table_constraints;
30mod table_names;
31mod table_semantics;
32pub mod tables;
33mod views;
34
35use std::collections::HashMap;
36use std::sync::{Arc, Weak};
37
38use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
39use common_error::ext::ErrorExt;
40use common_meta::cluster::NodeInfo;
41use common_meta::datanode::RegionStat;
42use common_meta::key::flow::FlowMetadataManager;
43use common_meta::key::flow::flow_state::FlowStat;
44use common_meta::kv_backend::KvBackendRef;
45use common_procedure::ProcedureInfo;
46use common_recordbatch::SendableRecordBatchStream;
47use datafusion::error::DataFusionError;
48use datafusion::logical_expr::LogicalPlan;
49use datatypes::schema::SchemaRef;
50use lazy_static::lazy_static;
51use paste::paste;
52use process_list::InformationSchemaProcessList;
53use region_info::InformationSchemaRegionInfo;
54use store_api::metric_engine_consts::{
55 MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, PRIMARY_KEY_ENCODING,
56};
57use store_api::region_info::RegionInfoEntry;
58use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
59use store_api::storage::{ScanRequest, TableId};
60use table::TableRef;
61use table::metadata::TableType;
62pub use table_names::*;
63use views::InformationSchemaViews;
64
65use self::columns::InformationSchemaColumns;
66use crate::CatalogManager;
67use crate::error::{Error, Result};
68use crate::process_manager::ProcessManagerRef;
69use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
70use crate::system_schema::information_schema::flows::InformationSchemaFlows;
71use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
72use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
73use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
74use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
75use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
76use crate::system_schema::information_schema::ssts::{
77 InformationSchemaSstsIndexMeta, InformationSchemaSstsManifest, InformationSchemaSstsStorage,
78};
79use crate::system_schema::information_schema::statistics::InformationSchemaStatistics;
80use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
81use crate::system_schema::information_schema::table_semantics::InformationSchemaTableSemantics;
82use crate::system_schema::information_schema::tables::InformationSchemaTables;
83use crate::system_schema::memory_table::MemoryTable;
84pub(crate) use crate::system_schema::predicate::Predicates;
85use crate::system_schema::{
86 SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
87};
88
/// Label returned by `primary_key_encoding_index_type` whenever the options do
/// not select the sparse encoding (including when no encoding option is set).
const DENSE_PRIMARY_KEY_ENCODING: &str = "dense";
/// Label returned by `primary_key_encoding_index_type` when the configured
/// encoding equals this value, compared ASCII case-insensitively.
const SPARSE_PRIMARY_KEY_ENCODING: &str = "sparse";
91
92pub(crate) fn primary_key_encoding_index_type(options: &HashMap<String, String>) -> &'static str {
93 options
94 .get(PRIMARY_KEY_ENCODING)
95 .or_else(|| options.get(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING))
96 .map(|value| {
97 if value.eq_ignore_ascii_case(SPARSE_PRIMARY_KEY_ENCODING) {
98 SPARSE_PRIMARY_KEY_ENCODING
99 } else {
100 DENSE_PRIMARY_KEY_ENCODING
101 }
102 })
103 .unwrap_or(DENSE_PRIMARY_KEY_ENCODING)
104}
105
lazy_static! {
    // Table names that `InformationSchemaProvider::build_tables` registers for
    // every catalog by iterating this list and calling `build_table` on each
    // entry (a missing table there is a panic, not a skip).
    //
    // NOTE(review): `PARTITIONS` is listed here although `system_table`
    // resolves it to `InformationSchemaPartitions` rather than through
    // `setup_memory_table!`, and `BUILD_INFO` goes through
    // `setup_memory_table!` without being listed — confirm both are intended.
    static ref MEMORY_TABLES: &'static [&'static str] = &[
        ENGINES,
        COLUMN_PRIVILEGES,
        COLUMN_STATISTICS,
        CHARACTER_SETS,
        COLLATIONS,
        COLLATION_CHARACTER_SET_APPLICABILITY,
        CHECK_CONSTRAINTS,
        EVENTS,
        FILES,
        OPTIMIZER_TRACE,
        PARAMETERS,
        PROFILING,
        REFERENTIAL_CONSTRAINTS,
        ROUTINES,
        SCHEMA_PRIVILEGES,
        TABLE_PRIVILEGES,
        GLOBAL_STATUS,
        SESSION_STATUS,
        PARTITIONS,
    ];
}
130
// Expands to `Some(Arc<MemoryTable> as _)` for the table named by `$name`.
//
// The schema and column data come from `get_schema_columns($name)`, and the
// table id is the constant `consts::INFORMATION_SCHEMA_<name>_TABLE_ID`, whose
// identifier is assembled by `paste!` from the tokens passed as `$name`.
// The trailing `as _` leaves the target pointer type to be inferred at the
// expansion site.
//
// NOTE(review): presumably callers must pass the bare constant identifier
// (e.g. `ENGINES`) so the pasted id constant exists — confirm.
macro_rules! setup_memory_table {
    ($name: expr) => {
        paste! {
            {
                let (schema, columns) = get_schema_columns($name);
                Some(Arc::new(MemoryTable::new(
                    consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
                    $name,
                    schema,
                    columns
                )) as _)
            }
        }
    };
}
146
/// Arguments handed to [`InformationSchemaTableFactory::make_information_table`].
///
/// `InformationSchemaProvider::system_table` fills every field with a clone of
/// the provider's own value.
pub struct MakeInformationTableRequest {
    /// Catalog name of the provider issuing the request.
    pub catalog_name: String,
    /// Weak handle to the catalog manager held by the provider.
    pub catalog_manager: Weak<dyn CatalogManager>,
    /// Key-value backend handle held by the provider.
    pub kv_backend: KvBackendRef,
}
152
/// Factory for an additional `information_schema` table.
///
/// Factories registered on `InformationSchemaProvider` are looked up by table
/// name before any built-in table is considered.
pub trait InformationSchemaTableFactory {
    /// Builds the system table for the catalog described by `req`.
    fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
}
160
/// Shared, thread-safe trait object for an [`InformationSchemaTableFactory`].
pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
162
/// Provider of the tables in the schema named `INFORMATION_SCHEMA_NAME` for a
/// single catalog.
pub struct InformationSchemaProvider {
    /// Catalog this provider serves; some tables are only registered when it
    /// equals `DEFAULT_CATALOG_NAME`.
    catalog_name: String,
    /// Weak handle to the catalog manager, cloned into the tables that need it.
    catalog_manager: Weak<dyn CatalogManager>,
    /// When `None`, the `PROCESS_LIST` table is not available.
    process_manager: Option<ProcessManagerRef>,
    /// Passed to the `FLOWS` table.
    flow_metadata_manager: Arc<FlowMetadataManager>,
    /// Registry of table name to table, rebuilt by `build_tables`.
    tables: HashMap<String, TableRef>,
    /// Forwarded to extra table factories through `MakeInformationTableRequest`.
    kv_backend: KvBackendRef,
    /// Extra factories keyed by table name; consulted before built-in tables.
    extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
}
173
impl SystemSchemaProvider for InformationSchemaProvider {
    /// Returns the table registry populated by `build_tables`.
    ///
    /// # Panics
    ///
    /// Panics if the registry is empty.
    fn tables(&self) -> &HashMap<String, TableRef> {
        // An empty registry means `build_tables` never ran or produced nothing.
        assert!(!self.tables.is_empty());

        &self.tables
    }
}
181
182impl SystemSchemaProviderInner for InformationSchemaProvider {
183 fn catalog_name(&self) -> &str {
184 &self.catalog_name
185 }
186 fn schema_name() -> &'static str {
187 INFORMATION_SCHEMA_NAME
188 }
189
190 fn system_table(&self, name: &str) -> Option<SystemTableRef> {
191 if let Some(factory) = self.extra_table_factories.get(name) {
192 let req = MakeInformationTableRequest {
193 catalog_name: self.catalog_name.clone(),
194 catalog_manager: self.catalog_manager.clone(),
195 kv_backend: self.kv_backend.clone(),
196 };
197 return Some(factory.make_information_table(req));
198 }
199
200 match name.to_ascii_lowercase().as_str() {
201 TABLES => Some(Arc::new(InformationSchemaTables::new(
202 self.catalog_name.clone(),
203 self.catalog_manager.clone(),
204 )) as _),
205 COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
206 self.catalog_name.clone(),
207 self.catalog_manager.clone(),
208 )) as _),
209 ENGINES => setup_memory_table!(ENGINES),
210 COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
211 COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
212 BUILD_INFO => setup_memory_table!(BUILD_INFO),
213 CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
214 COLLATIONS => setup_memory_table!(COLLATIONS),
215 COLLATION_CHARACTER_SET_APPLICABILITY => {
216 setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
217 }
218 CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
219 EVENTS => setup_memory_table!(EVENTS),
220 FILES => setup_memory_table!(FILES),
221 OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
222 PARAMETERS => setup_memory_table!(PARAMETERS),
223 PROFILING => setup_memory_table!(PROFILING),
224 REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
225 ROUTINES => setup_memory_table!(ROUTINES),
226 SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
227 TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
228 GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
229 SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
230 KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
231 self.catalog_name.clone(),
232 self.catalog_manager.clone(),
233 )) as _),
234 SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
235 self.catalog_name.clone(),
236 self.catalog_manager.clone(),
237 )) as _),
238 PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
239 self.catalog_name.clone(),
240 self.catalog_manager.clone(),
241 )) as _),
242 REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
243 self.catalog_name.clone(),
244 self.catalog_manager.clone(),
245 )) as _),
246 TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
247 self.catalog_name.clone(),
248 self.catalog_manager.clone(),
249 )) as _),
250 STATISTICS => Some(Arc::new(InformationSchemaStatistics::new(
251 self.catalog_name.clone(),
252 self.catalog_manager.clone(),
253 )) as _),
254 CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
255 self.catalog_manager.clone(),
256 )) as _),
257 VIEWS => Some(Arc::new(InformationSchemaViews::new(
258 self.catalog_name.clone(),
259 self.catalog_manager.clone(),
260 )) as _),
261 FLOWS => Some(Arc::new(InformationSchemaFlows::new(
262 self.catalog_name.clone(),
263 self.catalog_manager.clone(),
264 self.flow_metadata_manager.clone(),
265 )) as _),
266 PROCEDURE_INFO => Some(
267 Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
268 self.catalog_manager.clone(),
269 )) as _,
270 ),
271 REGION_STATISTICS => Some(Arc::new(
272 region_statistics::InformationSchemaRegionStatistics::new(
273 self.catalog_manager.clone(),
274 ),
275 ) as _),
276 REGION_INFO => Some(Arc::new(InformationSchemaRegionInfo::new(
277 self.catalog_manager.clone(),
278 )) as _),
279 PROCESS_LIST => self
280 .process_manager
281 .as_ref()
282 .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
283 SSTS_MANIFEST => Some(Arc::new(InformationSchemaSstsManifest::new(
284 self.catalog_manager.clone(),
285 )) as _),
286 SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
287 self.catalog_manager.clone(),
288 )) as _),
289 SSTS_INDEX_META => Some(Arc::new(InformationSchemaSstsIndexMeta::new(
290 self.catalog_manager.clone(),
291 )) as _),
292 TABLE_SEMANTICS => Some(Arc::new(InformationSchemaTableSemantics::new(
293 self.catalog_name.clone(),
294 self.catalog_manager.clone(),
295 )) as _),
296 _ => None,
297 }
298 }
299}
300
301impl InformationSchemaProvider {
302 pub fn new(
303 catalog_name: String,
304 catalog_manager: Weak<dyn CatalogManager>,
305 flow_metadata_manager: Arc<FlowMetadataManager>,
306 process_manager: Option<ProcessManagerRef>,
307 kv_backend: KvBackendRef,
308 ) -> Self {
309 let mut provider = Self {
310 catalog_name,
311 catalog_manager,
312 flow_metadata_manager,
313 process_manager,
314 tables: HashMap::new(),
315 kv_backend,
316 extra_table_factories: HashMap::new(),
317 };
318
319 provider.build_tables();
320
321 provider
322 }
323
324 pub(crate) fn with_extra_table_factories(
325 mut self,
326 factories: HashMap<String, InformationSchemaTableFactoryRef>,
327 ) -> Self {
328 self.extra_table_factories = factories;
329 self.build_tables();
330 self
331 }
332
333 fn build_tables(&mut self) {
334 let mut tables = HashMap::new();
335
336 if self.catalog_name == DEFAULT_CATALOG_NAME {
341 tables.insert(
342 BUILD_INFO.to_string(),
343 self.build_table(BUILD_INFO).unwrap(),
344 );
345 tables.insert(
346 REGION_PEERS.to_string(),
347 self.build_table(REGION_PEERS).unwrap(),
348 );
349 tables.insert(
350 CLUSTER_INFO.to_string(),
351 self.build_table(CLUSTER_INFO).unwrap(),
352 );
353 tables.insert(
354 PROCEDURE_INFO.to_string(),
355 self.build_table(PROCEDURE_INFO).unwrap(),
356 );
357 tables.insert(
358 REGION_STATISTICS.to_string(),
359 self.build_table(REGION_STATISTICS).unwrap(),
360 );
361 tables.insert(
362 REGION_INFO.to_string(),
363 self.build_table(REGION_INFO).unwrap(),
364 );
365 tables.insert(
366 SSTS_MANIFEST.to_string(),
367 self.build_table(SSTS_MANIFEST).unwrap(),
368 );
369 tables.insert(
370 SSTS_STORAGE.to_string(),
371 self.build_table(SSTS_STORAGE).unwrap(),
372 );
373 tables.insert(
374 SSTS_INDEX_META.to_string(),
375 self.build_table(SSTS_INDEX_META).unwrap(),
376 );
377 }
378
379 tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
380 tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
381 tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
382 tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
383 tables.insert(
384 KEY_COLUMN_USAGE.to_string(),
385 self.build_table(KEY_COLUMN_USAGE).unwrap(),
386 );
387 tables.insert(
388 TABLE_CONSTRAINTS.to_string(),
389 self.build_table(TABLE_CONSTRAINTS).unwrap(),
390 );
391 tables.insert(
392 STATISTICS.to_string(),
393 self.build_table(STATISTICS).unwrap(),
394 );
395 tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
396 tables.insert(
397 TABLE_SEMANTICS.to_string(),
398 self.build_table(TABLE_SEMANTICS).unwrap(),
399 );
400 if let Some(process_list) = self.build_table(PROCESS_LIST) {
401 tables.insert(PROCESS_LIST.to_string(), process_list);
402 }
403 for name in self.extra_table_factories.keys() {
404 tables.insert(name.clone(), self.build_table(name).expect(name));
405 }
406 for name in MEMORY_TABLES.iter() {
408 tables.insert((*name).to_string(), self.build_table(name).expect(name));
409 }
410 self.tables = tables;
411 }
412}
413
/// Interface implemented by `information_schema` tables.
///
/// Every implementor is also a `SystemTable` through the blanket impl in this
/// module, which forwards each method to the one declared here.
pub trait InformationTable {
    /// Identifier of the table.
    fn table_id(&self) -> TableId;

    /// Static name of the table.
    fn table_name(&self) -> &'static str;

    /// Schema of the rows produced by [`InformationTable::to_stream`].
    fn schema(&self) -> SchemaRef;

    /// Produces the table contents as a record batch stream for `request`.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

    /// Table type reported for this table; `TableType::Temporary` unless overridden.
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}
427
428impl<T> SystemTable for T
430where
431 T: InformationTable,
432{
433 fn table_id(&self) -> TableId {
434 InformationTable::table_id(self)
435 }
436
437 fn table_name(&self) -> &'static str {
438 InformationTable::table_name(self)
439 }
440
441 fn schema(&self) -> SchemaRef {
442 InformationTable::schema(self)
443 }
444
445 fn table_type(&self) -> TableType {
446 InformationTable::table_type(self)
447 }
448
449 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
450 InformationTable::to_stream(self, request)
451 }
452}
453
/// Shared, thread-safe trait object for an [`InformationExtension`] whose
/// error type is this crate's `Error`.
pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
455
/// Async accessors for node, procedure, region and flow information, plus a
/// datanode inspection query.
///
/// NOTE(review): presumably the data source behind tables such as
/// `cluster_info` and `procedure_info` — confirm against the implementors.
#[async_trait::async_trait]
pub trait InformationExtension {
    /// Error returned by every accessor of this extension.
    type Error: ErrorExt;

    /// Returns the known nodes.
    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Returns procedure infos, each paired with a string.
    ///
    /// NOTE(review): the meaning of the `String` element is not visible here
    /// (presumably a status or type label) — confirm against the implementors.
    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;

    /// Returns region statistics.
    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Returns flow statistics, or `None` when there are none to report.
    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;

    /// Runs the inspection described by `request` and returns its rows as a
    /// record batch stream.
    async fn inspect_datanode(
        &self,
        request: DatanodeInspectRequest,
    ) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
}
479
/// Request passed to `InformationExtension::inspect_datanode`.
#[derive(Debug, Clone, PartialEq)]
pub struct DatanodeInspectRequest {
    /// Selects which entry type builds the plan in `build_plan`.
    pub kind: DatanodeInspectKind,

    /// Scan request handed to the selected entry type's `build_plan`.
    pub scan: ScanRequest,
}
490
/// Kind of datanode inspection; each variant maps to one entry type in
/// `DatanodeInspectRequest::build_plan`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatanodeInspectKind {
    /// Planned via `ManifestSstEntry::build_plan`.
    SstManifest,
    /// Planned via `StorageSstEntry::build_plan`.
    SstStorage,
    /// Planned via `PuffinIndexMetaEntry::build_plan`.
    SstIndexMeta,
    /// Planned via `RegionInfoEntry::build_plan`.
    RegionInfo,
}
503
504impl DatanodeInspectRequest {
505 pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
507 match self.kind {
508 DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
509 DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
510 DatanodeInspectKind::SstIndexMeta => PuffinIndexMetaEntry::build_plan(self.scan),
511 DatanodeInspectKind::RegionInfo => RegionInfoEntry::build_plan(self.scan),
512 }
513 }
514}
/// [`InformationExtension`] implementation that reports no data: empty lists,
/// no flow statistics and an empty inspection stream.
pub struct NoopInformationExtension;
516
517#[async_trait::async_trait]
518impl InformationExtension for NoopInformationExtension {
519 type Error = Error;
520
521 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
522 Ok(vec![])
523 }
524
525 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
526 Ok(vec![])
527 }
528
529 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
530 Ok(vec![])
531 }
532
533 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
534 Ok(None)
535 }
536
537 async fn inspect_datanode(
538 &self,
539 _request: DatanodeInspectRequest,
540 ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
541 Ok(common_recordbatch::RecordBatches::empty().as_stream())
542 }
543}
544
#[cfg(test)]
mod tests {
    use store_api::region_info::RegionInfoEntry;

    use super::*;

    /// A `RegionInfo` inspection request with a default scan must plan as a
    /// table scan over the table name reserved for region-info inspection.
    #[test]
    fn test_datanode_inspect_region_info_build_plan() {
        let request = DatanodeInspectRequest {
            kind: DatanodeInspectKind::RegionInfo,
            scan: ScanRequest::default(),
        };

        match request.build_plan().unwrap() {
            LogicalPlan::TableScan(scan) => assert_eq!(
                scan.table_name.to_string(),
                RegionInfoEntry::reserved_table_name_for_inspection()
            ),
            _ => panic!("expected table scan"),
        }
    }
}