// catalog/src/system_schema/information_schema.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23pub mod region_peers;
24mod region_statistics;
25mod runtime_metrics;
26pub mod schemata;
27mod table_constraints;
28mod table_names;
29pub mod tables;
30mod views;
31
32use std::collections::HashMap;
33use std::sync::{Arc, Weak};
34
35use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
36use common_error::ext::ErrorExt;
37use common_meta::cluster::NodeInfo;
38use common_meta::datanode::RegionStat;
39use common_meta::key::flow::FlowMetadataManager;
40use common_meta::key::flow::flow_state::FlowStat;
41use common_meta::kv_backend::KvBackendRef;
42use common_procedure::ProcedureInfo;
43use common_recordbatch::SendableRecordBatchStream;
44use datafusion::error::DataFusionError;
45use datafusion::logical_expr::LogicalPlan;
46use datatypes::schema::SchemaRef;
47use lazy_static::lazy_static;
48use paste::paste;
49use process_list::InformationSchemaProcessList;
50use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
51use store_api::storage::{ScanRequest, TableId};
52use table::TableRef;
53use table::metadata::TableType;
54pub use table_names::*;
55use views::InformationSchemaViews;
56
57use self::columns::InformationSchemaColumns;
58use crate::CatalogManager;
59use crate::error::{Error, Result};
60use crate::process_manager::ProcessManagerRef;
61use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
62use crate::system_schema::information_schema::flows::InformationSchemaFlows;
63use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
64use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
65use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
66use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
67use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
68use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
69use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
70use crate::system_schema::information_schema::tables::InformationSchemaTables;
71use crate::system_schema::memory_table::MemoryTable;
72pub(crate) use crate::system_schema::predicate::Predicates;
73use crate::system_schema::{
74    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
75};
76
77lazy_static! {
78    // Memory tables in `information_schema`.
79    static ref MEMORY_TABLES: &'static [&'static str] = &[
80        ENGINES,
81        COLUMN_PRIVILEGES,
82        COLUMN_STATISTICS,
83        CHARACTER_SETS,
84        COLLATIONS,
85        COLLATION_CHARACTER_SET_APPLICABILITY,
86        CHECK_CONSTRAINTS,
87        EVENTS,
88        FILES,
89        OPTIMIZER_TRACE,
90        PARAMETERS,
91        PROFILING,
92        REFERENTIAL_CONSTRAINTS,
93        ROUTINES,
94        SCHEMA_PRIVILEGES,
95        TABLE_PRIVILEGES,
96        TRIGGERS,
97        GLOBAL_STATUS,
98        SESSION_STATUS,
99        PARTITIONS,
100    ];
101}
102
/// Expands to an expression building a memory-backed system table for the
/// given `table_names` constant (e.g. `ENGINES`).
///
/// The table id constant is derived by pasting the name into
/// `consts::INFORMATION_SCHEMA_<NAME>_TABLE_ID`; schema and column data come
/// from `get_schema_columns`. The expansion evaluates to
/// `Option<SystemTableRef>`.
macro_rules! setup_memory_table {
    ($name: expr) => {
        paste! {
            {
                // Pre-defined schema and column data for this table.
                let (schema, columns) = get_schema_columns($name);
                Some(Arc::new(MemoryTable::new(
                    // e.g. `ENGINES` -> `INFORMATION_SCHEMA_ENGINES_TABLE_ID`.
                    consts::[<INFORMATION_SCHEMA_ $name  _TABLE_ID>],
                    $name,
                    schema,
                    columns
                )) as _)
            }
        }
    };
}

/// The request handed to an [`InformationSchemaTableFactory`] when a custom
/// information schema table is created.
#[cfg(feature = "enterprise")]
pub struct MakeInformationTableRequest {
    /// The catalog the table is created for.
    pub catalog_name: String,
    /// Weak reference to the catalog manager.
    pub catalog_manager: Weak<dyn CatalogManager>,
    /// Handle to the metadata kv backend.
    pub kv_backend: KvBackendRef,
}

/// A factory trait for making information schema tables.
///
/// This trait allows for extensibility of the information schema by providing
/// a way to dynamically create custom information schema tables.
#[cfg(feature = "enterprise")]
pub trait InformationSchemaTableFactory {
    /// Makes the custom information schema table described by `req`.
    fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
}

/// Shared, thread-safe reference to an [`InformationSchemaTableFactory`].
#[cfg(feature = "enterprise")]
pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;

