// catalog/system_schema/information_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23pub mod region_peers;
24mod region_statistics;
25pub mod schemata;
26mod ssts;
27mod table_constraints;
28mod table_names;
29pub mod tables;
30mod views;
31
32use std::collections::HashMap;
33use std::sync::{Arc, Weak};
34
35use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
36use common_error::ext::ErrorExt;
37use common_meta::cluster::NodeInfo;
38use common_meta::datanode::RegionStat;
39use common_meta::key::flow::FlowMetadataManager;
40use common_meta::key::flow::flow_state::FlowStat;
41use common_meta::kv_backend::KvBackendRef;
42use common_procedure::ProcedureInfo;
43use common_recordbatch::SendableRecordBatchStream;
44use datafusion::error::DataFusionError;
45use datafusion::logical_expr::LogicalPlan;
46use datatypes::schema::SchemaRef;
47use lazy_static::lazy_static;
48use paste::paste;
49use process_list::InformationSchemaProcessList;
50use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
51use store_api::storage::{ScanRequest, TableId};
52use table::TableRef;
53use table::metadata::TableType;
54pub use table_names::*;
55use views::InformationSchemaViews;
56
57use self::columns::InformationSchemaColumns;
58use crate::CatalogManager;
59use crate::error::{Error, Result};
60use crate::process_manager::ProcessManagerRef;
61use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
62use crate::system_schema::information_schema::flows::InformationSchemaFlows;
63use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
64use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
65use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
66use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
67use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
68use crate::system_schema::information_schema::ssts::{
69    InformationSchemaSstsIndexMeta, InformationSchemaSstsManifest, InformationSchemaSstsStorage,
70};
71use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
72use crate::system_schema::information_schema::tables::InformationSchemaTables;
73use crate::system_schema::memory_table::MemoryTable;
74pub(crate) use crate::system_schema::predicate::Predicates;
75use crate::system_schema::{
76    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
77};
78
lazy_static! {
    // Tables registered in every catalog's `information_schema` by the final loop of
    // `InformationSchemaProvider::build_tables`.
    //
    // Every entry except `PARTITIONS` is served by a `setup_memory_table!` arm of
    // `system_table`, i.e. a `MemoryTable`. `PARTITIONS` is NOT a memory table: it is
    // built as `InformationSchemaPartitions`. Being listed here is what registers it for
    // every catalog.
    //
    // `BUILD_INFO` is also a memory table but is intentionally absent from this list. It is
    // registered separately, and only for the default catalog.
    static ref MEMORY_TABLES: &'static [&'static str] = &[
        ENGINES,
        COLUMN_PRIVILEGES,
        COLUMN_STATISTICS,
        CHARACTER_SETS,
        COLLATIONS,
        COLLATION_CHARACTER_SET_APPLICABILITY,
        CHECK_CONSTRAINTS,
        EVENTS,
        FILES,
        OPTIMIZER_TRACE,
        PARAMETERS,
        PROFILING,
        REFERENTIAL_CONSTRAINTS,
        ROUTINES,
        SCHEMA_PRIVILEGES,
        TABLE_PRIVILEGES,
        GLOBAL_STATUS,
        SESSION_STATUS,
        PARTITIONS,
    ];
}
103
/// Expands to `Some(table)`, where `table` is a [`MemoryTable`] for the
/// `information_schema` table-name constant `$name` (for example `ENGINES`).
///
/// The schema and column data come from `get_schema_columns($name)`. The table id is the
/// constant `consts::INFORMATION_SCHEMA_<NAME>_TABLE_ID`, whose identifier is assembled
/// from `$name` by `paste!`. The trailing `as _` lets the caller's expected type
/// (a `SystemTableRef` in `system_table`) drive the unsizing coercion.
macro_rules! setup_memory_table {
    ($name:expr) => {
        paste! {{
            let (schema, columns) = get_schema_columns($name);
            let table_id = consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>];
            let memory_table = MemoryTable::new(table_id, $name, schema, columns);
            Some(Arc::new(memory_table) as _)
        }}
    };
}
119
/// Input handed to an [`InformationSchemaTableFactory`] when it must build a table.
///
/// It carries the same handles [`InformationSchemaProvider`] holds for its catalog, so a
/// factory-made table can reach the same catalog manager and key-value backend as the
/// built-in tables.
#[cfg(feature = "enterprise")]
pub struct MakeInformationTableRequest {
    /// Name of the catalog whose `information_schema` is being populated.
    pub catalog_name: String,
    /// Weak handle to the catalog manager.
    pub catalog_manager: Weak<dyn CatalogManager>,
    /// Key-value backend of the requesting provider.
    pub kv_backend: KvBackendRef,
}

