catalog/system_schema/information_schema/schemata.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_SCHEMATA_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::key::schema_name::SchemaNameKey;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::StringVectorBuilder;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, TableMetadataManagerSnafu,
    UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates, SCHEMATA};
use crate::system_schema::utils;
use crate::CatalogManager;

/// The `catalog_name` column name.
pub const CATALOG_NAME: &str = "catalog_name";
/// The `schema_name` column name.
pub const SCHEMA_NAME: &str = "schema_name";
const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name";
const DEFAULT_COLLATION_NAME: &str = "default_collation_name";
/// The database options column name.
pub const SCHEMA_OPTS: &str = "options";
/// Initial capacity for the per-column vector builders.
const INIT_CAPACITY: usize = 42;

/// The `information_schema.schemata` table implementation.
#[derive(Debug)]
pub(super) struct InformationSchemaSchemata {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaSchemata {
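    /// Creates the `schemata` table for the given catalog. Only a weak
    /// reference to the catalog manager is held, so this table does not
    /// keep the manager alive.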
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

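    /// Builds the schema for `information_schema.schemata`: `catalog_name`,
    /// `schema_name`, `default_character_set_name`, `default_collation_name`,
    /// `sql_path` and `options`. Only the last two columns are nullable.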
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(CATALOG_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(SCHEMA_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                DEFAULT_CHARACTER_SET_NAME,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                DEFAULT_COLLATION_NAME,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new("sql_path", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(SCHEMA_OPTS, ConcreteDataType::string_datatype(), true),
        ]))
    }

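    /// Creates a row builder that shares this table's schema and catalog
    /// handles.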
    fn builder(&self) -> InformationSchemaSchemataBuilder {
        InformationSchemaSchemataBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaSchemata {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_SCHEMATA_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        SCHEMATA
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

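    /// Streams the table contents: the builder materializes all rows into a
    /// single `RecordBatch`, which is wrapped in a one-shot DataFusion stream
    /// and adapted back into a `SendableRecordBatchStream`.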
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_schemata(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

/// Builds the `information_schema.schemata` table row by row.
///
/// Columns are based on <https://docs.pingcap.com/tidb/stable/information-schema-schemata>
struct InformationSchemaSchemataBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    charset_names: StringVectorBuilder,
    collation_names: StringVectorBuilder,
    sql_paths: StringVectorBuilder,
    schema_options: StringVectorBuilder,
}

impl InformationSchemaSchemataBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            charset_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            collation_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            sql_paths: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Constructs the `information_schema.schemata` virtual table by listing
    /// every schema in the catalog, optionally filtered by the scan request's
    /// predicates.
    async fn make_schemata(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let table_metadata_manager = utils::table_meta_manager(&self.catalog_manager)?;
        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let opts = if let Some(table_metadata_manager) = &table_metadata_manager {
                table_metadata_manager
                    .schema_manager()
                    .get(SchemaNameKey::new(&catalog_name, &schema_name))
                    .await
                    .context(TableMetadataManagerSnafu)?
                    // `information_schema` itself has no entry in the table
                    // metadata manager, so this lookup yields `None` for it.
                    .map(|schema_opts| format!("{}", schema_opts.into_inner()))
            } else {
                None
            };

            self.add_schema(
                &predicates,
                &catalog_name,
                &schema_name,
                opts.as_deref().unwrap_or(""),
            );
        }

        self.finish()
    }

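    /// Appends one row to the column builders, skipping rows rejected by the
    /// pushed-down predicates. The character set and collation are fixed to
    /// `utf8`/`utf8_bin`.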
    fn add_schema(
        &mut self,
        predicates: &Predicates,
        catalog_name: &str,
        schema_name: &str,
        schema_options: &str,
    ) {
        let row = [
            (CATALOG_NAME, &Value::from(catalog_name)),
            (SCHEMA_NAME, &Value::from(schema_name)),
            (DEFAULT_CHARACTER_SET_NAME, &Value::from("utf8")),
            (DEFAULT_COLLATION_NAME, &Value::from("utf8_bin")),
            (SCHEMA_OPTS, &Value::from(schema_options)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.catalog_names.push(Some(catalog_name));
        self.schema_names.push(Some(schema_name));
        self.charset_names.push(Some("utf8"));
        self.collation_names.push(Some("utf8_bin"));
        self.sql_paths.push(None);
        self.schema_options.push(Some(schema_options));
    }

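    /// Consumes the column builders and assembles the final `RecordBatch`,
    /// with columns in the order declared by [`InformationSchemaSchemata::schema`].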
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.charset_names.finish()),
            Arc::new(self.collation_names.finish()),
            Arc::new(self.sql_paths.finish()),
            Arc::new(self.schema_options.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

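// DataFusion's `PartitionStream` entry point. Unlike `to_stream`, no scan
// request is available here, so the table is built without predicate
// pushdown.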
impl DfPartitionStream for InformationSchemaSchemata {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_schemata(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}
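
// A minimal sketch of a unit test exercising the schema shape. This module is
// not part of the original file; it assumes the `datatypes` accessors
// `Schema::column_schemas` and `ColumnSchema::is_nullable` used elsewhere in
// the codebase.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn schemata_schema_shape() {
        let schema = InformationSchemaSchemata::schema();
        let names: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.as_str())
            .collect();
        // Columns must appear in the same order that `finish()` emits them.
        assert_eq!(
            names,
            [
                CATALOG_NAME,
                SCHEMA_NAME,
                DEFAULT_CHARACTER_SET_NAME,
                DEFAULT_COLLATION_NAME,
                "sql_path",
                SCHEMA_OPTS,
            ]
        );
        // Only `sql_path` and `options` are nullable.
        assert!(schema.column_schemas()[4].is_nullable());
        assert!(schema.column_schemas()[5].is_nullable());
    }
}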