catalog/system_schema/
information_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23pub mod region_peers;
24mod region_statistics;
25mod runtime_metrics;
26pub mod schemata;
27mod table_constraints;
28mod table_names;
29pub mod tables;
30mod views;
31
32use std::collections::HashMap;
33use std::sync::{Arc, Weak};
34
35use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
36use common_error::ext::ErrorExt;
37use common_meta::cluster::NodeInfo;
38use common_meta::datanode::RegionStat;
39use common_meta::key::flow::flow_state::FlowStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_meta::kv_backend::KvBackendRef;
42use common_procedure::ProcedureInfo;
43use common_recordbatch::SendableRecordBatchStream;
44use datatypes::schema::SchemaRef;
45use lazy_static::lazy_static;
46use paste::paste;
47use process_list::InformationSchemaProcessList;
48use store_api::storage::{ScanRequest, TableId};
49use table::metadata::TableType;
50use table::TableRef;
51pub use table_names::*;
52use views::InformationSchemaViews;
53
54use self::columns::InformationSchemaColumns;
55use crate::error::{Error, Result};
56use crate::process_manager::ProcessManagerRef;
57use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
58use crate::system_schema::information_schema::flows::InformationSchemaFlows;
59use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
60use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
61use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
62use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
63use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
64use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
65use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
66use crate::system_schema::information_schema::tables::InformationSchemaTables;
67use crate::system_schema::memory_table::MemoryTable;
68pub(crate) use crate::system_schema::predicate::Predicates;
69use crate::system_schema::{
70    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
71};
72use crate::CatalogManager;
73
74lazy_static! {
75    // Memory tables in `information_schema`.
76    static ref MEMORY_TABLES: &'static [&'static str] = &[
77        ENGINES,
78        COLUMN_PRIVILEGES,
79        COLUMN_STATISTICS,
80        CHARACTER_SETS,
81        COLLATIONS,
82        COLLATION_CHARACTER_SET_APPLICABILITY,
83        CHECK_CONSTRAINTS,
84        EVENTS,
85        FILES,
86        OPTIMIZER_TRACE,
87        PARAMETERS,
88        PROFILING,
89        REFERENTIAL_CONSTRAINTS,
90        ROUTINES,
91        SCHEMA_PRIVILEGES,
92        TABLE_PRIVILEGES,
93        TRIGGERS,
94        GLOBAL_STATUS,
95        SESSION_STATUS,
96        PARTITIONS,
97    ];
98}
99
100macro_rules! setup_memory_table {
101    ($name: expr) => {
102        paste! {
103            {
104                let (schema, columns) = get_schema_columns($name);
105                Some(Arc::new(MemoryTable::new(
106                    consts::[<INFORMATION_SCHEMA_ $name  _TABLE_ID>],
107                    $name,
108                    schema,
109                    columns
110                )) as _)
111            }
112        }
113    };
114}
115
116#[cfg(feature = "enterprise")]
117pub struct MakeInformationTableRequest {
118    pub catalog_name: String,
119    pub catalog_manager: Weak<dyn CatalogManager>,
120    pub kv_backend: KvBackendRef,
121}
122
123/// A factory trait for making information schema tables.
124///
125/// This trait allows for extensibility of the information schema by providing
126/// a way to dynamically create custom information schema tables.
127#[cfg(feature = "enterprise")]
128pub trait InformationSchemaTableFactory {
129    fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
130}
131
132#[cfg(feature = "enterprise")]
133pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
134
135/// The `information_schema` tables info provider.
136pub struct InformationSchemaProvider {
137    catalog_name: String,
138    catalog_manager: Weak<dyn CatalogManager>,
139    process_manager: Option<ProcessManagerRef>,
140    flow_metadata_manager: Arc<FlowMetadataManager>,
141    tables: HashMap<String, TableRef>,
142    #[allow(dead_code)]
143    kv_backend: KvBackendRef,
144    #[cfg(feature = "enterprise")]
145    extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
146}
147
148impl SystemSchemaProvider for InformationSchemaProvider {
149    fn tables(&self) -> &HashMap<String, TableRef> {
150        assert!(!self.tables.is_empty());
151
152        &self.tables
153    }
154}
155
156impl SystemSchemaProviderInner for InformationSchemaProvider {
157    fn catalog_name(&self) -> &str {
158        &self.catalog_name
159    }
160    fn schema_name() -> &'static str {
161        INFORMATION_SCHEMA_NAME
162    }
163
164    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
165        #[cfg(feature = "enterprise")]
166        if let Some(factory) = self.extra_table_factories.get(name) {
167            let req = MakeInformationTableRequest {
168                catalog_name: self.catalog_name.clone(),
169                catalog_manager: self.catalog_manager.clone(),
170                kv_backend: self.kv_backend.clone(),
171            };
172            return Some(factory.make_information_table(req));
173        }
174
175        match name.to_ascii_lowercase().as_str() {
176            TABLES => Some(Arc::new(InformationSchemaTables::new(
177                self.catalog_name.clone(),
178                self.catalog_manager.clone(),
179            )) as _),
180            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
181                self.catalog_name.clone(),
182                self.catalog_manager.clone(),
183            )) as _),
184            ENGINES => setup_memory_table!(ENGINES),
185            COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
186            COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
187            BUILD_INFO => setup_memory_table!(BUILD_INFO),
188            CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
189            COLLATIONS => setup_memory_table!(COLLATIONS),
190            COLLATION_CHARACTER_SET_APPLICABILITY => {
191                setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
192            }
193            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
194            EVENTS => setup_memory_table!(EVENTS),
195            FILES => setup_memory_table!(FILES),
196            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
197            PARAMETERS => setup_memory_table!(PARAMETERS),
198            PROFILING => setup_memory_table!(PROFILING),
199            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
200            ROUTINES => setup_memory_table!(ROUTINES),
201            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
202            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
203            TRIGGERS => setup_memory_table!(TRIGGERS),
204            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
205            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
206            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
207                self.catalog_name.clone(),
208                self.catalog_manager.clone(),
209            )) as _),
210            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
211                self.catalog_name.clone(),
212                self.catalog_manager.clone(),
213            )) as _),
214            RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
215            PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
216                self.catalog_name.clone(),
217                self.catalog_manager.clone(),
218            )) as _),
219            REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
220                self.catalog_name.clone(),
221                self.catalog_manager.clone(),
222            )) as _),
223            TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
224                self.catalog_name.clone(),
225                self.catalog_manager.clone(),
226            )) as _),
227            CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
228                self.catalog_manager.clone(),
229            )) as _),
230            VIEWS => Some(Arc::new(InformationSchemaViews::new(
231                self.catalog_name.clone(),
232                self.catalog_manager.clone(),
233            )) as _),
234            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
235                self.catalog_name.clone(),
236                self.catalog_manager.clone(),
237                self.flow_metadata_manager.clone(),
238            )) as _),
239            PROCEDURE_INFO => Some(
240                Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
241                    self.catalog_manager.clone(),
242                )) as _,
243            ),
244            REGION_STATISTICS => Some(Arc::new(
245                region_statistics::InformationSchemaRegionStatistics::new(
246                    self.catalog_manager.clone(),
247                ),
248            ) as _),
249            PROCESS_LIST => self
250                .process_manager
251                .as_ref()
252                .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
253            _ => None,
254        }
255    }
256}
257
258impl InformationSchemaProvider {
259    pub fn new(
260        catalog_name: String,
261        catalog_manager: Weak<dyn CatalogManager>,
262        flow_metadata_manager: Arc<FlowMetadataManager>,
263        process_manager: Option<ProcessManagerRef>,
264        kv_backend: KvBackendRef,
265    ) -> Self {
266        let mut provider = Self {
267            catalog_name,
268            catalog_manager,
269            flow_metadata_manager,
270            process_manager,
271            tables: HashMap::new(),
272            kv_backend,
273            #[cfg(feature = "enterprise")]
274            extra_table_factories: HashMap::new(),
275        };
276
277        provider.build_tables();
278
279        provider
280    }
281
282    #[cfg(feature = "enterprise")]
283    pub(crate) fn with_extra_table_factories(
284        mut self,
285        factories: HashMap<String, InformationSchemaTableFactoryRef>,
286    ) -> Self {
287        self.extra_table_factories = factories;
288        self.build_tables();
289        self
290    }
291
292    fn build_tables(&mut self) {
293        let mut tables = HashMap::new();
294
295        // SECURITY NOTE:
296        // Carefully consider the tables that may expose sensitive cluster configurations,
297        // authentication details, and other critical information.
298        // Only put these tables under `greptime` catalog to prevent info leak.
299        if self.catalog_name == DEFAULT_CATALOG_NAME {
300            tables.insert(
301                RUNTIME_METRICS.to_string(),
302                self.build_table(RUNTIME_METRICS).unwrap(),
303            );
304            tables.insert(
305                BUILD_INFO.to_string(),
306                self.build_table(BUILD_INFO).unwrap(),
307            );
308            tables.insert(
309                REGION_PEERS.to_string(),
310                self.build_table(REGION_PEERS).unwrap(),
311            );
312            tables.insert(
313                CLUSTER_INFO.to_string(),
314                self.build_table(CLUSTER_INFO).unwrap(),
315            );
316            tables.insert(
317                PROCEDURE_INFO.to_string(),
318                self.build_table(PROCEDURE_INFO).unwrap(),
319            );
320            tables.insert(
321                REGION_STATISTICS.to_string(),
322                self.build_table(REGION_STATISTICS).unwrap(),
323            );
324        }
325
326        tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
327        tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
328        tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
329        tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
330        tables.insert(
331            KEY_COLUMN_USAGE.to_string(),
332            self.build_table(KEY_COLUMN_USAGE).unwrap(),
333        );
334        tables.insert(
335            TABLE_CONSTRAINTS.to_string(),
336            self.build_table(TABLE_CONSTRAINTS).unwrap(),
337        );
338        tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
339        if let Some(process_list) = self.build_table(PROCESS_LIST) {
340            tables.insert(PROCESS_LIST.to_string(), process_list);
341        }
342        #[cfg(feature = "enterprise")]
343        for name in self.extra_table_factories.keys() {
344            tables.insert(name.to_string(), self.build_table(name).expect(name));
345        }
346        // Add memory tables
347        for name in MEMORY_TABLES.iter() {
348            tables.insert((*name).to_string(), self.build_table(name).expect(name));
349        }
350        self.tables = tables;
351    }
352}
353
354pub trait InformationTable {
355    fn table_id(&self) -> TableId;
356
357    fn table_name(&self) -> &'static str;
358
359    fn schema(&self) -> SchemaRef;
360
361    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
362
363    fn table_type(&self) -> TableType {
364        TableType::Temporary
365    }
366}
367
368// Provide compatibility for legacy `information_schema` code.
369impl<T> SystemTable for T
370where
371    T: InformationTable,
372{
373    fn table_id(&self) -> TableId {
374        InformationTable::table_id(self)
375    }
376
377    fn table_name(&self) -> &'static str {
378        InformationTable::table_name(self)
379    }
380
381    fn schema(&self) -> SchemaRef {
382        InformationTable::schema(self)
383    }
384
385    fn table_type(&self) -> TableType {
386        InformationTable::table_type(self)
387    }
388
389    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
390        InformationTable::to_stream(self, request)
391    }
392}
393
394pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
395
396/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables.
397#[async_trait::async_trait]
398pub trait InformationExtension {
399    type Error: ErrorExt;
400
401    /// Gets the nodes information.
402    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
403
404    /// Gets the procedures information.
405    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;
406
407    /// Gets the region statistics.
408    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
409
410    /// Get the flow statistics. If no flownode is available, return `None`.
411    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
412}
413
414pub struct NoopInformationExtension;
415
416#[async_trait::async_trait]
417impl InformationExtension for NoopInformationExtension {
418    type Error = Error;
419
420    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
421        Ok(vec![])
422    }
423
424    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
425        Ok(vec![])
426    }
427
428    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
429        Ok(vec![])
430    }
431
432    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
433        Ok(None)
434    }
435}