/// The `information_schema` tables info provider.
pub struct InformationSchemaProvider {
    /// The catalog this provider serves tables for.
    catalog_name: String,
    /// Weak reference to the owning catalog manager.
    catalog_manager: Weak<dyn CatalogManager>,
    /// Backs the `PROCESS_LIST` table; when `None`, that table is not built.
    process_manager: Option<ProcessManagerRef>,
    /// Flow metadata, consumed by the `FLOWS` table.
    flow_metadata_manager: Arc<FlowMetadataManager>,
    /// All tables exposed by this provider, keyed by table name.
    tables: HashMap<String, TableRef>,
    /// Only consumed by enterprise table factories; unused otherwise.
    #[allow(dead_code)]
    kv_backend: KvBackendRef,
    /// Extra table factories keyed by table name; these take precedence over
    /// the built-in tables in `system_table`.
    #[cfg(feature = "enterprise")]
    extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
}

impl SystemSchemaProvider for InformationSchemaProvider {
    /// Returns all tables of this provider, keyed by name.
    fn tables(&self) -> &HashMap<String, TableRef> {
        // `build_tables` runs in the constructor, so an empty map here
        // indicates a construction bug.
        assert!(!self.tables.is_empty());

        &self.tables
    }
}

impl SystemSchemaProviderInner for InformationSchemaProvider {
    fn catalog_name(&self) -> &str {
        &self.catalog_name
    }
    fn schema_name() -> &'static str {
        INFORMATION_SCHEMA_NAME
    }

    /// Creates the system table with the given name, or `None` for unknown
    /// names (and for `PROCESS_LIST` when no process manager is configured).
    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
        // Enterprise table factories take precedence over the built-in
        // tables below. NOTE: this lookup matches `name` verbatim, whereas
        // the built-in match lowercases it first.
        #[cfg(feature = "enterprise")]
        if let Some(factory) = self.extra_table_factories.get(name) {
            let req = MakeInformationTableRequest {
                catalog_name: self.catalog_name.clone(),
                catalog_manager: self.catalog_manager.clone(),
                kv_backend: self.kv_backend.clone(),
            };
            return Some(factory.make_information_table(req));
        }

        match name.to_ascii_lowercase().as_str() {
            // Dynamic tables backed by catalog metadata.
            TABLES => Some(Arc::new(InformationSchemaTables::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            // Static, memory-backed tables with pre-defined content.
            ENGINES => setup_memory_table!(ENGINES),
            COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
            COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
            BUILD_INFO => setup_memory_table!(BUILD_INFO),
            CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
            COLLATIONS => setup_memory_table!(COLLATIONS),
            COLLATION_CHARACTER_SET_APPLICABILITY => {
                setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
            }
            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
            EVENTS => setup_memory_table!(EVENTS),
            FILES => setup_memory_table!(FILES),
            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
            PARAMETERS => setup_memory_table!(PARAMETERS),
            PROFILING => setup_memory_table!(PROFILING),
            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
            ROUTINES => setup_memory_table!(ROUTINES),
            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
            TRIGGERS => setup_memory_table!(TRIGGERS),
            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
            // More dynamic tables.
            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
            PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
                self.catalog_manager.clone(),
            )) as _),
            VIEWS => Some(Arc::new(InformationSchemaViews::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
                self.flow_metadata_manager.clone(),
            )) as _),
            PROCEDURE_INFO => Some(
                Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
                    self.catalog_manager.clone(),
                )) as _,
            ),
            REGION_STATISTICS => Some(Arc::new(
                region_statistics::InformationSchemaRegionStatistics::new(
                    self.catalog_manager.clone(),
                ),
            ) as _),
            // Only available when a process manager is configured.
            PROCESS_LIST => self
                .process_manager
                .as_ref()
                .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
            _ => None,
        }
    }
}

