Skip to main content

catalog/system_schema/
information_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod cluster_info;
16pub mod columns;
17pub mod flows;
18mod information_memory_table;
19pub mod key_column_usage;
20mod partitions;
21mod procedure_info;
22pub mod process_list;
23mod region_info;
24pub mod region_peers;
25mod region_statistics;
26pub mod schemata;
27mod ssts;
28pub mod statistics;
29mod table_constraints;
30mod table_names;
31mod table_semantics;
32pub mod tables;
33mod views;
34
35use std::collections::HashMap;
36use std::sync::{Arc, Weak};
37
38use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
39use common_error::ext::ErrorExt;
40use common_meta::cluster::NodeInfo;
41use common_meta::datanode::RegionStat;
42use common_meta::key::flow::FlowMetadataManager;
43use common_meta::key::flow::flow_state::FlowStat;
44use common_meta::kv_backend::KvBackendRef;
45use common_procedure::ProcedureInfo;
46use common_recordbatch::SendableRecordBatchStream;
47use datafusion::error::DataFusionError;
48use datafusion::logical_expr::LogicalPlan;
49use datatypes::schema::SchemaRef;
50use lazy_static::lazy_static;
51use paste::paste;
52use process_list::InformationSchemaProcessList;
53use region_info::InformationSchemaRegionInfo;
54use store_api::metric_engine_consts::{
55    MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, PRIMARY_KEY_ENCODING,
56};
57use store_api::region_info::RegionInfoEntry;
58use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
59use store_api::storage::{ScanRequest, TableId};
60use table::TableRef;
61use table::metadata::TableType;
62pub use table_names::*;
63use views::InformationSchemaViews;
64
65use self::columns::InformationSchemaColumns;
66use crate::CatalogManager;
67use crate::error::{Error, Result};
68use crate::process_manager::ProcessManagerRef;
69use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
70use crate::system_schema::information_schema::flows::InformationSchemaFlows;
71use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
72use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
73use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
74use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
75use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
76use crate::system_schema::information_schema::ssts::{
77    InformationSchemaSstsIndexMeta, InformationSchemaSstsManifest, InformationSchemaSstsStorage,
78};
79use crate::system_schema::information_schema::statistics::InformationSchemaStatistics;
80use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
81use crate::system_schema::information_schema::table_semantics::InformationSchemaTableSemantics;
82use crate::system_schema::information_schema::tables::InformationSchemaTables;
83use crate::system_schema::memory_table::MemoryTable;
84pub(crate) use crate::system_schema::predicate::Predicates;
85use crate::system_schema::{
86    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
87};
88
89const DENSE_PRIMARY_KEY_ENCODING: &str = "dense";
90const SPARSE_PRIMARY_KEY_ENCODING: &str = "sparse";
91
92pub(crate) fn primary_key_encoding_index_type(options: &HashMap<String, String>) -> &'static str {
93    options
94        .get(PRIMARY_KEY_ENCODING)
95        .or_else(|| options.get(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING))
96        .map(|value| {
97            if value.eq_ignore_ascii_case(SPARSE_PRIMARY_KEY_ENCODING) {
98                SPARSE_PRIMARY_KEY_ENCODING
99            } else {
100                DENSE_PRIMARY_KEY_ENCODING
101            }
102        })
103        .unwrap_or(DENSE_PRIMARY_KEY_ENCODING)
104}
105
106lazy_static! {
107    // Memory tables in `information_schema`.
108    static ref MEMORY_TABLES: &'static [&'static str] = &[
109        ENGINES,
110        COLUMN_PRIVILEGES,
111        COLUMN_STATISTICS,
112        CHARACTER_SETS,
113        COLLATIONS,
114        COLLATION_CHARACTER_SET_APPLICABILITY,
115        CHECK_CONSTRAINTS,
116        EVENTS,
117        FILES,
118        OPTIMIZER_TRACE,
119        PARAMETERS,
120        PROFILING,
121        REFERENTIAL_CONSTRAINTS,
122        ROUTINES,
123        SCHEMA_PRIVILEGES,
124        TABLE_PRIVILEGES,
125        GLOBAL_STATUS,
126        SESSION_STATUS,
127        PARTITIONS,
128    ];
129}
130
/// Builds `Some(SystemTableRef)` backed by a [`MemoryTable`] for the table constant `$name`.
///
/// The table id is looked up as `consts::INFORMATION_SCHEMA_<$name>_TABLE_ID`. The schema and
/// column data come from `get_schema_columns`.
macro_rules! setup_memory_table {
    ($name: expr) => {
        paste! {
            {
                let (table_schema, table_columns) = get_schema_columns($name);
                let memory_table = MemoryTable::new(
                    consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
                    $name,
                    table_schema,
                    table_columns,
                );
                Some(Arc::new(memory_table) as _)
            }
        }
    };
}
146
147pub struct MakeInformationTableRequest {
148    pub catalog_name: String,
149    pub catalog_manager: Weak<dyn CatalogManager>,
150    pub kv_backend: KvBackendRef,
151}
152
153/// A factory trait for making information schema tables.
154///
155/// This trait allows for extensibility of the information schema by providing
156/// a way to dynamically create custom information schema tables.
157pub trait InformationSchemaTableFactory {
158    fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
159}
160
161pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
162
163/// The `information_schema` tables info provider.
164pub struct InformationSchemaProvider {
165    catalog_name: String,
166    catalog_manager: Weak<dyn CatalogManager>,
167    process_manager: Option<ProcessManagerRef>,
168    flow_metadata_manager: Arc<FlowMetadataManager>,
169    tables: HashMap<String, TableRef>,
170    kv_backend: KvBackendRef,
171    extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
172}
173
174impl SystemSchemaProvider for InformationSchemaProvider {
175    fn tables(&self) -> &HashMap<String, TableRef> {
176        assert!(!self.tables.is_empty());
177
178        &self.tables
179    }
180}
181
182impl SystemSchemaProviderInner for InformationSchemaProvider {
183    fn catalog_name(&self) -> &str {
184        &self.catalog_name
185    }
186    fn schema_name() -> &'static str {
187        INFORMATION_SCHEMA_NAME
188    }
189
190    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
191        if let Some(factory) = self.extra_table_factories.get(name) {
192            let req = MakeInformationTableRequest {
193                catalog_name: self.catalog_name.clone(),
194                catalog_manager: self.catalog_manager.clone(),
195                kv_backend: self.kv_backend.clone(),
196            };
197            return Some(factory.make_information_table(req));
198        }
199
200        match name.to_ascii_lowercase().as_str() {
201            TABLES => Some(Arc::new(InformationSchemaTables::new(
202                self.catalog_name.clone(),
203                self.catalog_manager.clone(),
204            )) as _),
205            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
206                self.catalog_name.clone(),
207                self.catalog_manager.clone(),
208            )) as _),
209            ENGINES => setup_memory_table!(ENGINES),
210            COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
211            COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
212            BUILD_INFO => setup_memory_table!(BUILD_INFO),
213            CHARACTER_SETS => setup_memory_table!(CHARACTER_SETS),
214            COLLATIONS => setup_memory_table!(COLLATIONS),
215            COLLATION_CHARACTER_SET_APPLICABILITY => {
216                setup_memory_table!(COLLATION_CHARACTER_SET_APPLICABILITY)
217            }
218            CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS),
219            EVENTS => setup_memory_table!(EVENTS),
220            FILES => setup_memory_table!(FILES),
221            OPTIMIZER_TRACE => setup_memory_table!(OPTIMIZER_TRACE),
222            PARAMETERS => setup_memory_table!(PARAMETERS),
223            PROFILING => setup_memory_table!(PROFILING),
224            REFERENTIAL_CONSTRAINTS => setup_memory_table!(REFERENTIAL_CONSTRAINTS),
225            ROUTINES => setup_memory_table!(ROUTINES),
226            SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
227            TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
228            GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
229            SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
230            KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(
231                self.catalog_name.clone(),
232                self.catalog_manager.clone(),
233            )) as _),
234            SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new(
235                self.catalog_name.clone(),
236                self.catalog_manager.clone(),
237            )) as _),
238            PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
239                self.catalog_name.clone(),
240                self.catalog_manager.clone(),
241            )) as _),
242            REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
243                self.catalog_name.clone(),
244                self.catalog_manager.clone(),
245            )) as _),
246            TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
247                self.catalog_name.clone(),
248                self.catalog_manager.clone(),
249            )) as _),
250            STATISTICS => Some(Arc::new(InformationSchemaStatistics::new(
251                self.catalog_name.clone(),
252                self.catalog_manager.clone(),
253            )) as _),
254            CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
255                self.catalog_manager.clone(),
256            )) as _),
257            VIEWS => Some(Arc::new(InformationSchemaViews::new(
258                self.catalog_name.clone(),
259                self.catalog_manager.clone(),
260            )) as _),
261            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
262                self.catalog_name.clone(),
263                self.catalog_manager.clone(),
264                self.flow_metadata_manager.clone(),
265            )) as _),
266            PROCEDURE_INFO => Some(
267                Arc::new(procedure_info::InformationSchemaProcedureInfo::new(
268                    self.catalog_manager.clone(),
269                )) as _,
270            ),
271            REGION_STATISTICS => Some(Arc::new(
272                region_statistics::InformationSchemaRegionStatistics::new(
273                    self.catalog_manager.clone(),
274                ),
275            ) as _),
276            REGION_INFO => Some(Arc::new(InformationSchemaRegionInfo::new(
277                self.catalog_manager.clone(),
278            )) as _),
279            PROCESS_LIST => self
280                .process_manager
281                .as_ref()
282                .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
283            SSTS_MANIFEST => Some(Arc::new(InformationSchemaSstsManifest::new(
284                self.catalog_manager.clone(),
285            )) as _),
286            SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
287                self.catalog_manager.clone(),
288            )) as _),
289            SSTS_INDEX_META => Some(Arc::new(InformationSchemaSstsIndexMeta::new(
290                self.catalog_manager.clone(),
291            )) as _),
292            TABLE_SEMANTICS => Some(Arc::new(InformationSchemaTableSemantics::new(
293                self.catalog_name.clone(),
294                self.catalog_manager.clone(),
295            )) as _),
296            _ => None,
297        }
298    }
299}
300
301impl InformationSchemaProvider {
302    pub fn new(
303        catalog_name: String,
304        catalog_manager: Weak<dyn CatalogManager>,
305        flow_metadata_manager: Arc<FlowMetadataManager>,
306        process_manager: Option<ProcessManagerRef>,
307        kv_backend: KvBackendRef,
308    ) -> Self {
309        let mut provider = Self {
310            catalog_name,
311            catalog_manager,
312            flow_metadata_manager,
313            process_manager,
314            tables: HashMap::new(),
315            kv_backend,
316            extra_table_factories: HashMap::new(),
317        };
318
319        provider.build_tables();
320
321        provider
322    }
323
324    pub(crate) fn with_extra_table_factories(
325        mut self,
326        factories: HashMap<String, InformationSchemaTableFactoryRef>,
327    ) -> Self {
328        self.extra_table_factories = factories;
329        self.build_tables();
330        self
331    }
332
333    fn build_tables(&mut self) {
334        let mut tables = HashMap::new();
335
336        // SECURITY NOTE:
337        // Carefully consider the tables that may expose sensitive cluster configurations,
338        // authentication details, and other critical information.
339        // Only put these tables under `greptime` catalog to prevent info leak.
340        if self.catalog_name == DEFAULT_CATALOG_NAME {
341            tables.insert(
342                BUILD_INFO.to_string(),
343                self.build_table(BUILD_INFO).unwrap(),
344            );
345            tables.insert(
346                REGION_PEERS.to_string(),
347                self.build_table(REGION_PEERS).unwrap(),
348            );
349            tables.insert(
350                CLUSTER_INFO.to_string(),
351                self.build_table(CLUSTER_INFO).unwrap(),
352            );
353            tables.insert(
354                PROCEDURE_INFO.to_string(),
355                self.build_table(PROCEDURE_INFO).unwrap(),
356            );
357            tables.insert(
358                REGION_STATISTICS.to_string(),
359                self.build_table(REGION_STATISTICS).unwrap(),
360            );
361            tables.insert(
362                REGION_INFO.to_string(),
363                self.build_table(REGION_INFO).unwrap(),
364            );
365            tables.insert(
366                SSTS_MANIFEST.to_string(),
367                self.build_table(SSTS_MANIFEST).unwrap(),
368            );
369            tables.insert(
370                SSTS_STORAGE.to_string(),
371                self.build_table(SSTS_STORAGE).unwrap(),
372            );
373            tables.insert(
374                SSTS_INDEX_META.to_string(),
375                self.build_table(SSTS_INDEX_META).unwrap(),
376            );
377        }
378
379        tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
380        tables.insert(VIEWS.to_string(), self.build_table(VIEWS).unwrap());
381        tables.insert(SCHEMATA.to_string(), self.build_table(SCHEMATA).unwrap());
382        tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());
383        tables.insert(
384            KEY_COLUMN_USAGE.to_string(),
385            self.build_table(KEY_COLUMN_USAGE).unwrap(),
386        );
387        tables.insert(
388            TABLE_CONSTRAINTS.to_string(),
389            self.build_table(TABLE_CONSTRAINTS).unwrap(),
390        );
391        tables.insert(
392            STATISTICS.to_string(),
393            self.build_table(STATISTICS).unwrap(),
394        );
395        tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
396        tables.insert(
397            TABLE_SEMANTICS.to_string(),
398            self.build_table(TABLE_SEMANTICS).unwrap(),
399        );
400        if let Some(process_list) = self.build_table(PROCESS_LIST) {
401            tables.insert(PROCESS_LIST.to_string(), process_list);
402        }
403        for name in self.extra_table_factories.keys() {
404            tables.insert(name.clone(), self.build_table(name).expect(name));
405        }
406        // Add memory tables
407        for name in MEMORY_TABLES.iter() {
408            tables.insert((*name).to_string(), self.build_table(name).expect(name));
409        }
410        self.tables = tables;
411    }
412}
413
414pub trait InformationTable {
415    fn table_id(&self) -> TableId;
416
417    fn table_name(&self) -> &'static str;
418
419    fn schema(&self) -> SchemaRef;
420
421    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
422
423    fn table_type(&self) -> TableType {
424        TableType::Temporary
425    }
426}
427
428// Provide compatibility for legacy `information_schema` code.
429impl<T> SystemTable for T
430where
431    T: InformationTable,
432{
433    fn table_id(&self) -> TableId {
434        InformationTable::table_id(self)
435    }
436
437    fn table_name(&self) -> &'static str {
438        InformationTable::table_name(self)
439    }
440
441    fn schema(&self) -> SchemaRef {
442        InformationTable::schema(self)
443    }
444
445    fn table_type(&self) -> TableType {
446        InformationTable::table_type(self)
447    }
448
449    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
450        InformationTable::to_stream(self, request)
451    }
452}
453
454pub type InformationExtensionRef = Arc<dyn InformationExtension<Error = Error> + Send + Sync>;
455
456/// The `InformationExtension` trait provides the extension methods for the `information_schema` tables.
457#[async_trait::async_trait]
458pub trait InformationExtension {
459    type Error: ErrorExt;
460
461    /// Gets the nodes information.
462    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
463
464    /// Gets the procedures information.
465    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error>;
466
467    /// Gets the region statistics.
468    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
469
470    /// Get the flow statistics. If no flownode is available, return `None`.
471    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
472
473    /// Inspects the datanode.
474    async fn inspect_datanode(
475        &self,
476        request: DatanodeInspectRequest,
477    ) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
478}
479
480/// The request to inspect the datanode.
481#[derive(Debug, Clone, PartialEq)]
482pub struct DatanodeInspectRequest {
483    /// Kind to fetch from datanode.
484    pub kind: DatanodeInspectKind,
485
486    /// Pushdown scan configuration (projection/predicate/limit) for the returned stream.
487    /// This allows server-side filtering to reduce I/O and network costs.
488    pub scan: ScanRequest,
489}
490
/// Which data a [`DatanodeInspectRequest`] fetches.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatanodeInspectKind {
    /// SST entries as recorded in the manifest.
    SstManifest,
    /// SST entries discovered by listing the storage layer.
    SstStorage,
    /// Index metadata collected from the manifest.
    SstIndexMeta,
    /// Region runtime and manifest information.
    RegionInfo,
}
503
504impl DatanodeInspectRequest {
505    /// Builds a logical plan for the datanode inspect request.
506    pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
507        match self.kind {
508            DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
509            DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
510            DatanodeInspectKind::SstIndexMeta => PuffinIndexMetaEntry::build_plan(self.scan),
511            DatanodeInspectKind::RegionInfo => RegionInfoEntry::build_plan(self.scan),
512        }
513    }
514}
515pub struct NoopInformationExtension;
516
517#[async_trait::async_trait]
518impl InformationExtension for NoopInformationExtension {
519    type Error = Error;
520
521    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
522        Ok(vec![])
523    }
524
525    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
526        Ok(vec![])
527    }
528
529    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
530        Ok(vec![])
531    }
532
533    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
534        Ok(None)
535    }
536
537    async fn inspect_datanode(
538        &self,
539        _request: DatanodeInspectRequest,
540    ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
541        Ok(common_recordbatch::RecordBatches::empty().as_stream())
542    }
543}
544
#[cfg(test)]
mod tests {
    use store_api::region_info::RegionInfoEntry;

    use super::*;

    /// A `RegionInfo` inspect request must plan a scan of the reserved inspection table.
    #[test]
    fn test_datanode_inspect_region_info_build_plan() {
        let request = DatanodeInspectRequest {
            kind: DatanodeInspectKind::RegionInfo,
            scan: ScanRequest::default(),
        };

        match request.build_plan().unwrap() {
            LogicalPlan::TableScan(scan) => assert_eq!(
                scan.table_name.to_string(),
                RegionInfoEntry::reserved_table_name_for_inspection()
            ),
            _ => panic!("expected table scan"),
        }
    }
}