catalog/system_schema/
information_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23pub mod region_peers;
24mod region_statistics;
25mod runtime_metrics;
26pub mod schemata;
27mod table_constraints;
28mod table_names;
29pub mod tables;
30mod views;
31
32use std::collections::HashMap;
33use std::sync::{Arc, Weak};
34
35use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
36use common_error::ext::ErrorExt;
37use common_meta::cluster::NodeInfo;
38use common_meta::datanode::RegionStat;
39use common_meta::key::flow::flow_state::FlowStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_meta::kv_backend::KvBackendRef;
42use common_procedure::ProcedureInfo;
43use common_recordbatch::SendableRecordBatchStream;
44use datatypes::schema::SchemaRef;
45use lazy_static::lazy_static;
46use paste::paste;
47use process_list::InformationSchemaProcessList;
48use store_api::storage::{ScanRequest, TableId};
49use table::metadata::TableType;
50use table::TableRef;
51pub use table_names::*;
52use views::InformationSchemaViews;
53
54use self::columns::InformationSchemaColumns;
55use crate::error::{Error, Result};
56use crate::process_manager::ProcessManagerRef;
57use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
58use crate::system_schema::information_schema::flows::InformationSchemaFlows;
59use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
60use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
61use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
62use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
63use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
64use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
65use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
66use crate::system_schema::information_schema::tables::InformationSchemaTables;
67use crate::system_schema::memory_table::MemoryTable;
68pub(crate) use crate::system_schema::predicate::Predicates;
69use crate::system_schema::{
70    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
71};
72use crate::CatalogManager;
73
74lazy_static! {
75    // Memory tables in `information_schema`.
76    static ref MEMORY_TABLES: &'static [&'static str] = &[
77        ENGINES,
78        COLUMN_PRIVILEGES,
79        COLUMN_STATISTICS,
80        CHARACTER_SETS,
81        COLLATIONS,
82        COLLATION_CHARACTER_SET_APPLICABILITY,
83        CHECK_CONSTRAINTS,
84        EVENTS,
85        FILES,
86        OPTIMIZER_TRACE,
87        PARAMETERS,
88        PROFILING,
89        REFERENTIAL_CONSTRAINTS,
90        ROUTINES,
91        SCHEMA_PRIVILEGES,
92        TABLE_PRIVILEGES,
93        TRIGGERS,
94        GLOBAL_STATUS,
95        SESSION_STATUS,
96        PARTITIONS,
97    ];
98}
99
100macro_rules! setup_memory_table {
101    ($name: expr) => {
102        paste! {
103            {
104                let (schema, columns) = get_schema_columns($name);
105                Some(Arc::new(MemoryTable::new(
106                    consts::[<INFORMATION_SCHEMA_ $name  _TABLE_ID>],
107                    $name,
108                    schema,
109                    columns
110                )) as _)
111            }
112        }
113    };
114}
115
116#[cfg(feature = "enterprise")]
117pub struct MakeInformationTableRequest {
118    pub catalog_name: String,
119    pub catalog_manager: Weak<dyn CatalogManager>,
120    pub kv_backend: KvBackendRef,
121}
122
123/// A factory trait for making information schema tables.
124///
125/// This trait allows for extensibility of the information schema by providing
126/// a way to dynamically create custom information schema tables.
127#[cfg(feature = "enterprise")]
128pub trait InformationSchemaTableFactory {
129    fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
130}
131
132#[cfg(feature = "enterprise")]
133pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
134
135/// The `information_schema` tables info provider.
136pub struct InformationSchemaProvider {
137    catalog_name: String,
138    catalog_manager: Weak<dyn CatalogManager>,
139    process_manager: Option<ProcessManagerRef>,
140    flow_metadata_manager: Arc<FlowMetadataManager>,
141    tables: HashMap<String, TableRef>,
142    #[allow(dead_code)]
143    kv_backend: KvBackendRef,
144    #[cfg(feature = "enterprise")]
145    extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
146}
147
148impl SystemSchemaProvider for InformationSchemaProvider {
149    fn tables(&self) -> &HashMap<String, TableRef> {
150        assert!(!self.tables.is_empty());
151
152        &self.tables
153    }
154}
155
156impl SystemSchemaProviderInner for InformationSchemaProvider {
157    fn catalog_name(&self) -> &str {
158        &self.catalog_name
159    }
160    fn schema_name() -> &'static str {
161        INFORMATION_SCHEMA_NAME
162    }
163
164    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
165        match name.to_ascii_lowercase().as_str() {
166            TABLES => Some(Arc::new(InformationSchemaTables::new(
167                self.catalog_name.clone(),
168                self.catalog_manager.clone(),
169            )) as _),
170            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
171                self.catalog_name.clone(),
172                self.catalog_manager.clone(),
173            )) as _),
174            ENGINES => setup_memory_table!(ENGINES),
175            COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
176            COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
177            BUILD_INFO => setup_memory_table!(BUILD_INFO),
178            CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
179            COLLATIONS => setup_memory_table!(COLLATIONS),
180            COLLATION_CHARACTER_SET_APPLICABILITY => {
181                setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
182            }
183            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
184            EVENTS => setup_memory_table!(EVENTS),
185            FILES => setup_memory_table!(FILES),
186            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
187            PARAMETERS => setup_memory_table!(PARAMETERS),
188            PROFILING => setup_memory_table!(PROFILING),
189            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
190            ROUTINES => setup_memory_table!(ROUTINES),
191            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
192            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
193            TRIGGERS => setup_memory_table!(TRIGGERS),
194            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
195            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
196            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
197                self.catalog_name.clone(),
198                self.catalog_manager.clone(),
199            )) as _),
200            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
201                self.catalog_name.clone(),
202                self.catalog_manager.clone(),
203            )) as _),
204            RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
205            PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
206                self.catalog_name.clone(),
207                self.catalog_manager.clone(),
208            )) as _),
209            REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
210                self.catalog_name.clone(),
211                self.catalog_manager.clone(),
212            )) as _),
213            TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
214                self.catalog_name.clone(),
215                self.catalog_manager.clone(),
216            )) as _),
217            CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
218                self.catalog_manager.clone(),
219            )) as _),
220            VIEWS => Some(Arc::new(InformationSchemaViews::new(
221                self.catalog_name.clone(),
222                self.catalog_manager.clone(),
223            )) as _),
224            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
225                self.catalog_name.clone(),
226                self.catalog_manager.clone(),
227                self.flow_metadata_manager.clone(),
228            )) as _),
229            PROCEDURE_INFO => Some(
230                Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
231                    self.catalog_manager.clone(),
232                )) as _,
233            ),
234            REGION_STATISTICS => Some(Arc::new(
235                region_statistics::InformationSchemaRegionStatistics::new(
236                    self.catalog_manager.clone(),
237                ),
238            ) as _),
239            PROCESS_LIST => self
240                .process_manager
241                .as_ref()
242                .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
243            table_name => {
244                #[cfg(feature = "enterprise")]
245                return self.extra_table_factories.get(table_name).map(|factory| {
246                    let req = MakeInformationTableRequest {
247                        catalog_name: self.catalog_name.clone(),
248                        catalog_manager: self.catalog_manager.clone(),
249                        kv_backend: self.kv_backend.clone(),
250                    };
251                    factory.make_information_table(req)
252                });
253                #[cfg(not(feature = "enterprise"))]
254                {
255                    let _ = table_name;
256                    None
257                }
258            }
259        }
260    }
261}
262
263impl InformationSchemaProvider {
264    pub fn new(
265        catalog_name: String,
266        catalog_manager: Weak<dyn CatalogManager>,
267        flow_metadata_manager: Arc<FlowMetadataManager>,
268        process_manager: Option<ProcessManagerRef>,
269        kv_backend: KvBackendRef,
270    ) -> Self {
271        let mut provider = Self {
272            catalog_name,
273            catalog_manager,
274            flow_metadata_manager,
275            process_manager,
276            tables: HashMap::new(),
277            kv_backend,
278            #[cfg(feature = "enterprise")]
279            extra_table_factories: HashMap::new(),
280        };
281
282        provider.build_tables();
283
284        provider
285    }
286
287    #[cfg(feature = "enterprise")]
288    pub(crate) fn with_extra_table_factories(
289        mut self,
290        factories: HashMap<String, InformationSchemaTableFactoryRef>,
291    ) -> Self {
292        self.extra_table_factories = factories;
293        self.build_tables();
294        self
295    }
296
297    fn build_tables(&mut self) {
298        let mut tables = HashMap::new();
299
300        // SECURITY NOTE:
301        // Carefully consider the tables that may expose sensitive cluster configurations,
302        // authentication details, and other critical information.
303        // Only put these tables under `greptime` catalog to prevent info leak.
304        if self.catalog_name == DEFAULT_CATALOG_NAME {
305            tables.insert(
306                RUNTIME_METRICS.to_string(),
307                self.build_table(RUNTIME_METRICS).unwrap(),
308            );
309            tables.insert(
310                BUILD_INFO.to_string(),
311                self.build_table(BUILD_INFO).unwrap(),
312            );
313            tables.insert(
314                REGION_PEERS.to_string(),
315                self.build_table(REGION_PEERS).unwrap(),
316            );
317            tables.insert(
318                CLUSTER_INFO.to_string(),
319                self.build_table(CLUSTER_INFO).unwrap(),
320            );
321            tables.insert(
322                PROCEDURE_INFO.to_string(),
323                self.build_table(PROCEDURE_INFO).unwrap(),
324            );
325            tables.insert(
326                REGION_STATISTICS.to_string(),
327                self.build_table(REGION_STATISTICS).unwrap(),
328            );
329        }
330
331        tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
332        tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
333        tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
334        tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
335        tables.insert(
336            KEY_COLUMN_USAGE.to_string(),
337            self.build_table(KEY_COLUMN_USAGE).unwrap(),
338        );
339        tables.insert(
340            TABLE_CONSTRAINTS.to_string(),
341            self.build_table(TABLE_CONSTRAINTS).unwrap(),
342        );
343        tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
344        if let Some(process_list) = self.build_table(PROCESS_LIST) {
345            tables.insert(PROCESS_LIST.to_string(), process_list);
346        }
347        #[cfg(feature = "enterprise")]
348        for name in self.extra_table_factories.keys() {
349            tables.insert(name.to_string(), self.build_table(name).expect(name));
350        }
351        // Add memory tables
352        for name in MEMORY_TABLES.iter() {
353            tables.insert((*name).to_string(), self.build_table(name).expect(name));
354        }
355        self.tables = tables;
356    }
357}
358
359pub trait InformationTable {
360    fn table_id(&self) -> TableId;
361
362    fn table_name(&self) -> &'static str;
363
364    fn schema(&self) -> SchemaRef;
365
366    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
367
368    fn table_type(&self) -> TableType {
369        TableType::Temporary
370    }
371}
372
373// Provide compatibility for legacy `information_schema` code.
374impl<T> SystemTable for T
375where
376    T: InformationTable,
377{
378    fn table_id(&self) -> TableId {
379        InformationTable::table_id(self)
380    }
381
382    fn table_name(&self) -> &'static str {
383        InformationTable::table_name(self)
384    }
385
386    fn schema(&self) -> SchemaRef {
387        InformationTable::schema(self)
388    }
389
390    fn table_type(&self) -> TableType {
391        InformationTable::table_type(self)
392    }
393
394    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
395        InformationTable::to_stream(self, request)
396    }
397}
398
399pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
400
401/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables.
402#[async_trait::async_trait]
403pub trait InformationExtension {
404    type Error: ErrorExt;
405
406    /// Gets the nodes information.
407    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
408
409    /// Gets the procedures information.
410    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;
411
412    /// Gets the region statistics.
413    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
414
415    /// Get the flow statistics. If no flownode is available, return `None`.
416    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
417}
418
419pub struct NoopInformationExtension;
420
421#[async_trait::async_trait]
422impl InformationExtension for NoopInformationExtension {
423    type Error = Error;
424
425    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
426        Ok(vec![])
427    }
428
429    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
430        Ok(vec![])
431    }
432
433    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
434        Ok(vec![])
435    }
436
437    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
438        Ok(None)
439    }
440}