catalog/system_schema/
information_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod region_peers;
23mod region_statistics;
24mod runtime_metrics;
25pub mod schemata;
26mod table_constraints;
27mod table_names;
28pub mod tables;
29mod views;
30
31use std::collections::HashMap;
32use std::sync::{Arc, Weak};
33
34use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
35use common_error::ext::ErrorExt;
36use common_meta::cluster::NodeInfo;
37use common_meta::datanode::RegionStat;
38use common_meta::key::flow::flow_state::FlowStat;
39use common_meta::key::flow::FlowMetadataManager;
40use common_procedure::ProcedureInfo;
41use common_recordbatch::SendableRecordBatchStream;
42use datatypes::schema::SchemaRef;
43use lazy_static::lazy_static;
44use paste::paste;
45use store_api::storage::{ScanRequest, TableId};
46use table::metadata::TableType;
47use table::TableRef;
48pub use table_names::*;
49use views::InformationSchemaViews;
50
51use self::columns::InformationSchemaColumns;
52use crate::error::{Error, Result};
53use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
54use crate::system_schema::information_schema::flows::InformationSchemaFlows;
55use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
56use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
57use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
58use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
59use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
60use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
61use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
62use crate::system_schema::information_schema::tables::InformationSchemaTables;
63use crate::system_schema::memory_table::MemoryTable;
64pub(crate) use crate::system_schema::predicate::Predicates;
65use crate::system_schema::{
66    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
67};
68use crate::CatalogManager;
69
lazy_static! {
    // Names of the `information_schema` tables that `build_tables` registers for
    // every catalog, each one resolved by name through `build_table`.
    //
    // NOTE(review): `PARTITIONS` is listed here although `system_table` resolves it to
    // `InformationSchemaPartitions` rather than `setup_memory_table!`, while
    // `BUILD_INFO` (which is built by `setup_memory_table!`) is absent because
    // `build_tables` only registers it for the default catalog.
    static ref MEMORY_TABLES: &'static [&'static str] = &[
        ENGINES,
        COLUMN_PRIVILEGES,
        COLUMN_STATISTICS,
        CHARACTER_SETS,
        COLLATIONS,
        COLLATION_CHARACTER_SET_APPLICABILITY,
        CHECK_CONSTRAINTS,
        EVENTS,
        FILES,
        OPTIMIZER_TRACE,
        PARAMETERS,
        PROFILING,
        REFERENTIAL_CONSTRAINTS,
        ROUTINES,
        SCHEMA_PRIVILEGES,
        TABLE_PRIVILEGES,
        TRIGGERS,
        GLOBAL_STATUS,
        SESSION_STATUS,
        PARTITIONS,
    ];
}
95
/// Expands to `Some(Arc::new(MemoryTable::new(..)) as _)` for the memory table `$name`.
///
/// `$name` is the table-name constant. It is used three ways:
/// - passed to `get_schema_columns` to obtain the `(schema, columns)` pair,
/// - pasted between `INFORMATION_SCHEMA_` and `_TABLE_ID` to pick the table id
///   constant from `consts`,
/// - passed to `MemoryTable::new` as the table name.
macro_rules! setup_memory_table {
    ($name: expr) => {
        paste! {
            {
                let (schema, columns) = get_schema_columns($name);
                Some(Arc::new(MemoryTable::new(
                    // `[< .. >]` concatenates the pieces into a single identifier.
                    consts::[<INFORMATION_SCHEMA_ $name  _TABLE_ID>],
                    $name,
                    schema,
                    columns
                )) as _)
            }
        }
    };
}
111
/// The `information_schema` tables info provider.
///
/// A provider is bound to one catalog name; its table map is filled when the
/// provider is created by `InformationSchemaProvider::new`.
pub struct InformationSchemaProvider {
    /// Name of the catalog this provider was created for. `build_tables` compares it
    /// with `DEFAULT_CATALOG_NAME` to decide whether the restricted tables are registered.
    catalog_name: String,
    /// Weak handle to the catalog manager, cloned into the tables that take one.
    catalog_manager: Weak<dyn CatalogManager>,
    /// Flow metadata manager, cloned into `InformationSchemaFlows`.
    flow_metadata_manager: Arc<FlowMetadataManager>,
    /// Table name -> table, populated by `build_tables` and exposed through `tables()`.
    tables: HashMap<String, TableRef>,
}
119
120impl SystemSchemaProvider for InformationSchemaProvider {
121    fn tables(&self) -> &HashMap<String, TableRef> {
122        assert!(!self.tables.is_empty());
123
124        &self.tables
125    }
126}
impl SystemSchemaProviderInner for InformationSchemaProvider {
    /// Returns the name of the catalog this provider was created for.
    fn catalog_name(&self) -> &str {
        &self.catalog_name
    }
    /// Returns the schema name, `INFORMATION_SCHEMA_NAME`.
    fn schema_name() -> &'static str {
        INFORMATION_SCHEMA_NAME
    }

    /// Constructs the system table registered under `name`.
    ///
    /// `name` is ASCII-lowercased before it is matched against the table-name
    /// constants. A fresh table object is created on every call; `None` is returned
    /// when `name` matches none of the known tables.
    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
        match name.to_ascii_lowercase().as_str() {
            TABLES => Some(Arc::new(InformationSchemaTables::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            // Tables built by `setup_memory_table!` from `get_schema_columns`.
            ENGINES => setup_memory_table!(ENGINES),
            COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
            COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
            BUILD_INFO => setup_memory_table!(BUILD_INFO),
            CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
            COLLATIONS => setup_memory_table!(COLLATIONS),
            COLLATION_CHARACTER_SET_APPLICABILITY => {
                setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
            }
            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
            EVENTS => setup_memory_table!(EVENTS),
            FILES => setup_memory_table!(FILES),
            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
            PARAMETERS => setup_memory_table!(PARAMETERS),
            PROFILING => setup_memory_table!(PROFILING),
            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
            ROUTINES => setup_memory_table!(ROUTINES),
            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
            TRIGGERS => setup_memory_table!(TRIGGERS),
            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            // Takes neither the catalog name nor the catalog manager.
            RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
            PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            // The tables below that receive only the catalog manager are not given
            // the catalog name.
            CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
                self.catalog_manager.clone(),
            )) as _),
            VIEWS => Some(Arc::new(InformationSchemaViews::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            // The only table that also receives the flow metadata manager.
            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
                self.flow_metadata_manager.clone(),
            )) as _),
            PROCEDURE_INFO => Some(
                Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
                    self.catalog_manager.clone(),
                )) as _,
            ),
            REGION_STATISTICS => Some(Arc::new(
                region_statistics::InformationSchemaRegionStatistics::new(
                    self.catalog_manager.clone(),
                ),
            ) as _),
            // Unknown table name.
            _ => None,
        }
    }
}
214
215impl InformationSchemaProvider {
216    pub fn new(
217        catalog_name: String,
218        catalog_manager: Weak<dyn CatalogManager>,
219        flow_metadata_manager: Arc<FlowMetadataManager>,
220    ) -> Self {
221        let mut provider = Self {
222            catalog_name,
223            catalog_manager,
224            flow_metadata_manager,
225            tables: HashMap::new(),
226        };
227
228        provider.build_tables();
229
230        provider
231    }
232
233    fn build_tables(&mut self) {
234        let mut tables = HashMap::new();
235
236        // SECURITY NOTE:
237        // Carefully consider the tables that may expose sensitive cluster configurations,
238        // authentication details, and other critical information.
239        // Only put these tables under `greptime` catalog to prevent info leak.
240        if self.catalog_name == DEFAULT_CATALOG_NAME {
241            tables.insert(
242                RUNTIME_METRICS.to_string(),
243                self.build_table(RUNTIME_METRICS).unwrap(),
244            );
245            tables.insert(
246                BUILD_INFO.to_string(),
247                self.build_table(BUILD_INFO).unwrap(),
248            );
249            tables.insert(
250                REGION_PEERS.to_string(),
251                self.build_table(REGION_PEERS).unwrap(),
252            );
253            tables.insert(
254                CLUSTER_INFO.to_string(),
255                self.build_table(CLUSTER_INFO).unwrap(),
256            );
257            tables.insert(
258                PROCEDURE_INFO.to_string(),
259                self.build_table(PROCEDURE_INFO).unwrap(),
260            );
261            tables.insert(
262                REGION_STATISTICS.to_string(),
263                self.build_table(REGION_STATISTICS).unwrap(),
264            );
265        }
266
267        tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
268        tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
269        tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
270        tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
271        tables.insert(
272            KEY_COLUMN_USAGE.to_string(),
273            self.build_table(KEY_COLUMN_USAGE).unwrap(),
274        );
275        tables.insert(
276            TABLE_CONSTRAINTS.to_string(),
277            self.build_table(TABLE_CONSTRAINTS).unwrap(),
278        );
279        tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
280        // Add memory tables
281        for name in MEMORY_TABLES.iter() {
282            tables.insert((*name).to_string(), self.build_table(name).expect(name));
283        }
284
285        self.tables = tables;
286    }
287}
288
/// Interface implemented by the `information_schema` tables of this module.
///
/// Any type implementing it also gets `SystemTable` through the blanket
/// `impl<T> SystemTable for T where T: InformationTable` in this file.
trait InformationTable {
    /// Returns the id of the table.
    fn table_id(&self) -> TableId;

