catalog/system_schema/information_schema/
views.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::INFORMATION_SCHEMA_VIEW_TABLE_ID;
19use common_error::ext::BoxedError;
20use common_recordbatch::adapter::RecordBatchStreamAdapter;
21use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
22use datafusion::execution::TaskContext;
23use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
24use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
25use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
26use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
27use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
28use datatypes::value::Value;
29use datatypes::vectors::StringVectorBuilder;
30use futures::TryStreamExt;
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::{ScanRequest, TableId};
33use table::metadata::TableType;
34
35use crate::error::{
36    CastManagerSnafu, CreateRecordBatchSnafu, GetViewCacheSnafu, InternalSnafu, Result,
37    UpgradeWeakCatalogManagerRefSnafu, ViewInfoNotFoundSnafu,
38};
39use crate::kvbackend::KvBackendCatalogManager;
40use crate::system_schema::information_schema::{InformationTable, Predicates, VIEWS};
41use crate::CatalogManager;
/// Initial capacity handed to every column builder of the `views` table.
const INIT_CAPACITY: usize = 42;

/// Column holding the catalog name of the view.
pub const TABLE_CATALOG: &str = "table_catalog";
/// Column holding the schema (database) name of the view.
pub const TABLE_SCHEMA: &str = "table_schema";
/// Column holding the name of the view.
pub const TABLE_NAME: &str = "table_name";
/// Column holding the definition of the view.
pub const VIEW_DEFINITION: &str = "view_definition";
/// Nullable column; this file always fills it with NULL.
pub const CHECK_OPTION: &str = "check_option";
/// Nullable column; this file always fills it with `"NO"`.
pub const IS_UPDATABLE: &str = "is_updatable";
/// Nullable column; this file always fills it with NULL.
pub const DEFINER: &str = "definer";
/// Nullable column; this file always fills it with NULL.
pub const SECURITY_TYPE: &str = "security_type";
/// Nullable column; this file always fills it with `"utf8"`.
pub const CHARACTER_SET_CLIENT: &str = "character_set_client";
/// Nullable column; this file always fills it with `"utf8_bin"`.
pub const COLLATION_CONNECTION: &str = "collation_connection";
54
55/// The `information_schema.views` to provides information about views in databases.
56#[derive(Debug)]
57pub(super) struct InformationSchemaViews {
58    schema: SchemaRef,
59    catalog_name: String,
60    catalog_manager: Weak<dyn CatalogManager>,
61}
62
63impl InformationSchemaViews {
64    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
65        Self {
66            schema: Self::schema(),
67            catalog_name,
68            catalog_manager,
69        }
70    }
71
72    pub(crate) fn schema() -> SchemaRef {
73        Arc::new(Schema::new(vec![
74            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
75            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
76            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
77            ColumnSchema::new(VIEW_DEFINITION, ConcreteDataType::string_datatype(), false),
78            ColumnSchema::new(CHECK_OPTION, ConcreteDataType::string_datatype(), true),
79            ColumnSchema::new(IS_UPDATABLE, ConcreteDataType::string_datatype(), true),
80            ColumnSchema::new(DEFINER, ConcreteDataType::string_datatype(), true),
81            ColumnSchema::new(SECURITY_TYPE, ConcreteDataType::string_datatype(), true),
82            ColumnSchema::new(
83                CHARACTER_SET_CLIENT,
84                ConcreteDataType::string_datatype(),
85                true,
86            ),
87            ColumnSchema::new(
88                COLLATION_CONNECTION,
89                ConcreteDataType::string_datatype(),
90                true,
91            ),
92        ]))
93    }
94
95    fn builder(&self) -> InformationSchemaViewsBuilder {
96        InformationSchemaViewsBuilder::new(
97            self.schema.clone(),
98            self.catalog_name.clone(),
99            self.catalog_manager.clone(),
100        )
101    }
102}
103
104impl InformationTable for InformationSchemaViews {
105    fn table_id(&self) -> TableId {
106        INFORMATION_SCHEMA_VIEW_TABLE_ID
107    }
108
109    fn table_name(&self) -> &'static str {
110        VIEWS
111    }
112
113    fn schema(&self) -> SchemaRef {
114        self.schema.clone()
115    }
116
117    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
118        let schema = self.schema.arrow_schema().clone();
119        let mut builder = self.builder();
120        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
121            schema,
122            futures::stream::once(async move {
123                builder
124                    .make_views(Some(request))
125                    .await
126                    .map(|x| x.into_df_record_batch())
127                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
128            }),
129        ));
130        Ok(Box::pin(
131            RecordBatchStreamAdapter::try_new(stream)
132                .map_err(BoxedError::new)
133                .context(InternalSnafu)?,
134        ))
135    }
136}
137
138/// Builds the `information_schema.VIEWS` table row by row
139///
140/// Columns are based on <https://dev.mysql.com/doc/refman/8.4/en/information-schema-views-table.html>
141struct InformationSchemaViewsBuilder {
142    schema: SchemaRef,
143    catalog_name: String,
144    catalog_manager: Weak<dyn CatalogManager>,
145
146    catalog_names: StringVectorBuilder,
147    schema_names: StringVectorBuilder,
148    table_names: StringVectorBuilder,
149    view_definitions: StringVectorBuilder,
150    check_options: StringVectorBuilder,
151    is_updatable: StringVectorBuilder,
152    definer: StringVectorBuilder,
153    security_type: StringVectorBuilder,
154    character_set_client: StringVectorBuilder,
155    collation_connection: StringVectorBuilder,
156}
157
158impl InformationSchemaViewsBuilder {
159    fn new(
160        schema: SchemaRef,
161        catalog_name: String,
162        catalog_manager: Weak<dyn CatalogManager>,
163    ) -> Self {
164        Self {
165            schema,
166            catalog_name,
167            catalog_manager,
168            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
169            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
170            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
171            view_definitions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
172            check_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
173            is_updatable: StringVectorBuilder::with_capacity(INIT_CAPACITY),
174            definer: StringVectorBuilder::with_capacity(INIT_CAPACITY),
175            security_type: StringVectorBuilder::with_capacity(INIT_CAPACITY),
176            character_set_client: StringVectorBuilder::with_capacity(INIT_CAPACITY),
177            collation_connection: StringVectorBuilder::with_capacity(INIT_CAPACITY),
178        }
179    }
180
181    /// Construct the `information_schema.views` virtual table
182    async fn make_views(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
183        let catalog_name = self.catalog_name.clone();
184        let catalog_manager = self
185            .catalog_manager
186            .upgrade()
187            .context(UpgradeWeakCatalogManagerRefSnafu)?;
188        let predicates = Predicates::from_scan_request(&request);
189        let view_info_cache = catalog_manager
190            .as_any()
191            .downcast_ref::<KvBackendCatalogManager>()
192            .context(CastManagerSnafu)?
193            .view_info_cache()?;
194
195        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
196            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
197
198            while let Some(table) = stream.try_next().await? {
199                let table_info = table.table_info();
200                if table_info.table_type == TableType::View {
201                    let view_info = view_info_cache
202                        .get(table_info.ident.table_id)
203                        .await
204                        .context(GetViewCacheSnafu)?
205                        .context(ViewInfoNotFoundSnafu {
206                            name: &table_info.name,
207                        })?;
208                    self.add_view(
209                        &predicates,
210                        &catalog_name,
211                        &schema_name,
212                        &table_info.name,
213                        &view_info.definition,
214                    )
215                }
216            }
217        }
218
219        self.finish()
220    }
221
222    fn add_view(
223        &mut self,
224        predicates: &Predicates,
225        catalog_name: &str,
226        schema_name: &str,
227        table_name: &str,
228        definition: &str,
229    ) {
230        let row = [
231            (TABLE_CATALOG, &Value::from(catalog_name)),
232            (TABLE_SCHEMA, &Value::from(schema_name)),
233            (TABLE_NAME, &Value::from(table_name)),
234        ];
235
236        if !predicates.eval(&row) {
237            return;
238        }
239        self.catalog_names.push(Some(catalog_name));
240        self.schema_names.push(Some(schema_name));
241        self.table_names.push(Some(table_name));
242        self.view_definitions.push(Some(definition));
243        self.check_options.push(None);
244        // View is not updatable, statements such UPDATE , DELETE , and INSERT are illegal and are rejected.
245        self.is_updatable.push(Some("NO"));
246        self.definer.push(None);
247        self.security_type.push(None);
248        self.character_set_client.push(Some("utf8"));
249        self.collation_connection.push(Some("utf8_bin"));
250    }
251
252    fn finish(&mut self) -> Result<RecordBatch> {
253        let columns: Vec<VectorRef> = vec![
254            Arc::new(self.catalog_names.finish()),
255            Arc::new(self.schema_names.finish()),
256            Arc::new(self.table_names.finish()),
257            Arc::new(self.view_definitions.finish()),
258            Arc::new(self.check_options.finish()),
259            Arc::new(self.is_updatable.finish()),
260            Arc::new(self.definer.finish()),
261            Arc::new(self.security_type.finish()),
262            Arc::new(self.character_set_client.finish()),
263            Arc::new(self.collation_connection.finish()),
264        ];
265        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
266    }
267}
268
269impl DfPartitionStream for InformationSchemaViews {
270    fn schema(&self) -> &ArrowSchemaRef {
271        self.schema.arrow_schema()
272    }
273
274    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
275        let schema = self.schema.arrow_schema().clone();
276        let mut builder = self.builder();
277        Box::pin(DfRecordBatchStreamAdapter::new(
278            schema,
279            futures::stream::once(async move {
280                builder
281                    .make_views(None)
282                    .await
283                    .map(|x| x.into_df_record_batch())
284                    .map_err(Into::into)
285            }),
286        ))
287    }
288}