261impl InformationSchemaProvider {
262    pub fn new(
263        catalog_name: String,
264        catalog_manager: Weak<dyn CatalogManager>,
265        flow_metadata_manager: Arc<FlowMetadataManager>,
266        process_manager: Option<ProcessManagerRef>,
267        kv_backend: KvBackendRef,
268    ) -> Self {
269        let mut provider = Self {
270            catalog_name,
271            catalog_manager,
272            flow_metadata_manager,
273            process_manager,
274            tables: HashMap::new(),
275            kv_backend,
276            #[cfg(feature = "enterprise")]
277            extra_table_factories: HashMap::new(),
278        };
279
280        provider.build_tables();
281
282        provider
283    }
284
285    #[cfg(feature = "enterprise")]
286    pub(crate) fn with_extra_table_factories(
287        mut self,
288        factories: HashMap<String, InformationSchemaTableFactoryRef>,
289    ) -> Self {
290        self.extra_table_factories = factories;
291        self.build_tables();
292        self
293    }
294
295    fn build_tables(&mut self) {
296        let mut tables = HashMap::new();
297
298        // SECURITY NOTE:
299        // Carefully consider the tables that may expose sensitive cluster configurations,
300        // authentication details, and other critical information.
301        // Only put these tables under `greptime` catalog to prevent info leak.
302        if self.catalog_name == DEFAULT_CATALOG_NAME {
303            tables.insert(
304                RUNTIME_METRICS.to_string(),
305                self.build_table(RUNTIME_METRICS).unwrap(),
306            );
307            tables.insert(
308                BUILD_INFO.to_string(),
309                self.build_table(BUILD_INFO).unwrap(),
310            );
311            tables.insert(
312                REGION_PEERS.to_string(),
313                self.build_table(REGION_PEERS).unwrap(),
314            );
315            tables.insert(
316                CLUSTER_INFO.to_string(),
317                self.build_table(CLUSTER_INFO).unwrap(),
318            );
319            tables.insert(
320                PROCEDURE_INFO.to_string(),
321                self.build_table(PROCEDURE_INFO).unwrap(),
322            );
323            tables.insert(
324                REGION_STATISTICS.to_string(),
325                self.build_table(REGION_STATISTICS).unwrap(),
326            );
327        }
328
329        tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
330        tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
331        tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
332        tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
333        tables.insert(
334            KEY_COLUMN_USAGE.to_string(),
335            self.build_table(KEY_COLUMN_USAGE).unwrap(),
336        );
337        tables.insert(
338            TABLE_CONSTRAINTS.to_string(),
339            self.build_table(TABLE_CONSTRAINTS).unwrap(),
340        );
341        tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
342        if let Some(process_list) = self.build_table(PROCESS_LIST) {
343            tables.insert(PROCESS_LIST.to_string(), process_list);
344        }
345        #[cfg(feature = "enterprise")]
346        for name in self.extra_table_factories.keys() {
347            tables.insert(name.to_string(), self.build_table(name).expect(name));
348        }
349        // Add memory tables
350        for name in MEMORY_TABLES.iter() {
351            tables.insert((*name).to_string(), self.build_table(name).expect(name));
352        }
353        self.tables = tables;
354    }
355}
356
/// The trait implemented by every `information_schema` table.
pub trait InformationTable {
    /// Returns the id of the table.
    fn table_id(&self) -> TableId;

    /// Returns the name of the table.
    fn table_name(&self) -> &'static str;

    /// Returns the schema of the table.
    fn schema(&self) -> SchemaRef;

    /// Scans the table with the given request, returning a record batch stream.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

    /// Returns the table type; `information_schema` tables default to
    /// [`TableType::Temporary`].
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}

371// Provide compatibility for legacy `information_schema` code.
372impl<T> SystemTable for T
373where
374    T: InformationTable,
375{
376    fn table_id(&self) -> TableId {
377        InformationTable::table_id(self)
378    }
379
380    fn table_name(&self) -> &'static str {
381        InformationTable::table_name(self)
382    }
383
384    fn schema(&self) -> SchemaRef {
385        InformationTable::schema(self)
386    }
387
388    fn table_type(&self) -> TableType {
389        InformationTable::table_type(self)
390    }
391
392    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
393        InformationTable::to_stream(self, request)
394    }
395}
396
/// Shared, thread-safe reference to an [`InformationExtension`] whose error
/// type is the crate-level [`Error`].
pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;

/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables.
#[async_trait::async_trait]
pub trait InformationExtension {
    /// The error type returned by the extension methods.
    type Error: ErrorExt;

    /// Gets the nodes information.
    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Gets the procedures information.
    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;

    /// Gets the region statistics.
    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Get the flow statistics. If no flownode is available, return `None`.
    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;

    /// Inspects the datanode, streaming back the requested entries.
    async fn inspect_datanode(
        &self,
        request: DatanodeInspectRequest,
    ) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
}

/// The request to inspect the datanode.
///
/// See [`DatanodeInspectKind`] for the supported inspections.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatanodeInspectRequest {
    /// Kind to fetch from datanode.
    pub kind: DatanodeInspectKind,

    /// Pushdown scan configuration (projection/predicate/limit) for the returned stream.
    /// This allows server-side filtering to reduce I/O and network costs.
    pub scan: ScanRequest,
}

/// The kind of the datanode inspect request.
///
/// Each kind maps to its own scan plan in [`DatanodeInspectRequest::build_plan`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatanodeInspectKind {
    /// List SST entries recorded in manifest
    SstManifest,
    /// List SST entries discovered in storage layer
    SstStorage,
}

443impl DatanodeInspectRequest {
444    /// Builds a logical plan for the datanode inspect request.
445    pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
446        match self.kind {
447            DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
448            DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
449        }
450    }
451}
/// An [`InformationExtension`] implementation that returns empty results for
/// every query.
pub struct NoopInformationExtension;

454#[async_trait::async_trait]
455impl InformationExtension for NoopInformationExtension {
456    type Error = Error;
457
458    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
459        Ok(vec![])
460    }
461
462    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
463        Ok(vec![])
464    }
465
466    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
467        Ok(vec![])
468    }
469
470    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
471        Ok(None)
472    }
473
474    async fn inspect_datanode(
475        &self,
476        _request: DatanodeInspectRequest,
477    ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
478        Ok(common_recordbatch::RecordBatches::empty().as_stream())
479    }
480}