1mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod region_peers;
23mod region_statistics;
24mod runtime_metrics;
25pub mod schemata;
26mod table_constraints;
27mod table_names;
28pub mod tables;
29mod views;
30
31use std::collections::HashMap;
32use std::sync::{Arc, Weak};
33
34use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
35use common_error::ext::ErrorExt;
36use common_meta::cluster::NodeInfo;
37use common_meta::datanode::RegionStat;
38use common_meta::key::flow::flow_state::FlowStat;
39use common_meta::key::flow::FlowMetadataManager;
40use common_procedure::ProcedureInfo;
41use common_recordbatch::SendableRecordBatchStream;
42use datatypes::schema::SchemaRef;
43use lazy_static::lazy_static;
44use paste::paste;
45use store_api::storage::{ScanRequest, TableId};
46use table::metadata::TableType;
47use table::TableRef;
48pub use table_names::*;
49use views::InformationSchemaViews;
50
51use self::columns::InformationSchemaColumns;
52use crate::error::{Error, Result};
53use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
54use crate::system_schema::information_schema::flows::InformationSchemaFlows;
55use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
56use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
57use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
58use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
59use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
60use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
61use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
62use crate::system_schema::information_schema::tables::InformationSchemaTables;
63use crate::system_schema::memory_table::MemoryTable;
64pub(crate) use crate::system_schema::predicate::Predicates;
65use crate::system_schema::{
66 SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
67};
68use crate::CatalogManager;
69
lazy_static! {
    // Names of the `information_schema` tables that `build_tables` registers for
    // every catalog by resolving each name through `system_table`.
    //
    // Every entry except `PARTITIONS` resolves to a `MemoryTable` created by
    // `setup_memory_table!`; `PARTITIONS` resolves to `InformationSchemaPartitions`.
    // `BUILD_INFO` is also created by `setup_memory_table!` but is not listed here:
    // `build_tables` registers it only when the catalog is `DEFAULT_CATALOG_NAME`.
    static ref MEMORY_TABLES: &'static [&'static str] = &[
        ENGINES,
        COLUMN_PRIVILEGES,
        COLUMN_STATISTICS,
        CHARACTER_SETS,
        COLLATIONS,
        COLLATION_CHARACTER_SET_APPLICABILITY,
        CHECK_CONSTRAINTS,
        EVENTS,
        FILES,
        OPTIMIZER_TRACE,
        PARAMETERS,
        PROFILING,
        REFERENTIAL_CONSTRAINTS,
        ROUTINES,
        SCHEMA_PRIVILEGES,
        TABLE_PRIVILEGES,
        TRIGGERS,
        GLOBAL_STATUS,
        SESSION_STATUS,
        PARTITIONS,
    ];
}
95
/// Expands to `Some(..)` wrapping an `Arc<MemoryTable>` for the given table name,
/// cast (`as _`) to whatever pointer type the call site expects.
///
/// The argument is used three ways: it is passed to `get_schema_columns` to obtain
/// the schema and column data, it is passed to `MemoryTable::new` as the table name,
/// and its tokens are pasted into `consts::INFORMATION_SCHEMA_<ARG>_TABLE_ID` to
/// select the table id. The argument therefore has to be written as the bare name
/// of a table-name constant for which such an id constant exists.
macro_rules! setup_memory_table {
    ($table: expr) => {
        paste! {
            {
                let (table_schema, table_columns) = get_schema_columns($table);
                let memory_table = MemoryTable::new(
                    consts::[<INFORMATION_SCHEMA_ $table _TABLE_ID>],
                    $table,
                    table_schema,
                    table_columns,
                );
                Some(Arc::new(memory_table) as _)
            }
        }
    };
}
111
/// Provider of the `information_schema` tables for a single catalog.
pub struct InformationSchemaProvider {
    /// Name of the catalog this provider was created for; also decides which
    /// tables `build_tables` registers.
    catalog_name: String,
    /// Weak handle to the catalog manager, cloned into the tables that
    /// `system_table` constructs.
    catalog_manager: Weak<dyn CatalogManager>,
    /// Flow metadata manager, cloned into the `FLOWS` table.
    flow_metadata_manager: Arc<FlowMetadataManager>,
    /// Tables keyed by table name; filled by `build_tables` when the provider
    /// is created.
    tables: HashMap<String, TableRef>,
}
119
impl SystemSchemaProvider for InformationSchemaProvider {
    /// Returns the tables registered by this provider, keyed by table name.
    ///
    /// # Panics
    ///
    /// Panics if the table map is empty.
    fn tables(&self) -> &HashMap<String, TableRef> {
        // `new` always runs `build_tables`, which registers several tables
        // unconditionally, so an empty map means the provider was not built properly.
        assert!(!self.tables.is_empty());

        &self.tables
    }
}
127impl SystemSchemaProviderInner for InformationSchemaProvider {
128 fn catalog_name(&self) -> &str {
129 &self.catalog_name
130 }
131 fn schema_name() -> &'static str {
132 INFORMATION_SCHEMA_NAME
133 }
134
135 fn system_table(&self, name: &str) -> Option<SystemTableRef> {
136 match name.to_ascii_lowercase().as_str() {
137 TABLES => Some(Arc::new(InformationSchemaTables::new(
138 self.catalog_name.clone(),
139 self.catalog_manager.clone(),
140 )) as _),
141 COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
142 self.catalog_name.clone(),
143 self.catalog_manager.clone(),
144 )) as _),
145 ENGINES => setup_memory_table!(ENGINES),
146 COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
147 COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
148 BUILD_INFO => setup_memory_table!(BUILD_INFO),
149 CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
150 COLLATIONS => setup_memory_table!(COLLATIONS),
151 COLLATION_CHARACTER_SET_APPLICABILITY => {
152 setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
153 }
154 CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
155 EVENTS => setup_memory_table!(EVENTS),
156 FILES => setup_memory_table!(FILES),
157 OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
158 PARAMETERS => setup_memory_table!(PARAMETERS),
159 PROFILING => setup_memory_table!(PROFILING),
160 REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
161 ROUTINES => setup_memory_table!(ROUTINES),
162 SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
163 TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
164 TRIGGERS => setup_memory_table!(TRIGGERS),
165 GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
166 SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
167 KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
168 self.catalog_name.clone(),
169 self.catalog_manager.clone(),
170 )) as _),
171 SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
172 self.catalog_name.clone(),
173 self.catalog_manager.clone(),
174 )) as _),
175 RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
176 PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
177 self.catalog_name.clone(),
178 self.catalog_manager.clone(),
179 )) as _),
180 REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
181 self.catalog_name.clone(),
182 self.catalog_manager.clone(),
183 )) as _),
184 TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
185 self.catalog_name.clone(),
186 self.catalog_manager.clone(),
187 )) as _),
188 CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
189 self.catalog_manager.clone(),
190 )) as _),
191 VIEWS => Some(Arc::new(InformationSchemaViews::new(
192 self.catalog_name.clone(),
193 self.catalog_manager.clone(),
194 )) as _),
195 FLOWS => Some(Arc::new(InformationSchemaFlows::new(
196 self.catalog_name.clone(),
197 self.catalog_manager.clone(),
198 self.flow_metadata_manager.clone(),
199 )) as _),
200 PROCEDURE_INFO => Some(
201 Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
202 self.catalog_manager.clone(),
203 )) as _,
204 ),
205 REGION_STATISTICS => Some(Arc::new(
206 region_statistics::InformationSchemaRegionStatistics::new(
207 self.catalog_manager.clone(),
208 ),
209 ) as _),
210 _ => None,
211 }
212 }
213}
214
215impl InformationSchemaProvider {
216 pub fn new(
217 catalog_name: String,
218 catalog_manager: Weak<dyn CatalogManager>,
219 flow_metadata_manager: Arc<FlowMetadataManager>,
220 ) -> Self {
221 let mut provider = Self {
222 catalog_name,
223 catalog_manager,
224 flow_metadata_manager,
225 tables: HashMap::new(),
226 };
227
228 provider.build_tables();
229
230 provider
231 }
232
233 fn build_tables(&mut self) {
234 let mut tables = HashMap::new();
235
236 if self.catalog_name == DEFAULT_CATALOG_NAME {
241 tables.insert(
242 RUNTIME_METRICS.to_string(),
243 self.build_table(RUNTIME_METRICS).unwrap(),
244 );
245 tables.insert(
246 BUILD_INFO.to_string(),
247 self.build_table(BUILD_INFO).unwrap(),
248 );
249 tables.insert(
250 REGION_PEERS.to_string(),
251 self.build_table(REGION_PEERS).unwrap(),
252 );
253 tables.insert(
254 CLUSTER_INFO.to_string(),
255 self.build_table(CLUSTER_INFO).unwrap(),
256 );
257 tables.insert(
258 PROCEDURE_INFO.to_string(),
259 self.build_table(PROCEDURE_INFO).unwrap(),
260 );
261 tables.insert(
262 REGION_STATISTICS.to_string(),
263 self.build_table(REGION_STATISTICS).unwrap(),
264 );
265 }
266
267 tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
268 tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
269 tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
270 tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
271 tables.insert(
272 KEY_COLUMN_USAGE.to_string(),
273 self.build_table(KEY_COLUMN_USAGE).unwrap(),
274 );
275 tables.insert(
276 TABLE_CONSTRAINTS.to_string(),
277 self.build_table(TABLE_CONSTRAINTS).unwrap(),
278 );
279 tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
280 for name in MEMORY_TABLES.iter() {
282 tables.insert((*name).to_string(), self.build_table(name).expect(name));
283 }
284
285 self.tables = tables;
286 }
287}
288
/// Module-private description of an `information_schema` table.
///
/// Every type implementing this trait also gets a `SystemTable` implementation
/// through the blanket impl in this file, which forwards to these methods.
trait InformationTable {
    /// Returns the id of the table.
    fn table_id(&self) -> TableId;

