1mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23pub mod region_peers;
24mod region_statistics;
25mod runtime_metrics;
26pub mod schemata;
27mod table_constraints;
28mod table_names;
29pub mod tables;
30mod views;
31
32use std::collections::HashMap;
33use std::sync::{Arc, Weak};
34
35use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
36use common_error::ext::ErrorExt;
37use common_meta::cluster::NodeInfo;
38use common_meta::datanode::RegionStat;
39use common_meta::key::flow::flow_state::FlowStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_procedure::ProcedureInfo;
42use common_recordbatch::SendableRecordBatchStream;
43use datatypes::schema::SchemaRef;
44use lazy_static::lazy_static;
45use paste::paste;
46use process_list::InformationSchemaProcessList;
47use store_api::storage::{ScanRequest, TableId};
48use table::metadata::TableType;
49use table::TableRef;
50pub use table_names::*;
51use views::InformationSchemaViews;
52
53use self::columns::InformationSchemaColumns;
54use crate::error::{Error, Result};
55use crate::process_manager::ProcessManagerRef;
56use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
57use crate::system_schema::information_schema::flows::InformationSchemaFlows;
58use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
59use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
60use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
61use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
62use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
63use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
64use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
65use crate::system_schema::information_schema::tables::InformationSchemaTables;
66use crate::system_schema::memory_table::MemoryTable;
67pub(crate) use crate::system_schema::predicate::Predicates;
68use crate::system_schema::{
69 SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
70};
71use crate::CatalogManager;
72
lazy_static! {
    // Table names that `InformationSchemaProvider::build_tables` registers in one
    // loop, for every catalog, by calling `build_table` on each entry.
    //
    // Most entries are resolved by `system_table` through `setup_memory_table!`;
    // `PARTITIONS` is the exception and resolves to `InformationSchemaPartitions`.
    // `BUILD_INFO` is also built through `setup_memory_table!` but is not listed
    // here: `build_tables` registers it only for the default catalog.
    static ref MEMORY_TABLES: &'static [&'static str] = &[
        ENGINES,
        COLUMN_PRIVILEGES,
        COLUMN_STATISTICS,
        CHARACTER_SETS,
        COLLATIONS,
        COLLATION_CHARACTER_SET_APPLICABILITY,
        CHECK_CONSTRAINTS,
        EVENTS,
        FILES,
        OPTIMIZER_TRACE,
        PARAMETERS,
        PROFILING,
        REFERENTIAL_CONSTRAINTS,
        ROUTINES,
        SCHEMA_PRIVILEGES,
        TABLE_PRIVILEGES,
        TRIGGERS,
        GLOBAL_STATUS,
        SESSION_STATUS,
        PARTITIONS,
    ];
}
98
/// Expands to `Some(..)` wrapping a new, reference-counted [`MemoryTable`] for
/// the table constant `$name`.
///
/// The schema and columns come from `get_schema_columns($name)`; the table id is
/// the constant `consts::INFORMATION_SCHEMA_<NAME>_TABLE_ID`, whose identifier is
/// assembled from `$name` with `paste!`. The trailing `as _` lets the call site
/// decide which pointer type the `Arc` is cast to.
macro_rules! setup_memory_table {
    ($name: expr) => {
        paste! {
            {
                let (table_schema, table_columns) = get_schema_columns($name);
                let memory_table = MemoryTable::new(
                    consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
                    $name,
                    table_schema,
                    table_columns,
                );
                Some(Arc::new(memory_table) as _)
            }
        }
    };
}
114
/// Provides the tables of the schema named by `INFORMATION_SCHEMA_NAME` for a
/// single catalog.
pub struct InformationSchemaProvider {
    /// Name of the catalog this provider serves; `build_tables` compares it with
    /// `DEFAULT_CATALOG_NAME` to decide which tables to register.
    catalog_name: String,
    /// Weak handle to the catalog manager, cloned into the tables that need it.
    catalog_manager: Weak<dyn CatalogManager>,
    /// Optional process manager; when it is `None`, `system_table` yields no
    /// `PROCESS_LIST` table.
    process_manager: Option<ProcessManagerRef>,
    /// Flow metadata manager handed to the `FLOWS` table.
    flow_metadata_manager: Arc<FlowMetadataManager>,
    /// Table registry keyed by table name, filled once by `build_tables`.
    tables: HashMap<String, TableRef>,
}
123
124impl SystemSchemaProvider for InformationSchemaProvider {
125 fn tables(&self) -> &HashMap<String, TableRef> {
126 assert!(!self.tables.is_empty());
127
128 &self.tables
129 }
130}
131impl SystemSchemaProviderInner for InformationSchemaProvider {
132 fn catalog_name(&self) -> &str {
133 &self.catalog_name
134 }
135 fn schema_name() -> &'static str {
136 INFORMATION_SCHEMA_NAME
137 }
138
139 fn system_table(&self, name: &str) -> Option<SystemTableRef> {
140 match name.to_ascii_lowercase().as_str() {
141 TABLES => Some(Arc::new(InformationSchemaTables::new(
142 self.catalog_name.clone(),
143 self.catalog_manager.clone(),
144 )) as _),
145 COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
146 self.catalog_name.clone(),
147 self.catalog_manager.clone(),
148 )) as _),
149 ENGINES => setup_memory_table!(ENGINES),
150 COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
151 COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
152 BUILD_INFO => setup_memory_table!(BUILD_INFO),
153 CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
154 COLLATIONS => setup_memory_table!(COLLATIONS),
155 COLLATION_CHARACTER_SET_APPLICABILITY => {
156 setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
157 }
158 CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
159 EVENTS => setup_memory_table!(EVENTS),
160 FILES => setup_memory_table!(FILES),
161 OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
162 PARAMETERS => setup_memory_table!(PARAMETERS),
163 PROFILING => setup_memory_table!(PROFILING),
164 REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
165 ROUTINES => setup_memory_table!(ROUTINES),
166 SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
167 TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
168 TRIGGERS => setup_memory_table!(TRIGGERS),
169 GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
170 SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
171 KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
172 self.catalog_name.clone(),
173 self.catalog_manager.clone(),
174 )) as _),
175 SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
176 self.catalog_name.clone(),
177 self.catalog_manager.clone(),
178 )) as _),
179 RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
180 PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
181 self.catalog_name.clone(),
182 self.catalog_manager.clone(),
183 )) as _),
184 REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
185 self.catalog_name.clone(),
186 self.catalog_manager.clone(),
187 )) as _),
188 TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
189 self.catalog_name.clone(),
190 self.catalog_manager.clone(),
191 )) as _),
192 CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
193 self.catalog_manager.clone(),
194 )) as _),
195 VIEWS => Some(Arc::new(InformationSchemaViews::new(
196 self.catalog_name.clone(),
197 self.catalog_manager.clone(),
198 )) as _),
199 FLOWS => Some(Arc::new(InformationSchemaFlows::new(
200 self.catalog_name.clone(),
201 self.catalog_manager.clone(),
202 self.flow_metadata_manager.clone(),
203 )) as _),
204 PROCEDURE_INFO => Some(
205 Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
206 self.catalog_manager.clone(),
207 )) as _,
208 ),
209 REGION_STATISTICS => Some(Arc::new(
210 region_statistics::InformationSchemaRegionStatistics::new(
211 self.catalog_manager.clone(),
212 ),
213 ) as _),
214 PROCESS_LIST => self
215 .process_manager
216 .as_ref()
217 .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
218 _ => None,
219 }
220 }
221}
222
223impl InformationSchemaProvider {
224 pub fn new(
225 catalog_name: String,
226 catalog_manager: Weak<dyn CatalogManager>,
227 flow_metadata_manager: Arc<FlowMetadataManager>,
228 process_manager: Option<ProcessManagerRef>,
229 ) -> Self {
230 let mut provider = Self {
231 catalog_name,
232 catalog_manager,
233 flow_metadata_manager,
234 process_manager,
235 tables: HashMap::new(),
236 };
237
238 provider.build_tables();
239
240 provider
241 }
242
243 fn build_tables(&mut self) {
244 let mut tables = HashMap::new();
245
246 if self.catalog_name == DEFAULT_CATALOG_NAME {
251 tables.insert(
252 RUNTIME_METRICS.to_string(),
253 self.build_table(RUNTIME_METRICS).unwrap(),
254 );
255 tables.insert(
256 BUILD_INFO.to_string(),
257 self.build_table(BUILD_INFO).unwrap(),
258 );
259 tables.insert(
260 REGION_PEERS.to_string(),
261 self.build_table(REGION_PEERS).unwrap(),
262 );
263 tables.insert(
264 CLUSTER_INFO.to_string(),
265 self.build_table(CLUSTER_INFO).unwrap(),
266 );
267 tables.insert(
268 PROCEDURE_INFO.to_string(),
269 self.build_table(PROCEDURE_INFO).unwrap(),
270 );
271 tables.insert(
272 REGION_STATISTICS.to_string(),
273 self.build_table(REGION_STATISTICS).unwrap(),
274 );
275 }
276
277 tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
278 tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
279 tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
280 tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
281 tables.insert(
282 KEY_COLUMN_USAGE.to_string(),
283 self.build_table(KEY_COLUMN_USAGE).unwrap(),
284 );
285 tables.insert(
286 TABLE_CONSTRAINTS.to_string(),
287 self.build_table(TABLE_CONSTRAINTS).unwrap(),
288 );
289 tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
290 if let Some(process_list) = self.build_table(PROCESS_LIST) {
291 tables.insert(PROCESS_LIST.to_string(), process_list);
292 }
293 for name in MEMORY_TABLES.iter() {
295 tables.insert((*name).to_string(), self.build_table(name).expect(name));
296 }
297
298 self.tables = tables;
299 }
300}
301
/// Module-private description of a table in this schema.
///
/// Every implementor also implements `SystemTable` through the blanket impl in
/// this file.
trait InformationTable {
    /// Identifier of the table.
    fn table_id(&self) -> TableId;