    /// Returns the name of the table.
    fn table_name(&self) -> &'static str;

    /// Returns the schema of the table.
    fn schema(&self) -> SchemaRef;

    /// Produces a record batch stream for the given scan request.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

    /// Returns the table type; `TableType::Temporary` unless overridden.
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}
302
303// Provide compatibility for legacy `information_schema` code.
304impl<T> SystemTable for T
305where
306    T: InformationTable,
307{
308    fn table_id(&self) -> TableId {
309        InformationTable::table_id(self)
310    }
311
312    fn table_name(&self) -> &'static str {
313        InformationTable::table_name(self)
314    }
315
316    fn schema(&self) -> SchemaRef {
317        InformationTable::schema(self)
318    }
319
320    fn table_type(&self) -> TableType {
321        InformationTable::table_type(self)
322    }
323
324    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
325        InformationTable::to_stream(self, request)
326    }
327}
328
/// Shared `Send + Sync` handle to an `InformationExtension` whose error type is this
/// crate's `Error`.
pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
330
/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables.
#[async_trait::async_trait]
pub trait InformationExtension {
    /// Error type returned by every method of the extension.
    type Error: ErrorExt;

    /// Gets the nodes information.
    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Gets the procedures information, each entry paired with a `String`.
    ///
    /// NOTE(review): the meaning of the `String` is not visible here — confirm
    /// against the implementors.
    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;

    /// Gets the region statistics.
    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Gets the flow statistics. If no flownode is available, returns `None`.
    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
}
348
349pub struct NoopInformationExtension;
350
351#[async_trait::async_trait]
352impl InformationExtension for NoopInformationExtension {
353    type Error = Error;
354
355    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
356        Ok(vec![])
357    }
358
359    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
360        Ok(vec![])
361    }
362
363    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
364        Ok(vec![])
365    }
366
367    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
368        Ok(None)
369    }
370}