1mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23pub mod region_peers;
24mod region_statistics;
25mod runtime_metrics;
26pub mod schemata;
27mod table_constraints;
28mod table_names;
29pub mod tables;
30mod views;
31
32use std::collections::HashMap;
33use std::sync::{Arc, Weak};
34
35use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
36use common_error::ext::ErrorExt;
37use common_meta::cluster::NodeInfo;
38use common_meta::datanode::RegionStat;
39use common_meta::key::flow::flow_state::FlowStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_meta::kv_backend::KvBackendRef;
42use common_procedure::ProcedureInfo;
43use common_recordbatch::SendableRecordBatchStream;
44use datatypes::schema::SchemaRef;
45use lazy_static::lazy_static;
46use paste::paste;
47use process_list::InformationSchemaProcessList;
48use store_api::storage::{ScanRequest, TableId};
49use table::metadata::TableType;
50use table::TableRef;
51pub use table_names::*;
52use views::InformationSchemaViews;
53
54use self::columns::InformationSchemaColumns;
55use crate::error::{Error, Result};
56use crate::process_manager::ProcessManagerRef;
57use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
58use crate::system_schema::information_schema::flows::InformationSchemaFlows;
59use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
60use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
61use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
62use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
63use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
64use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
65use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
66use crate::system_schema::information_schema::tables::InformationSchemaTables;
67use crate::system_schema::memory_table::MemoryTable;
68pub(crate) use crate::system_schema::predicate::Predicates;
69use crate::system_schema::{
70 SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
71};
72use crate::CatalogManager;
73
lazy_static! {
    // Names of the tables that `InformationSchemaProvider::build_tables` registers
    // for every catalog by iterating this list. Each name must be resolvable by
    // `system_table`, because the result is unwrapped with `expect`.
    //
    // NOTE(review): `PARTITIONS` is listed here although `system_table` builds it
    // with `InformationSchemaPartitions` rather than `setup_memory_table!` —
    // presumably it is kept here only so it gets registered; confirm.
    static ref MEMORY_TABLES: &'static [&'static str] = &[
        ENGINES,
        COLUMN_PRIVILEGES,
        COLUMN_STATISTICS,
        CHARACTER_SETS,
        COLLATIONS,
        COLLATION_CHARACTER_SET_APPLICABILITY,
        CHECK_CONSTRAINTS,
        EVENTS,
        FILES,
        OPTIMIZER_TRACE,
        PARAMETERS,
        PROFILING,
        REFERENTIAL_CONSTRAINTS,
        ROUTINES,
        SCHEMA_PRIVILEGES,
        TABLE_PRIVILEGES,
        TRIGGERS,
        GLOBAL_STATUS,
        SESSION_STATUS,
        PARTITIONS,
    ];
}
99
// Expands to `Some(..)` holding an `Arc`-wrapped `MemoryTable` for the table-name
// constant `$name`.
//
// The schema and column data come from `get_schema_columns($name)`. The table id is
// the constant `consts::INFORMATION_SCHEMA_<$name>_TABLE_ID`, whose identifier is
// assembled by `paste!`. The trailing `as _` lets the caller's expected type decide
// which trait object the `Arc` is coerced to.
macro_rules! setup_memory_table {
    ($name:expr) => {
        paste! {{
            let (table_schema, table_columns) = get_schema_columns($name);
            let memory_table = MemoryTable::new(
                consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
                $name,
                table_schema,
                table_columns,
            );
            Some(Arc::new(memory_table) as _)
        }}
    };
}
115
/// Arguments passed to an `InformationSchemaTableFactory` when it builds a table.
///
/// Only compiled when the `enterprise` feature is enabled. The provider fills every
/// field with a clone of its own corresponding field.
#[cfg(feature = "enterprise")]
pub struct MakeInformationTableRequest {
    /// Name of the catalog served by the requesting provider.
    pub catalog_name: String,
    /// Weak handle to the catalog manager.
    pub catalog_manager: Weak<dyn CatalogManager>,
    /// Handle to the key-value backend held by the provider.
    pub kv_backend: KvBackendRef,
}
122
/// Factory for an additional `information_schema` table (`enterprise` feature only).
///
/// `InformationSchemaProvider::system_table` consults the registered factories, keyed
/// by the lower-cased table name, when no built-in table matches.
#[cfg(feature = "enterprise")]
pub trait InformationSchemaTableFactory {
    /// Builds the system table from the provider context carried in `req`.
    fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
}
131
/// Shared, thread-safe handle to an `InformationSchemaTableFactory` trait object
/// (`enterprise` feature only).
#[cfg(feature = "enterprise")]
pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
134
/// Provider of the `information_schema` tables for a single catalog.
///
/// The table map is built eagerly in `new` and exposed through
/// `SystemSchemaProvider::tables`.
pub struct InformationSchemaProvider {
    /// Name of the catalog this provider serves.
    catalog_name: String,
    /// Weak handle to the catalog manager, cloned into the tables that need it.
    catalog_manager: Weak<dyn CatalogManager>,
    /// Optional process manager; the process-list table is only built when it is `Some`.
    process_manager: Option<ProcessManagerRef>,
    /// Flow metadata manager handed to the flows table.
    flow_metadata_manager: Arc<FlowMetadataManager>,
    /// Built tables keyed by table name; filled by `build_tables`.
    tables: HashMap<String, TableRef>,
    // Only read in the `enterprise` branch of `system_table` (forwarded to extra
    // table factories), hence the `dead_code` allowance for other builds.
    #[allow(dead_code)]
    kv_backend: KvBackendRef,
    /// Factories for additional tables keyed by table name (`enterprise` feature only).
    #[cfg(feature = "enterprise")]
    extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
}
147
impl SystemSchemaProvider for InformationSchemaProvider {
    /// Returns the table map built by `build_tables`.
    ///
    /// # Panics
    ///
    /// Panics if the map is empty.
    fn tables(&self) -> &HashMap<String, TableRef> {
        // `new` always runs `build_tables`, which unconditionally inserts tables,
        // so an empty map means the provider was not initialised correctly.
        assert!(!self.tables.is_empty());

        &self.tables
    }
}
155
156impl SystemSchemaProviderInner for InformationSchemaProvider {
157 fn catalog_name(&self) -> &str {
158 &self.catalog_name
159 }
160 fn schema_name() -> &'static str {
161 INFORMATION_SCHEMA_NAME
162 }
163
164 fn system_table(&self, name: &str) -> Option<SystemTableRef> {
165 match name.to_ascii_lowercase().as_str() {
166 TABLES => Some(Arc::new(InformationSchemaTables::new(
167 self.catalog_name.clone(),
168 self.catalog_manager.clone(),
169 )) as _),
170 COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
171 self.catalog_name.clone(),
172 self.catalog_manager.clone(),
173 )) as _),
174 ENGINES => setup_memory_table!(ENGINES),
175 COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
176 COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
177 BUILD_INFO => setup_memory_table!(BUILD_INFO),
178 CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
179 COLLATIONS => setup_memory_table!(COLLATIONS),
180 COLLATION_CHARACTER_SET_APPLICABILITY => {
181 setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
182 }
183 CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
184 EVENTS => setup_memory_table!(EVENTS),
185 FILES => setup_memory_table!(FILES),
186 OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
187 PARAMETERS => setup_memory_table!(PARAMETERS),
188 PROFILING => setup_memory_table!(PROFILING),
189 REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
190 ROUTINES => setup_memory_table!(ROUTINES),
191 SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
192 TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
193 TRIGGERS => setup_memory_table!(TRIGGERS),
194 GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
195 SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
196 KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
197 self.catalog_name.clone(),
198 self.catalog_manager.clone(),
199 )) as _),
200 SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
201 self.catalog_name.clone(),
202 self.catalog_manager.clone(),
203 )) as _),
204 RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
205 PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
206 self.catalog_name.clone(),
207 self.catalog_manager.clone(),
208 )) as _),
209 REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
210 self.catalog_name.clone(),
211 self.catalog_manager.clone(),
212 )) as _),
213 TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
214 self.catalog_name.clone(),
215 self.catalog_manager.clone(),
216 )) as _),
217 CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
218 self.catalog_manager.clone(),
219 )) as _),
220 VIEWS => Some(Arc::new(InformationSchemaViews::new(
221 self.catalog_name.clone(),
222 self.catalog_manager.clone(),
223 )) as _),
224 FLOWS => Some(Arc::new(InformationSchemaFlows::new(
225 self.catalog_name.clone(),
226 self.catalog_manager.clone(),
227 self.flow_metadata_manager.clone(),
228 )) as _),
229 PROCEDURE_INFO => Some(
230 Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
231 self.catalog_manager.clone(),
232 )) as _,
233 ),
234 REGION_STATISTICS => Some(Arc::new(
235 region_statistics::InformationSchemaRegionStatistics::new(
236 self.catalog_manager.clone(),
237 ),
238 ) as _),
239 PROCESS_LIST => self
240 .process_manager
241 .as_ref()
242 .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
243 table_name => {
244 #[cfg(feature = "enterprise")]
245 return self.extra_table_factories.get(table_name).map(|factory| {
246 let req = MakeInformationTableRequest {
247 catalog_name: self.catalog_name.clone(),
248 catalog_manager: self.catalog_manager.clone(),
249 kv_backend: self.kv_backend.clone(),
250 };
251 factory.make_information_table(req)
252 });
253 #[cfg(not(feature = "enterprise"))]
254 {
255 let _ = table_name;
256 None
257 }
258 }
259 }
260 }
261}
262
263impl InformationSchemaProvider {
264 pub fn new(
265 catalog_name: String,
266 catalog_manager: Weak<dyn CatalogManager>,
267 flow_metadata_manager: Arc<FlowMetadataManager>,
268 process_manager: Option<ProcessManagerRef>,
269 kv_backend: KvBackendRef,
270 ) -> Self {
271 let mut provider = Self {
272 catalog_name,
273 catalog_manager,
274 flow_metadata_manager,
275 process_manager,
276 tables: HashMap::new(),
277 kv_backend,
278 #[cfg(feature = "enterprise")]
279 extra_table_factories: HashMap::new(),
280 };
281
282 provider.build_tables();
283
284 provider
285 }
286
287 #[cfg(feature = "enterprise")]
288 pub(crate) fn with_extra_table_factories(
289 mut self,
290 factories: HashMap<String, InformationSchemaTableFactoryRef>,
291 ) -> Self {
292 self.extra_table_factories = factories;
293 self.build_tables();
294 self
295 }
296
297 fn build_tables(&mut self) {
298 let mut tables = HashMap::new();
299
300 if self.catalog_name == DEFAULT_CATALOG_NAME {
305 tables.insert(
306 RUNTIME_METRICS.to_string(),
307 self.build_table(RUNTIME_METRICS).unwrap(),
308 );
309 tables.insert(
310 BUILD_INFO.to_string(),
311 self.build_table(BUILD_INFO).unwrap(),
312 );
313 tables.insert(
314 REGION_PEERS.to_string(),
315 self.build_table(REGION_PEERS).unwrap(),
316 );
317 tables.insert(
318 CLUSTER_INFO.to_string(),
319 self.build_table(CLUSTER_INFO).unwrap(),
320 );
321 tables.insert(
322 PROCEDURE_INFO.to_string(),
323 self.build_table(PROCEDURE_INFO).unwrap(),
324 );
325 tables.insert(
326 REGION_STATISTICS.to_string(),
327 self.build_table(REGION_STATISTICS).unwrap(),
328 );
329 }
330
331 tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
332 tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
333 tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
334 tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
335 tables.insert(
336 KEY_COLUMN_USAGE.to_string(),
337 self.build_table(KEY_COLUMN_USAGE).unwrap(),
338 );
339 tables.insert(
340 TABLE_CONSTRAINTS.to_string(),
341 self.build_table(TABLE_CONSTRAINTS).unwrap(),
342 );
343 tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
344 if let Some(process_list) = self.build_table(PROCESS_LIST) {
345 tables.insert(PROCESS_LIST.to_string(), process_list);
346 }
347 #[cfg(feature = "enterprise")]
348 for name in self.extra_table_factories.keys() {
349 tables.insert(name.to_string(), self.build_table(name).expect(name));
350 }
351 for name in MEMORY_TABLES.iter() {
353 tables.insert((*name).to_string(), self.build_table(name).expect(name));
354 }
355 self.tables = tables;
356 }
357}
358
/// Interface implemented by `information_schema` tables.
///
/// Every implementor also gets `SystemTable` through the blanket impl in this
/// file, which forwards each method to the one declared here.
pub trait InformationTable {
    /// Returns the id of this table.
    fn table_id(&self) -> TableId;

