Skip to main content

catalog/system_schema/
information_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23mod region_info;
24pub mod region_peers;
25mod region_statistics;
26pub mod schemata;
27mod ssts;
28mod table_constraints;
29mod table_names;
30pub mod tables;
31mod views;
32
33use std::collections::HashMap;
34use std::sync::{Arc, Weak};
35
36use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
37use common_error::ext::ErrorExt;
38use common_meta::cluster::NodeInfo;
39use common_meta::datanode::RegionStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_meta::key::flow::flow_state::FlowStat;
42use common_meta::kv_backend::KvBackendRef;
43use common_procedure::ProcedureInfo;
44use common_recordbatch::SendableRecordBatchStream;
45use datafusion::error::DataFusionError;
46use datafusion::logical_expr::LogicalPlan;
47use datatypes::schema::SchemaRef;
48use lazy_static::lazy_static;
49use paste::paste;
50use process_list::InformationSchemaProcessList;
51use region_info::InformationSchemaRegionInfo;
52use store_api::region_info::RegionInfoEntry;
53use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
54use store_api::storage::{ScanRequest, TableId};
55use table::TableRef;
56use table::metadata::TableType;
57pub use table_names::*;
58use views::InformationSchemaViews;
59
60use self::columns::InformationSchemaColumns;
61use crate::CatalogManager;
62use crate::error::{Error, Result};
63use crate::process_manager::ProcessManagerRef;
64use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
65use crate::system_schema::information_schema::flows::InformationSchemaFlows;
66use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
67use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
68use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
69use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
70use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
71use crate::system_schema::information_schema::ssts::{
72    InformationSchemaSstsIndexMeta, InformationSchemaSstsManifest, InformationSchemaSstsStorage,
73};
74use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
75use crate::system_schema::information_schema::tables::InformationSchemaTables;
76use crate::system_schema::memory_table::MemoryTable;
77pub(crate) use crate::system_schema::predicate::Predicates;
78use crate::system_schema::{
79    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
80};
81
82lazy_static! {
83    // Memory tables in `information_schema`.
84    static ref MEMORY_TABLES: &'static [&'static str] = &[
85        ENGINES,
86        COLUMN_PRIVILEGES,
87        COLUMN_STATISTICS,
88        CHARACTER_SETS,
89        COLLATIONS,
90        COLLATION_CHARACTER_SET_APPLICABILITY,
91        CHECK_CONSTRAINTS,
92        EVENTS,
93        FILES,
94        OPTIMIZER_TRACE,
95        PARAMETERS,
96        PROFILING,
97        REFERENTIAL_CONSTRAINTS,
98        ROUTINES,
99        SCHEMA_PRIVILEGES,
100        TABLE_PRIVILEGES,
101        GLOBAL_STATUS,
102        SESSION_STATUS,
103        PARTITIONS,
104    ];
105}
106
// Expands to `Some(<memory table>)` for the table-name constant `$name`.
//
// The schema and column data come from `get_schema_columns($name)`; the table id is
// the constant `consts::INFORMATION_SCHEMA_<$name>_TABLE_ID`, whose identifier is
// assembled with `paste!`. The `as _` cast lets the surrounding expression pick the
// target (trait object) type.
macro_rules! setup_memory_table {
    ($name: expr) => {
        paste! {
            {
                let (schema, columns) = get_schema_columns($name);
                let memory_table = MemoryTable::new(
                    consts::[<INFORMATION_SCHEMA_ $name  _TABLE_ID>],
                    $name,
                    schema,
                    columns
                );
                Some(Arc::new(memory_table) as _)
            }
        }
    };
}
122
123pub struct MakeInformationTableRequest {
124    pub catalog_name: String,
125    pub catalog_manager: Weak<dyn CatalogManager>,
126    pub kv_backend: KvBackendRef,
127}
128
129/// A factory trait for making information schema tables.
130///
131/// This trait allows for extensibility of the information schema by providing
132/// a way to dynamically create custom information schema tables.
133pub trait InformationSchemaTableFactory {
134    fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
135}
136
137pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
138
139/// The `information_schema` tables info provider.
140pub struct InformationSchemaProvider {
141    catalog_name: String,
142    catalog_manager: Weak<dyn CatalogManager>,
143    process_manager: Option<ProcessManagerRef>,
144    flow_metadata_manager: Arc<FlowMetadataManager>,
145    tables: HashMap<String, TableRef>,
146    kv_backend: KvBackendRef,
147    extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
148}
149
150impl SystemSchemaProvider for InformationSchemaProvider {
151    fn tables(&self) -> &HashMap<String, TableRef> {
152        assert!(!self.tables.is_empty());
153
154        &self.tables
155    }
156}
157
158impl SystemSchemaProviderInner for InformationSchemaProvider {
159    fn catalog_name(&self) -> &str {
160        &self.catalog_name
161    }
162    fn schema_name() -> &'static str {
163        INFORMATION_SCHEMA_NAME
164    }
165
166    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
167        if let Some(factory) = self.extra_table_factories.get(name) {
168            let req = MakeInformationTableRequest {
169                catalog_name: self.catalog_name.clone(),
170                catalog_manager: self.catalog_manager.clone(),
171                kv_backend: self.kv_backend.clone(),
172            };
173            return Some(factory.make_information_table(req));
174        }
175
176        match name.to_ascii_lowercase().as_str() {
177            TABLES => Some(Arc::new(InformationSchemaTables::new(
178                self.catalog_name.clone(),
179                self.catalog_manager.clone(),
180            )) as _),
181            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
182                self.catalog_name.clone(),
183                self.catalog_manager.clone(),
184            )) as _),
185            ENGINES => setup_memory_table!(ENGINES),
186            COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
187            COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
188            BUILD_INFO => setup_memory_table!(BUILD_INFO),
189            CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
190            COLLATIONS => setup_memory_table!(COLLATIONS),
191            COLLATION_CHARACTER_SET_APPLICABILITY => {
192                setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
193            }
194            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
195            EVENTS => setup_memory_table!(EVENTS),
196            FILES => setup_memory_table!(FILES),
197            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
198            PARAMETERS => setup_memory_table!(PARAMETERS),
199            PROFILING => setup_memory_table!(PROFILING),
200            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
201            ROUTINES => setup_memory_table!(ROUTINES),
202            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
203            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
204            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
205            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
206            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
207                self.catalog_name.clone(),
208                self.catalog_manager.clone(),
209            )) as _),
210            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
211                self.catalog_name.clone(),
212                self.catalog_manager.clone(),
213            )) as _),
214            PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
215                self.catalog_name.clone(),
216                self.catalog_manager.clone(),
217            )) as _),
218            REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
219                self.catalog_name.clone(),
220                self.catalog_manager.clone(),
221            )) as _),
222            TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
223                self.catalog_name.clone(),
224                self.catalog_manager.clone(),
225            )) as _),
226            CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
227                self.catalog_manager.clone(),
228            )) as _),
229            VIEWS => Some(Arc::new(InformationSchemaViews::new(
230                self.catalog_name.clone(),
231                self.catalog_manager.clone(),
232            )) as _),
233            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
234                self.catalog_name.clone(),
235                self.catalog_manager.clone(),
236                self.flow_metadata_manager.clone(),
237            )) as _),
238            PROCEDURE_INFO => Some(
239                Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
240                    self.catalog_manager.clone(),
241                )) as _,
242            ),
243            REGION_STATISTICS => Some(Arc::new(
244                region_statistics::InformationSchemaRegionStatistics::new(
245                    self.catalog_manager.clone(),
246                ),
247            ) as _),
248            REGION_INFO => Some(Arc::new(InformationSchemaRegionInfo::new(
249                self.catalog_manager.clone(),
250            )) as _),
251            PROCESS_LIST => self
252                .process_manager
253                .as_ref()
254                .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
255            SSTS_MANIFEST => Some(Arc::new(InformationSchemaSstsManifest::new(
256                self.catalog_manager.clone(),
257            )) as _),
258            SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
259                self.catalog_manager.clone(),
260            )) as _),
261            SSTS_INDEX_META => Some(Arc::new(InformationSchemaSstsIndexMeta::new(
262                self.catalog_manager.clone(),
263            )) as _),
264            _ => None,
265        }
266    }
267}
268
269impl InformationSchemaProvider {
270    pub fn new(
271        catalog_name: String,
272        catalog_manager: Weak<dyn CatalogManager>,
273        flow_metadata_manager: Arc<FlowMetadataManager>,
274        process_manager: Option<ProcessManagerRef>,
275        kv_backend: KvBackendRef,
276    ) -> Self {
277        let mut provider = Self {
278            catalog_name,
279            catalog_manager,
280            flow_metadata_manager,
281            process_manager,
282            tables: HashMap::new(),
283            kv_backend,
284            extra_table_factories: HashMap::new(),
285        };
286
287        provider.build_tables();
288
289        provider
290    }
291
292    pub(crate) fn with_extra_table_factories(
293        mut self,
294        factories: HashMap<String, InformationSchemaTableFactoryRef>,
295    ) -> Self {
296        self.extra_table_factories = factories;
297        self.build_tables();
298        self
299    }
300
301    fn build_tables(&mut self) {
302        let mut tables = HashMap::new();
303
304        // SECURITY NOTE:
305        // Carefully consider the tables that may expose sensitive cluster configurations,
306        // authentication details, and other critical information.
307        // Only put these tables under `greptime` catalog to prevent info leak.
308        if self.catalog_name == DEFAULT_CATALOG_NAME {
309            tables.insert(
310                BUILD_INFO.to_string(),
311                self.build_table(BUILD_INFO).unwrap(),
312            );
313            tables.insert(
314                REGION_PEERS.to_string(),
315                self.build_table(REGION_PEERS).unwrap(),
316            );
317            tables.insert(
318                CLUSTER_INFO.to_string(),
319                self.build_table(CLUSTER_INFO).unwrap(),
320            );
321            tables.insert(
322                PROCEDURE_INFO.to_string(),
323                self.build_table(PROCEDURE_INFO).unwrap(),
324            );
325            tables.insert(
326                REGION_STATISTICS.to_string(),
327                self.build_table(REGION_STATISTICS).unwrap(),
328            );
329            tables.insert(
330                REGION_INFO.to_string(),
331                self.build_table(REGION_INFO).unwrap(),
332            );
333            tables.insert(
334                SSTS_MANIFEST.to_string(),
335                self.build_table(SSTS_MANIFEST).unwrap(),
336            );
337            tables.insert(
338                SSTS_STORAGE.to_string(),
339                self.build_table(SSTS_STORAGE).unwrap(),
340            );
341            tables.insert(
342                SSTS_INDEX_META.to_string(),
343                self.build_table(SSTS_INDEX_META).unwrap(),
344            );
345        }
346
347        tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
348        tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
349        tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
350        tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
351        tables.insert(
352            KEY_COLUMN_USAGE.to_string(),
353            self.build_table(KEY_COLUMN_USAGE).unwrap(),
354        );
355        tables.insert(
356            TABLE_CONSTRAINTS.to_string(),
357            self.build_table(TABLE_CONSTRAINTS).unwrap(),
358        );
359        tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
360        if let Some(process_list) = self.build_table(PROCESS_LIST) {
361            tables.insert(PROCESS_LIST.to_string(), process_list);
362        }
363        for name in self.extra_table_factories.keys() {
364            tables.insert(name.clone(), self.build_table(name).expect(name));
365        }
366        // Add memory tables
367        for name in MEMORY_TABLES.iter() {
368            tables.insert((*name).to_string(), self.build_table(name).expect(name));
369        }
370        self.tables = tables;
371    }
372}
373
374pub trait InformationTable {
375    fn table_id(&self) -> TableId;
376
377    fn table_name(&self) -> &'static str;
378
379    fn schema(&self) -> SchemaRef;
380
381    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
382
383    fn table_type(&self) -> TableType {
384        TableType::Temporary
385    }
386}
387
388// Provide compatibility for legacy `information_schema` code.
389impl<T> SystemTable for T
390where
391    T: InformationTable,
392{
393    fn table_id(&self) -> TableId {
394        InformationTable::table_id(self)
395    }
396
397    fn table_name(&self) -> &'static str {
398        InformationTable::table_name(self)
399    }
400
401    fn schema(&self) -> SchemaRef {
402        InformationTable::schema(self)
403    }
404
405    fn table_type(&self) -> TableType {
406        InformationTable::table_type(self)
407    }
408
409    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
410        InformationTable::to_stream(self, request)
411    }
412}
413
414pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
415
416/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables.
417#[async_trait::async_trait]
418pub trait InformationExtension {
419    type Error: ErrorExt;
420
421    /// Gets the nodes information.
422    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
423
424    /// Gets the procedures information.
425    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;
426
427    /// Gets the region statistics.
428    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
429
430    /// Get the flow statistics. If no flownode is available, return `None`.
431    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
432
433    /// Inspects the datanode.
434    async fn inspect_datanode(
435        &self,
436        request: DatanodeInspectRequest,
437    ) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
438}
439
440/// The request to inspect the datanode.
441#[derive(Debug, Clone, PartialEq)]
442pub struct DatanodeInspectRequest {
443    /// Kind to fetch from datanode.
444    pub kind: DatanodeInspectKind,
445
446    /// Pushdown scan configuration (projection/predicate/limit) for the returned stream.
447    /// This allows server-side filtering to reduce I/O and network costs.
448    pub scan: ScanRequest,
449}
450
/// What a [`DatanodeInspectRequest`] asks the datanode for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatanodeInspectKind {
    /// SST entries recorded in the manifest.
    SstManifest,
    /// SST entries discovered in the storage layer.
    SstStorage,
    /// Index metadata collected from the manifest.
    SstIndexMeta,
    /// Region runtime and manifest info.
    RegionInfo,
}
463
464impl DatanodeInspectRequest {
465    /// Builds a logical plan for the datanode inspect request.
466    pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
467        match self.kind {
468            DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
469            DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
470            DatanodeInspectKind::SstIndexMeta => PuffinIndexMetaEntry::build_plan(self.scan),
471            DatanodeInspectKind::RegionInfo => RegionInfoEntry::build_plan(self.scan),
472        }
473    }
474}
/// An [`InformationExtension`] whose methods all return empty results.
pub struct NoopInformationExtension;
476
477#[async_trait::async_trait]
478impl InformationExtension for NoopInformationExtension {
479    type Error = Error;
480
481    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
482        Ok(vec![])
483    }
484
485    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
486        Ok(vec![])
487    }
488
489    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
490        Ok(vec![])
491    }
492
493    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
494        Ok(None)
495    }
496
497    async fn inspect_datanode(
498        &self,
499        _request: DatanodeInspectRequest,
500    ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
501        Ok(common_recordbatch::RecordBatches::empty().as_stream())
502    }
503}
504
#[cfg(test)]
mod tests {
    use store_api::region_info::RegionInfoEntry;

    use super::*;

    /// A `RegionInfo` inspect request must plan to a table scan over the table name
    /// reserved for region-info inspection.
    #[test]
    fn test_datanode_inspect_region_info_build_plan() {
        let request = DatanodeInspectRequest {
            kind: DatanodeInspectKind::RegionInfo,
            scan: ScanRequest::default(),
        };

        let scan = match request.build_plan().unwrap() {
            LogicalPlan::TableScan(scan) => scan,
            _ => panic!("expected table scan"),
        };

        assert_eq!(
            scan.table_name.to_string(),
            RegionInfoEntry::reserved_table_name_for_inspection()
        );
    }
}