    /// Returns the name of the table.
    fn table_name(&self) -> &'static str;

    /// Returns the schema of the table.
    fn schema(&self) -> SchemaRef;

    /// Produces a record batch stream for the given scan request.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

    /// Returns the type of the table; `TableType::Temporary` unless overridden.
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}
302
303impl<T> SystemTable for T
305where
306 T: InformationTable,
307{
308 fn table_id(&self) -> TableId {
309 InformationTable::table_id(self)
310 }
311
312 fn table_name(&self) -> &'static str {
313 InformationTable::table_name(self)
314 }
315
316 fn schema(&self) -> SchemaRef {
317 InformationTable::schema(self)
318 }
319
320 fn table_type(&self) -> TableType {
321 InformationTable::table_type(self)
322 }
323
324 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
325 InformationTable::to_stream(self, request)
326 }
327}
328
/// Reference-counted, `Send + Sync` handle to an [`InformationExtension`] whose
/// associated error type is this crate's [`Error`].
pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
330
/// Asynchronous source of node, procedure, region and flow information.
///
/// NOTE(review): presumably consumed by the cluster-level `information_schema`
/// tables (e.g. `CLUSTER_INFO`, `PROCEDURE_INFO`, `REGION_STATISTICS`) — confirm
/// against their implementations.
#[async_trait::async_trait]
pub trait InformationExtension {
    /// Error type returned by every method; must implement `ErrorExt`.
    type Error: ErrorExt;

    /// Returns information about the nodes.
    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Returns procedure information as `(String, ProcedureInfo)` pairs.
    ///
    /// NOTE(review): the meaning of the `String` component is not visible
    /// here — confirm against implementors.
    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;

    /// Returns the region statistics.
    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Returns the flow statistics, or `None` when there are none to report.
    ///
    /// NOTE(review): the exact condition for `None` is up to the implementor —
    /// confirm.
    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
}
348
349pub struct NoopInformationExtension;
350
351#[async_trait::async_trait]
352impl InformationExtension for NoopInformationExtension {
353 type Error = Error;
354
355 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
356 Ok(vec![])
357 }
358
359 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
360 Ok(vec![])
361 }
362
363 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
364 Ok(vec![])
365 }
366
367 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
368 Ok(None)
369 }
370}