    /// Returns the static name of this table.
    fn table_name(&self) -> &'static str;

    /// Returns the schema of this table.
    fn schema(&self) -> SchemaRef;

    /// Produces a record batch stream for the given scan `request`.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

    /// Returns the table type; defaults to `TableType::Temporary`.
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}
372
373impl<T> SystemTable for T
375where
376 T: InformationTable,
377{
378 fn table_id(&self) -> TableId {
379 InformationTable::table_id(self)
380 }
381
382 fn table_name(&self) -> &'static str {
383 InformationTable::table_name(self)
384 }
385
386 fn schema(&self) -> SchemaRef {
387 InformationTable::schema(self)
388 }
389
390 fn table_type(&self) -> TableType {
391 InformationTable::table_type(self)
392 }
393
394 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
395 InformationTable::to_stream(self, request)
396 }
397}
398
/// Shared, thread-safe handle to an `InformationExtension` trait object whose
/// associated error type is this crate's `Error`.
pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
400
/// Asynchronous source of node, procedure, region-statistics and flow-statistics
/// data.
///
/// NOTE(review): presumably consumed by the cluster-level `information_schema`
/// tables — confirm against the callers.
#[async_trait::async_trait]
pub trait InformationExtension {
    /// Error type returned by every method of this trait.
    type Error: ErrorExt;

    /// Returns the node information entries.
    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Returns the procedure entries as `(String, ProcedureInfo)` pairs.
    ///
    /// NOTE(review): the meaning of the `String` component is not visible here —
    /// confirm against implementors.
    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;

    /// Returns the region statistics entries.
    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Returns the flow statistics, or `None` when the implementor has none to report.
    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
}
418
419pub struct NoopInformationExtension;
420
421#[async_trait::async_trait]
422impl InformationExtension for NoopInformationExtension {
423 type Error = Error;
424
425 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
426 Ok(vec![])
427 }
428
429 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
430 Ok(vec![])
431 }
432
433 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
434 Ok(vec![])
435 }
436
437 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
438 Ok(None)
439 }
440}