catalog/system_schema/
information_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23pub mod region_peers;
24mod region_statistics;
25mod runtime_metrics;
26pub mod schemata;
27mod table_constraints;
28mod table_names;
29pub mod tables;
30mod views;
31
32use std::collections::HashMap;
33use std::sync::{Arc, Weak};
34
35use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
36use common_error::ext::ErrorExt;
37use common_meta::cluster::NodeInfo;
38use common_meta::datanode::RegionStat;
39use common_meta::key::flow::flow_state::FlowStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_procedure::ProcedureInfo;
42use common_recordbatch::SendableRecordBatchStream;
43use datatypes::schema::SchemaRef;
44use lazy_static::lazy_static;
45use paste::paste;
46use process_list::InformationSchemaProcessList;
47use store_api::storage::{ScanRequest, TableId};
48use table::metadata::TableType;
49use table::TableRef;
50pub use table_names::*;
51use views::InformationSchemaViews;
52
53use self::columns::InformationSchemaColumns;
54use crate::error::{Error, Result};
55use crate::process_manager::ProcessManagerRef;
56use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
57use crate::system_schema::information_schema::flows::InformationSchemaFlows;
58use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
59use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
60use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
61use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
62use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
63use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
64use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
65use crate::system_schema::information_schema::tables::InformationSchemaTables;
66use crate::system_schema::memory_table::MemoryTable;
67pub(crate) use crate::system_schema::predicate::Predicates;
68use crate::system_schema::{
69    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
70};
71use crate::CatalogManager;
72
73lazy_static! {
74    // Memory tables in `information_schema`.
75    static ref MEMORY_TABLES: &'static [&'static str] = &[
76        ENGINES,
77        COLUMN_PRIVILEGES,
78        COLUMN_STATISTICS,
79        CHARACTER_SETS,
80        COLLATIONS,
81        COLLATION_CHARACTER_SET_APPLICABILITY,
82        CHECK_CONSTRAINTS,
83        EVENTS,
84        FILES,
85        OPTIMIZER_TRACE,
86        PARAMETERS,
87        PROFILING,
88        REFERENTIAL_CONSTRAINTS,
89        ROUTINES,
90        SCHEMA_PRIVILEGES,
91        TABLE_PRIVILEGES,
92        TRIGGERS,
93        GLOBAL_STATUS,
94        SESSION_STATUS,
95        PARTITIONS,
96    ];
97}
98
/// Expands to `Some(..)` wrapping an `Arc`-allocated [`MemoryTable`] for the
/// table named by `$name`.
///
/// The schema and column data come from `get_schema_columns($name)`. The table
/// id is the `consts::INFORMATION_SCHEMA_<NAME>_TABLE_ID` constant, whose
/// identifier is assembled from `$name` with `paste!`.
macro_rules! setup_memory_table {
    ($name: expr) => {
        paste! {
            {
                let (schema, columns) = get_schema_columns($name);
                let table_id = consts::[<INFORMATION_SCHEMA_ $name  _TABLE_ID>];
                let memory_table = MemoryTable::new(table_id, $name, schema, columns);
                // `as _` lets the surrounding expression pick the trait-object type.
                Some(Arc::new(memory_table) as _)
            }
        }
    };
}
114
115/// The `information_schema` tables info provider.
116pub struct InformationSchemaProvider {
117    catalog_name: String,
118    catalog_manager: Weak<dyn CatalogManager>,
119    process_manager: Option<ProcessManagerRef>,
120    flow_metadata_manager: Arc<FlowMetadataManager>,
121    tables: HashMap<String, TableRef>,
122}
123
124impl SystemSchemaProvider for InformationSchemaProvider {
125    fn tables(&self) -> &HashMap<String, TableRef> {
126        assert!(!self.tables.is_empty());
127
128        &self.tables
129    }
130}
131impl SystemSchemaProviderInner for InformationSchemaProvider {
132    fn catalog_name(&self) -> &str {
133        &self.catalog_name
134    }
135    fn schema_name() -> &'static str {
136        INFORMATION_SCHEMA_NAME
137    }
138
139    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
140        match name.to_ascii_lowercase().as_str() {
141            TABLES => Some(Arc::new(InformationSchemaTables::new(
142                self.catalog_name.clone(),
143                self.catalog_manager.clone(),
144            )) as _),
145            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
146                self.catalog_name.clone(),
147                self.catalog_manager.clone(),
148            )) as _),
149            ENGINES => setup_memory_table!(ENGINES),
150            COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
151            COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
152            BUILD_INFO => setup_memory_table!(BUILD_INFO),
153            CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
154            COLLATIONS => setup_memory_table!(COLLATIONS),
155            COLLATION_CHARACTER_SET_APPLICABILITY => {
156                setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
157            }
158            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
159            EVENTS => setup_memory_table!(EVENTS),
160            FILES => setup_memory_table!(FILES),
161            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
162            PARAMETERS => setup_memory_table!(PARAMETERS),
163            PROFILING => setup_memory_table!(PROFILING),
164            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
165            ROUTINES => setup_memory_table!(ROUTINES),
166            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
167            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
168            TRIGGERS => setup_memory_table!(TRIGGERS),
169            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
170            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
171            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
172                self.catalog_name.clone(),
173                self.catalog_manager.clone(),
174            )) as _),
175            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
176                self.catalog_name.clone(),
177                self.catalog_manager.clone(),
178            )) as _),
179            RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
180            PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
181                self.catalog_name.clone(),
182                self.catalog_manager.clone(),
183            )) as _),
184            REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
185                self.catalog_name.clone(),
186                self.catalog_manager.clone(),
187            )) as _),
188            TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
189                self.catalog_name.clone(),
190                self.catalog_manager.clone(),
191            )) as _),
192            CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
193                self.catalog_manager.clone(),
194            )) as _),
195            VIEWS => Some(Arc::new(InformationSchemaViews::new(
196                self.catalog_name.clone(),
197                self.catalog_manager.clone(),
198            )) as _),
199            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
200                self.catalog_name.clone(),
201                self.catalog_manager.clone(),
202                self.flow_metadata_manager.clone(),
203            )) as _),
204            PROCEDURE_INFO => Some(
205                Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
206                    self.catalog_manager.clone(),
207                )) as _,
208            ),
209            REGION_STATISTICS => Some(Arc::new(
210                region_statistics::InformationSchemaRegionStatistics::new(
211                    self.catalog_manager.clone(),
212                ),
213            ) as _),
214            PROCESS_LIST => self
215                .process_manager
216                .as_ref()
217                .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
218            _ => None,
219        }
220    }
221}
222
223impl InformationSchemaProvider {
224    pub fn new(
225        catalog_name: String,
226        catalog_manager: Weak<dyn CatalogManager>,
227        flow_metadata_manager: Arc<FlowMetadataManager>,
228        process_manager: Option<ProcessManagerRef>,
229    ) -> Self {
230        let mut provider = Self {
231            catalog_name,
232            catalog_manager,
233            flow_metadata_manager,
234            process_manager,
235            tables: HashMap::new(),
236        };
237
238        provider.build_tables();
239
240        provider
241    }
242
243    fn build_tables(&mut self) {
244        let mut tables = HashMap::new();
245
246        // SECURITY NOTE:
247        // Carefully consider the tables that may expose sensitive cluster configurations,
248        // authentication details, and other critical information.
249        // Only put these tables under `greptime` catalog to prevent info leak.
250        if self.catalog_name == DEFAULT_CATALOG_NAME {
251            tables.insert(
252                RUNTIME_METRICS.to_string(),
253                self.build_table(RUNTIME_METRICS).unwrap(),
254            );
255            tables.insert(
256                BUILD_INFO.to_string(),
257                self.build_table(BUILD_INFO).unwrap(),
258            );
259            tables.insert(
260                REGION_PEERS.to_string(),
261                self.build_table(REGION_PEERS).unwrap(),
262            );
263            tables.insert(
264                CLUSTER_INFO.to_string(),
265                self.build_table(CLUSTER_INFO).unwrap(),
266            );
267            tables.insert(
268                PROCEDURE_INFO.to_string(),
269                self.build_table(PROCEDURE_INFO).unwrap(),
270            );
271            tables.insert(
272                REGION_STATISTICS.to_string(),
273                self.build_table(REGION_STATISTICS).unwrap(),
274            );
275        }
276
277        tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
278        tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
279        tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
280        tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
281        tables.insert(
282            KEY_COLUMN_USAGE.to_string(),
283            self.build_table(KEY_COLUMN_USAGE).unwrap(),
284        );
285        tables.insert(
286            TABLE_CONSTRAINTS.to_string(),
287            self.build_table(TABLE_CONSTRAINTS).unwrap(),
288        );
289        tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
290        if let Some(process_list) = self.build_table(PROCESS_LIST) {
291            tables.insert(PROCESS_LIST.to_string(), process_list);
292        }
293        // Add memory tables
294        for name in MEMORY_TABLES.iter() {
295            tables.insert((*name).to_string(), self.build_table(name).expect(name));
296        }
297
298        self.tables = tables;
299    }
300}
301
302trait InformationTable {
303    fn table_id(&self) -> TableId;
304
305    fn table_name(&self) -> &'static str;
306
307    fn schema(&self) -> SchemaRef;
308
309    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
310
311    fn table_type(&self) -> TableType {
312        TableType::Temporary
313    }
314}
315
316// Provide compatibility for legacy `information_schema` code.
317impl<T> SystemTable for T
318where
319    T: InformationTable,
320{
321    fn table_id(&self) -> TableId {
322        InformationTable::table_id(self)
323    }
324
325    fn table_name(&self) -> &'static str {
326        InformationTable::table_name(self)
327    }
328
329    fn schema(&self) -> SchemaRef {
330        InformationTable::schema(self)
331    }
332
333    fn table_type(&self) -> TableType {
334        InformationTable::table_type(self)
335    }
336
337    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
338        InformationTable::to_stream(self, request)
339    }
340}
341
342pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
343
344/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables.
345#[async_trait::async_trait]
346pub trait InformationExtension {
347    type Error: ErrorExt;
348
349    /// Gets the nodes information.
350    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
351
352    /// Gets the procedures information.
353    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;
354
355    /// Gets the region statistics.
356    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
357
358    /// Get the flow statistics. If no flownode is available, return `None`.
359    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
360}
361
362pub struct NoopInformationExtension;
363
364#[async_trait::async_trait]
365impl InformationExtension for NoopInformationExtension {
366    type Error = Error;
367
368    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
369        Ok(vec![])
370    }
371
372    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
373        Ok(vec![])
374    }
375
376    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
377        Ok(vec![])
378    }
379
380    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
381        Ok(None)
382    }
383}