catalog/system_schema/information_schema.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod cluster_info;
pub mod columns;
pub mod flows;
mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
pub mod process_list;
pub mod region_peers;
mod region_statistics;
mod runtime_metrics;
pub mod schemata;
mod ssts;
mod table_constraints;
mod table_names;
pub mod tables;
mod views;

use std::collections::HashMap;
use std::sync::{Arc, Weak};

use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_meta::cluster::NodeInfo;
use common_meta::datanode::RegionStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureInfo;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::LogicalPlan;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
use paste::paste;
use process_list::InformationSchemaProcessList;
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::{ScanRequest, TableId};
use table::TableRef;
use table::metadata::TableType;
pub use table_names::*;
use views::InformationSchemaViews;

use self::columns::InformationSchemaColumns;
use crate::CatalogManager;
use crate::error::{Error, Result};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
use crate::system_schema::information_schema::flows::InformationSchemaFlows;
use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
use crate::system_schema::information_schema::ssts::{
    InformationSchemaSstsManifest, InformationSchemaSstsStorage,
};
use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
use crate::system_schema::information_schema::tables::InformationSchemaTables;
use crate::system_schema::memory_table::MemoryTable;
pub(crate) use crate::system_schema::predicate::Predicates;
use crate::system_schema::{
    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
};

lazy_static! {
    // Memory tables in `information_schema`.
    static ref MEMORY_TABLES: &'static [&'static str] = &[
        ENGINES,
        COLUMN_PRIVILEGES,
        COLUMN_STATISTICS,
        CHARACTER_SETS,
        COLLATIONS,
        COLLATION_CHARACTER_SET_APPLICABILITY,
        CHECK_CONSTRAINTS,
        EVENTS,
        FILES,
        OPTIMIZER_TRACE,
        PARAMETERS,
        PROFILING,
        REFERENTIAL_CONSTRAINTS,
        ROUTINES,
        SCHEMA_PRIVILEGES,
        TABLE_PRIVILEGES,
        TRIGGERS,
        GLOBAL_STATUS,
        SESSION_STATUS,
        PARTITIONS,
    ];
}

macro_rules! setup_memory_table {
    ($name: expr) => {
        paste! {
            {
                let (schema, columns) = get_schema_columns($name);
                Some(Arc::new(MemoryTable::new(
                    consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
                    $name,
                    schema,
                    columns
                )) as _)
            }
        }
    };
}
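
// For illustration, `setup_memory_table!(ENGINES)` expands to roughly the following
// block (`paste!` splices the table name into the table id constant):
//
//     {
//         let (schema, columns) = get_schema_columns(ENGINES);
//         Some(Arc::new(MemoryTable::new(
//             consts::INFORMATION_SCHEMA_ENGINES_TABLE_ID,
//             ENGINES,
//             schema,
//             columns
//         )) as _)
//     }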

#[cfg(feature = "enterprise")]
pub struct MakeInformationTableRequest {
    pub catalog_name: String,
    pub catalog_manager: Weak<dyn CatalogManager>,
    pub kv_backend: KvBackendRef,
}

/// A factory trait for making information schema tables.
///
/// This trait allows for extensibility of the information schema by providing
/// a way to dynamically create custom information schema tables.
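///
/// A minimal sketch of a custom factory. `MyCustomTable` here is a hypothetical
/// [`SystemTable`] implementation, not something provided by this crate:
///
/// ```ignore
/// struct MyCustomTableFactory;
///
/// impl InformationSchemaTableFactory for MyCustomTableFactory {
///     fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef {
///         Arc::new(MyCustomTable::new(req.catalog_name, req.catalog_manager)) as _
///     }
/// }
/// ```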
#[cfg(feature = "enterprise")]
pub trait InformationSchemaTableFactory {
    fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
}

#[cfg(feature = "enterprise")]
pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;

/// The `information_schema` tables info provider.
pub struct InformationSchemaProvider {
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    process_manager: Option<ProcessManagerRef>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
    tables: HashMap<String, TableRef>,
    #[allow(dead_code)]
    kv_backend: KvBackendRef,
    #[cfg(feature = "enterprise")]
    extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
}

impl SystemSchemaProvider for InformationSchemaProvider {
    fn tables(&self) -> &HashMap<String, TableRef> {
        assert!(!self.tables.is_empty());

        &self.tables
    }
}

impl SystemSchemaProviderInner for InformationSchemaProvider {
    fn catalog_name(&self) -> &str {
        &self.catalog_name
    }

    fn schema_name() -> &'static str {
        INFORMATION_SCHEMA_NAME
    }

    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
        #[cfg(feature = "enterprise")]
        if let Some(factory) = self.extra_table_factories.get(name) {
            let req = MakeInformationTableRequest {
                catalog_name: self.catalog_name.clone(),
                catalog_manager: self.catalog_manager.clone(),
                kv_backend: self.kv_backend.clone(),
            };
            return Some(factory.make_information_table(req));
        }

        match name.to_ascii_lowercase().as_str() {
            TABLES => Some(Arc::new(InformationSchemaTables::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            ENGINES => setup_memory_table!(ENGINES),
            COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
            COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
            BUILD_INFO => setup_memory_table!(BUILD_INFO),
            CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
            COLLATIONS => setup_memory_table!(COLLATIONS),
            COLLATION_CHARACTER_SET_APPLICABILITY => {
                setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
            }
            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
            EVENTS => setup_memory_table!(EVENTS),
            FILES => setup_memory_table!(FILES),
            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
            PARAMETERS => setup_memory_table!(PARAMETERS),
            PROFILING => setup_memory_table!(PROFILING),
            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
            ROUTINES => setup_memory_table!(ROUTINES),
            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
            TRIGGERS => setup_memory_table!(TRIGGERS),
            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
            PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
                self.catalog_manager.clone(),
            )) as _),
            VIEWS => Some(Arc::new(InformationSchemaViews::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
            )) as _),
            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
                self.catalog_name.clone(),
                self.catalog_manager.clone(),
                self.flow_metadata_manager.clone(),
            )) as _),
            PROCEDURE_INFO => Some(
                Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
                    self.catalog_manager.clone(),
                )) as _,
            ),
            REGION_STATISTICS => Some(Arc::new(
                region_statistics::InformationSchemaRegionStatistics::new(
                    self.catalog_manager.clone(),
                ),
            ) as _),
            PROCESS_LIST => self
                .process_manager
                .as_ref()
                .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
            SSTS_MANIFEST => Some(Arc::new(InformationSchemaSstsManifest::new(
                self.catalog_manager.clone(),
            )) as _),
            SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
                self.catalog_manager.clone(),
            )) as _),
            _ => None,
        }
    }
}

impl InformationSchemaProvider {
    pub fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
        process_manager: Option<ProcessManagerRef>,
        kv_backend: KvBackendRef,
    ) -> Self {
        let mut provider = Self {
            catalog_name,
            catalog_manager,
            flow_metadata_manager,
            process_manager,
            tables: HashMap::new(),
            kv_backend,
            #[cfg(feature = "enterprise")]
            extra_table_factories: HashMap::new(),
        };

        provider.build_tables();

        provider
    }

    #[cfg(feature = "enterprise")]
    pub(crate) fn with_extra_table_factories(
        mut self,
        factories: HashMap<String, InformationSchemaTableFactoryRef>,
    ) -> Self {
        self.extra_table_factories = factories;
        self.build_tables();
        self
    }

    fn build_tables(&mut self) {
        let mut tables = HashMap::new();

        // SECURITY NOTE:
        // Some of these tables may expose sensitive cluster configuration,
        // authentication details, and other critical information.
        // Only expose such tables under the `greptime` catalog to prevent information leaks.
        if self.catalog_name == DEFAULT_CATALOG_NAME {
            tables.insert(
                RUNTIME_METRICS.to_string(),
                self.build_table(RUNTIME_METRICS).unwrap(),
            );
            tables.insert(
                BUILD_INFO.to_string(),
                self.build_table(BUILD_INFO).unwrap(),
            );
            tables.insert(
                REGION_PEERS.to_string(),
                self.build_table(REGION_PEERS).unwrap(),
            );
            tables.insert(
                CLUSTER_INFO.to_string(),
                self.build_table(CLUSTER_INFO).unwrap(),
            );
            tables.insert(
                PROCEDURE_INFO.to_string(),
                self.build_table(PROCEDURE_INFO).unwrap(),
            );
            tables.insert(
                REGION_STATISTICS.to_string(),
                self.build_table(REGION_STATISTICS).unwrap(),
            );
            tables.insert(
                SSTS_MANIFEST.to_string(),
                self.build_table(SSTS_MANIFEST).unwrap(),
            );
            tables.insert(
                SSTS_STORAGE.to_string(),
                self.build_table(SSTS_STORAGE).unwrap(),
            );
        }

        tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
        tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
        tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
        tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
        tables.insert(
            KEY_COLUMN_USAGE.to_string(),
            self.build_table(KEY_COLUMN_USAGE).unwrap(),
        );
        tables.insert(
            TABLE_CONSTRAINTS.to_string(),
            self.build_table(TABLE_CONSTRAINTS).unwrap(),
        );
        tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
        if let Some(process_list) = self.build_table(PROCESS_LIST) {
            tables.insert(PROCESS_LIST.to_string(), process_list);
        }
        #[cfg(feature = "enterprise")]
        for name in self.extra_table_factories.keys() {
            tables.insert(name.clone(), self.build_table(name).expect(name));
        }
        // Add memory tables.
        for name in MEMORY_TABLES.iter() {
            tables.insert((*name).to_string(), self.build_table(name).expect(name));
        }
        self.tables = tables;
    }
}
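
// A construction sketch, assuming a `kv_backend: KvBackendRef` and a
// `catalog_manager: Arc<dyn CatalogManager>` obtained elsewhere, and that
// `FlowMetadataManager::new` accepts the kv backend:
//
//     let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
//     let provider = InformationSchemaProvider::new(
//         DEFAULT_CATALOG_NAME.to_string(),
//         Arc::downgrade(&catalog_manager),
//         flow_metadata_manager,
//         None, // no process manager in this sketch
//         kv_backend,
//     );
//     // All `information_schema` tables, keyed by table name.
//     let tables = provider.tables();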

/// A table in the `information_schema`.
pub trait InformationTable {
    /// Returns the table id.
    fn table_id(&self) -> TableId;

    /// Returns the table name.
    fn table_name(&self) -> &'static str;

    /// Returns the schema of the table.
    fn schema(&self) -> SchemaRef;

    /// Scans the table according to the given `request`, returning a record batch stream.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

    /// Returns the table type. `information_schema` tables are temporary.
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}

// Provide compatibility for legacy `information_schema` code.
impl<T> SystemTable for T
where
    T: InformationTable,
{
    fn table_id(&self) -> TableId {
        InformationTable::table_id(self)
    }

    fn table_name(&self) -> &'static str {
        InformationTable::table_name(self)
    }

    fn schema(&self) -> SchemaRef {
        InformationTable::schema(self)
    }

    fn table_type(&self) -> TableType {
        InformationTable::table_type(self)
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        InformationTable::to_stream(self, request)
    }
}

pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;

/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables.
#[async_trait::async_trait]
pub trait InformationExtension {
    type Error: ErrorExt;

    /// Gets the nodes information.
    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Gets the procedures information.
    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;

    /// Gets the region statistics.
    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Gets the flow statistics. Returns `None` if no flownode is available.
    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;

    /// Inspects the datanode.
    async fn inspect_datanode(
        &self,
        request: DatanodeInspectRequest,
    ) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
}

/// The request to inspect the datanode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatanodeInspectRequest {
    /// Kind to fetch from datanode.
    pub kind: DatanodeInspectKind,

    /// Pushdown scan configuration (projection/predicate/limit) for the returned stream.
    /// This allows server-side filtering to reduce I/O and network costs.
    pub scan: ScanRequest,
}

/// The kind of the datanode inspect request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatanodeInspectKind {
    /// List SST entries recorded in the manifest.
    SstManifest,
    /// List SST entries discovered in the storage layer.
    SstStorage,
}

impl DatanodeInspectRequest {
    /// Builds a logical plan for the datanode inspect request.
    pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
        match self.kind {
            DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
            DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
        }
    }
}
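
// For illustration, a plan listing manifest SST entries might be built like this
// (a sketch; a default `ScanRequest` is assumed to mean "no pushdown"):
//
//     let request = DatanodeInspectRequest {
//         kind: DatanodeInspectKind::SstManifest,
//         scan: ScanRequest::default(),
//     };
//     let plan = request.build_plan()?;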

/// A no-op [`InformationExtension`] implementation that returns empty results.
pub struct NoopInformationExtension;

#[async_trait::async_trait]
impl InformationExtension for NoopInformationExtension {
    type Error = Error;

    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
        Ok(vec![])
    }

    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
        Ok(vec![])
    }

    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
        Ok(vec![])
    }

    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
        Ok(None)
    }

    async fn inspect_datanode(
        &self,
        _request: DatanodeInspectRequest,
    ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
        Ok(common_recordbatch::RecordBatches::empty().as_stream())
    }
}