/// Builds custom `information_schema` tables on demand.
///
/// This is the extension point for adding tables beyond the built-in set: a provider
/// consults its registered factories by table name before falling back to the built-in
/// tables.
#[cfg(feature = "enterprise")]
pub trait InformationSchemaTableFactory {
    /// Creates the system table described by `request`.
    fn make_information_table(&self, request: MakeInformationTableRequest) -> SystemTableRef;
}

/// Shareable, thread-safe handle to an [`InformationSchemaTableFactory`].
#[cfg(feature = "enterprise")]
pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Sync + Send>;
138
139/// The `information_schema` tables info provider.
140pub struct InformationSchemaProvider {
141    catalog_name: String,
142    catalog_manager: Weak<dyn CatalogManager>,
143    process_manager: Option<ProcessManagerRef>,
144    flow_metadata_manager: Arc<FlowMetadataManager>,
145    tables: HashMap<String, TableRef>,
146    #[allow(dead_code)]
147    kv_backend: KvBackendRef,
148    #[cfg(feature = "enterprise")]
149    extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
150}
151
impl SystemSchemaProvider for InformationSchemaProvider {
    /// Returns the tables registered for this catalog's `information_schema`.
    ///
    /// # Panics
    ///
    /// Panics if no table has been registered.
    fn tables(&self) -> &HashMap<String, TableRef> {
        // The registry is filled by `build_tables` during construction; an empty map here
        // means it was never built.
        assert!(!self.tables.is_empty());

        &self.tables
    }
}
159
160impl SystemSchemaProviderInner for InformationSchemaProvider {
161    fn catalog_name(&self) -> &str {
162        &self.catalog_name
163    }
164    fn schema_name() -> &'static str {
165        INFORMATION_SCHEMA_NAME
166    }
167
168    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
169        #[cfg(feature = "enterprise")]
170        if let Some(factory) = self.extra_table_factories.get(name) {
171            let req = MakeInformationTableRequest {
172                catalog_name: self.catalog_name.clone(),
173                catalog_manager: self.catalog_manager.clone(),
174                kv_backend: self.kv_backend.clone(),
175            };
176            return Some(factory.make_information_table(req));
177        }
178
179        match name.to_ascii_lowercase().as_str() {
180            TABLES => Some(Arc::new(InformationSchemaTables::new(
181                self.catalog_name.clone(),
182                self.catalog_manager.clone(),
183            )) as _),
184            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
185                self.catalog_name.clone(),
186                self.catalog_manager.clone(),
187            )) as _),
188            ENGINES => setup_memory_table!(ENGINES),
189            COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
190            COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
191            BUILD_INFO => setup_memory_table!(BUILD_INFO),
192            CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
193            COLLATIONS => setup_memory_table!(COLLATIONS),
194            COLLATION_CHARACTER_SET_APPLICABILITY => {
195                setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
196            }
197            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
198            EVENTS => setup_memory_table!(EVENTS),
199            FILES => setup_memory_table!(FILES),
200            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
201            PARAMETERS => setup_memory_table!(PARAMETERS),
202            PROFILING => setup_memory_table!(PROFILING),
203            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
204            ROUTINES => setup_memory_table!(ROUTINES),
205            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
206            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
207            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
208            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
209            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
210                self.catalog_name.clone(),
211                self.catalog_manager.clone(),
212            )) as _),
213            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
214                self.catalog_name.clone(),
215                self.catalog_manager.clone(),
216            )) as _),
217            PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
218                self.catalog_name.clone(),
219                self.catalog_manager.clone(),
220            )) as _),
221            REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
222                self.catalog_name.clone(),
223                self.catalog_manager.clone(),
224            )) as _),
225            TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
226                self.catalog_name.clone(),
227                self.catalog_manager.clone(),
228            )) as _),
229            CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
230                self.catalog_manager.clone(),
231            )) as _),
232            VIEWS => Some(Arc::new(InformationSchemaViews::new(
233                self.catalog_name.clone(),
234                self.catalog_manager.clone(),
235            )) as _),
236            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
237                self.catalog_name.clone(),
238                self.catalog_manager.clone(),
239                self.flow_metadata_manager.clone(),
240            )) as _),
241            PROCEDURE_INFO => Some(
242                Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
243                    self.catalog_manager.clone(),
244                )) as _,
245            ),
246            REGION_STATISTICS => Some(Arc::new(
247                region_statistics::InformationSchemaRegionStatistics::new(
248                    self.catalog_manager.clone(),
249                ),
250            ) as _),
251            PROCESS_LIST => self
252                .process_manager
253                .as_ref()
254                .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
255            SSTS_MANIFEST => Some(Arc::new(InformationSchemaSstsManifest::new(
256                self.catalog_manager.clone(),
257            )) as _),
258            SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
259                self.catalog_manager.clone(),
260            )) as _),
261            SSTS_INDEX_META => Some(Arc::new(InformationSchemaSstsIndexMeta::new(
262                self.catalog_manager.clone(),
263            )) as _),
264            _ => None,
265        }
266    }
267}
268
269impl InformationSchemaProvider {
270    pub fn new(
271        catalog_name: String,
272        catalog_manager: Weak<dyn CatalogManager>,
273        flow_metadata_manager: Arc<FlowMetadataManager>,
274        process_manager: Option<ProcessManagerRef>,
275        kv_backend: KvBackendRef,
276    ) -> Self {
277        let mut provider = Self {
278            catalog_name,
279            catalog_manager,
280            flow_metadata_manager,
281            process_manager,
282            tables: HashMap::new(),
283            kv_backend,
284            #[cfg(feature = "enterprise")]
285            extra_table_factories: HashMap::new(),
286        };
287
288        provider.build_tables();
289
290        provider
291    }
292
293    #[cfg(feature = "enterprise")]
294    pub(crate) fn with_extra_table_factories(
295        mut self,
296        factories: HashMap<String, InformationSchemaTableFactoryRef>,
297    ) -> Self {
298        self.extra_table_factories = factories;
299        self.build_tables();
300        self
301    }
302
303    fn build_tables(&mut self) {
304        let mut tables = HashMap::new();
305
306        // SECURITY NOTE:
307        // Carefully consider the tables that may expose sensitive cluster configurations,
308        // authentication details, and other critical information.
309        // Only put these tables under `greptime` catalog to prevent info leak.
310        if self.catalog_name == DEFAULT_CATALOG_NAME {
311            tables.insert(
312                BUILD_INFO.to_string(),
313                self.build_table(BUILD_INFO).unwrap(),
314            );
315            tables.insert(
316                REGION_PEERS.to_string(),
317                self.build_table(REGION_PEERS).unwrap(),
318            );
319            tables.insert(
320                CLUSTER_INFO.to_string(),
321                self.build_table(CLUSTER_INFO).unwrap(),
322            );
323            tables.insert(
324                PROCEDURE_INFO.to_string(),
325                self.build_table(PROCEDURE_INFO).unwrap(),
326            );
327            tables.insert(
328                REGION_STATISTICS.to_string(),
329                self.build_table(REGION_STATISTICS).unwrap(),
330            );
331            tables.insert(
332                SSTS_MANIFEST.to_string(),
333                self.build_table(SSTS_MANIFEST).unwrap(),
334            );
335            tables.insert(
336                SSTS_STORAGE.to_string(),
337                self.build_table(SSTS_STORAGE).unwrap(),
338            );
339            tables.insert(
340                SSTS_INDEX_META.to_string(),
341                self.build_table(SSTS_INDEX_META).unwrap(),
342            );
343        }
344
345        tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
346        tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
347        tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
348        tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
349        tables.insert(
350            KEY_COLUMN_USAGE.to_string(),
351            self.build_table(KEY_COLUMN_USAGE).unwrap(),
352        );
353        tables.insert(
354            TABLE_CONSTRAINTS.to_string(),
355            self.build_table(TABLE_CONSTRAINTS).unwrap(),
356        );
357        tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
358        if let Some(process_list) = self.build_table(PROCESS_LIST) {
359            tables.insert(PROCESS_LIST.to_string(), process_list);
360        }
361        #[cfg(feature = "enterprise")]
362        for name in self.extra_table_factories.keys() {
363            tables.insert(name.clone(), self.build_table(name).expect(name));
364        }
365        // Add memory tables
366        for name in MEMORY_TABLES.iter() {
367            tables.insert((*name).to_string(), self.build_table(name).expect(name));
368        }
369        self.tables = tables;
370    }
371}
372
373pub trait InformationTable {
374    fn table_id(&self) -> TableId;
375
376    fn table_name(&self) -> &'static str;
377
378    fn schema(&self) -> SchemaRef;
379
380    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
381
382    fn table_type(&self) -> TableType {
383        TableType::Temporary
384    }
385}
386
387// Provide compatibility for legacy `information_schema` code.
388impl<T> SystemTable for T
389where
390    T: InformationTable,
391{
392    fn table_id(&self) -> TableId {
393        InformationTable::table_id(self)
394    }
395
396    fn table_name(&self) -> &'static str {
397        InformationTable::table_name(self)
398    }
399
400    fn schema(&self) -> SchemaRef {
401        InformationTable::schema(self)
402    }
403
404    fn table_type(&self) -> TableType {
405        InformationTable::table_type(self)
406    }
407
408    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
409        InformationTable::to_stream(self, request)
410    }
411}
412
413pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
414
415/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables.
416#[async_trait::async_trait]
417pub trait InformationExtension {
418    type Error: ErrorExt;
419
420    /// Gets the nodes information.
421    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
422
423    /// Gets the procedures information.
424    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;
425
426    /// Gets the region statistics.
427    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
428
429    /// Get the flow statistics. If no flownode is available, return `None`.
430    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
431
432    /// Inspects the datanode.
433    async fn inspect_datanode(
434        &self,
435        request: DatanodeInspectRequest,
436    ) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
437}
438
439/// The request to inspect the datanode.
440#[derive(Debug, Clone, PartialEq, Eq)]
441pub struct DatanodeInspectRequest {
442    /// Kind to fetch from datanode.
443    pub kind: DatanodeInspectKind,
444
445    /// Pushdown scan configuration (projection/predicate/limit) for the returned stream.
446    /// This allows server-side filtering to reduce I/O and network costs.
447    pub scan: ScanRequest,
448}
449
/// What a [`DatanodeInspectRequest`] asks the datanode for.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DatanodeInspectKind {
    /// SST entries as recorded in the manifest.
    SstManifest,
    /// SST entries as discovered in the storage layer.
    SstStorage,
    /// Index metadata collected from the manifest.
    SstIndexMeta,
}
460
461impl DatanodeInspectRequest {
462    /// Builds a logical plan for the datanode inspect request.
463    pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
464        match self.kind {
465            DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
466            DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
467            DatanodeInspectKind::SstIndexMeta => PuffinIndexMetaEntry::build_plan(self.scan),
468        }
469    }
470}
471pub struct NoopInformationExtension;
472
473#[async_trait::async_trait]
474impl InformationExtension for NoopInformationExtension {
475    type Error = Error;
476
477    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
478        Ok(vec![])
479    }
480
481    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
482        Ok(vec![])
483    }
484
485    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
486        Ok(vec![])
487    }
488
489    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
490        Ok(None)
491    }
492
493    async fn inspect_datanode(
494        &self,
495        _request: DatanodeInspectRequest,
496    ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
497        Ok(common_recordbatch::RecordBatches::empty().as_stream())
498    }
499}