catalog/
system_schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod information_schema;
16mod memory_table;
17pub mod pg_catalog;
18mod predicate;
19mod utils;
20
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use common_error::ext::BoxedError;
25use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
26use datatypes::schema::SchemaRef;
27use futures_util::StreamExt;
28use snafu::ResultExt;
29use store_api::data_source::DataSource;
30use store_api::storage::ScanRequest;
31use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
32use table::metadata::{
33    FilterPushDownType, TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType,
34};
35use table::{Table, TableRef};
36
37use crate::error::Result;
38
39pub trait SystemSchemaProvider {
40    /// Returns a map of [TableRef] in information schema.
41    fn tables(&self) -> &HashMap<String, TableRef>;
42
43    /// Returns the [TableRef] by table name.
44    fn table(&self, name: &str) -> Option<TableRef> {
45        self.tables().get(name).cloned()
46    }
47
48    /// Returns table names in the order of table id.
49    fn table_names(&self) -> Vec<String> {
50        let mut tables = self.tables().values().clone().collect::<Vec<_>>();
51
52        tables.sort_by(|t1, t2| {
53            t1.table_info()
54                .table_id()
55                .partial_cmp(&t2.table_info().table_id())
56                .unwrap()
57        });
58        tables
59            .into_iter()
60            .map(|t| t.table_info().name.clone())
61            .collect()
62    }
63}
64
65trait SystemSchemaProviderInner {
66    fn catalog_name(&self) -> &str;
67    fn schema_name() -> &'static str;
68    fn build_table(&self, name: &str) -> Option<TableRef> {
69        self.system_table(name).map(|table| {
70            let table_info = Self::table_info(self.catalog_name().to_string(), &table);
71            let filter_pushdown = FilterPushDownType::Inexact;
72            let data_source = Arc::new(SystemTableDataSource::new(table));
73            let table = Table::new(table_info, filter_pushdown, data_source);
74            Arc::new(table)
75        })
76    }
77    fn system_table(&self, name: &str) -> Option<SystemTableRef>;
78
79    fn table_info(catalog_name: String, table: &SystemTableRef) -> TableInfoRef {
80        let table_meta = TableMetaBuilder::empty()
81            .schema(table.schema())
82            .primary_key_indices(vec![])
83            .next_column_id(0)
84            .build()
85            .unwrap();
86        let table_info = TableInfoBuilder::default()
87            .table_id(table.table_id())
88            .name(table.table_name().to_string())
89            .catalog_name(catalog_name)
90            .schema_name(Self::schema_name().to_string())
91            .meta(table_meta)
92            .table_type(table.table_type())
93            .build()
94            .unwrap();
95        Arc::new(table_info)
96    }
97}
98
99pub(crate) trait SystemTable {
100    fn table_id(&self) -> TableId;
101
102    fn table_name(&self) -> &'static str;
103
104    fn schema(&self) -> SchemaRef;
105
106    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
107
108    fn table_type(&self) -> TableType {
109        TableType::Temporary
110    }
111}
112
113pub(crate) type SystemTableRef = Arc<dyn SystemTable + Send + Sync>;
114
115struct SystemTableDataSource {
116    table: SystemTableRef,
117}
118
119impl SystemTableDataSource {
120    fn new(table: SystemTableRef) -> Self {
121        Self { table }
122    }
123
124    fn try_project(&self, projection: &[usize]) -> std::result::Result<SchemaRef, BoxedError> {
125        let schema = self
126            .table
127            .schema()
128            .try_project(projection)
129            .context(SchemaConversionSnafu)
130            .map_err(BoxedError::new)?;
131        Ok(Arc::new(schema))
132    }
133}
134
135impl DataSource for SystemTableDataSource {
136    fn get_stream(
137        &self,
138        request: ScanRequest,
139    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
140        let projection = request.projection.clone();
141        let projected_schema = match &projection {
142            Some(projection) => self.try_project(projection)?,
143            None => self.table.schema(),
144        };
145
146        let stream = self
147            .table
148            .to_stream(request)
149            .map_err(BoxedError::new)
150            .context(TablesRecordBatchSnafu)
151            .map_err(BoxedError::new)?
152            .map(move |batch| match &projection {
153                Some(p) => batch.and_then(|b| b.try_project(p)),
154                None => batch,
155            });
156
157        let stream = RecordBatchStreamWrapper {
158            schema: projected_schema,
159            stream: Box::pin(stream),
160            output_ordering: None,
161            metrics: Default::default(),
162        };
163
164        Ok(Box::pin(stream))
165    }
166}