    /// Static name of the table.
    fn table_name(&self) -> &'static str;

    /// Schema of the table.
    fn schema(&self) -> SchemaRef;

    /// Produces a record batch stream for the given scan `request`.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

    /// Type of the table; `TableType::Temporary` unless overridden.
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}
315
316impl<T> SystemTable for T
318where
319 T: InformationTable,
320{
321 fn table_id(&self) -> TableId {
322 InformationTable::table_id(self)
323 }
324
325 fn table_name(&self) -> &'static str {
326 InformationTable::table_name(self)
327 }
328
329 fn schema(&self) -> SchemaRef {
330 InformationTable::schema(self)
331 }
332
333 fn table_type(&self) -> TableType {
334 InformationTable::table_type(self)
335 }
336
337 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
338 InformationTable::to_stream(self, request)
339 }
340}
341
/// Shared, `Send + Sync` handle to an [`InformationExtension`] whose error type
/// is this crate's [`Error`].
pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
343
#[async_trait::async_trait]
/// Asynchronous source of node, procedure, region and flow information.
///
/// NOTE(review): presumably consumed by the cluster-level tables of this schema
/// (cluster info, procedure info, region statistics, flows); confirm against
/// the callers.
pub trait InformationExtension {
    /// Error type returned by every method of the extension.
    type Error: ErrorExt;

    /// Returns the nodes as [`NodeInfo`] values.
    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Returns the procedures as `(String, ProcedureInfo)` pairs.
    ///
    /// NOTE(review): the meaning of the `String` is not visible here (possibly
    /// the procedure status); confirm with an implementor.
    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;

    /// Returns the region statistics as [`RegionStat`] values.
    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Returns the flow statistics, if any.
    ///
    /// NOTE(review): `None` presumably means that no flow statistics are
    /// available; confirm with an implementor.
    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
}
361
362pub struct NoopInformationExtension;
363
364#[async_trait::async_trait]
365impl InformationExtension for NoopInformationExtension {
366 type Error = Error;
367
368 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
369 Ok(vec![])
370 }
371
372 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
373 Ok(vec![])
374 }
375
376 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
377 Ok(vec![])
378 }
379
380 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
381 Ok(None)
382 }
383}