catalog/system_schema/information_schema/
key_column_usage.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::INFORMATION_SCHEMA_KEY_COLUMN_USAGE_TABLE_ID;
19use common_error::ext::BoxedError;
20use common_recordbatch::adapter::RecordBatchStreamAdapter;
21use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
22use datafusion::execution::TaskContext;
23use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
24use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
25use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
26use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef};
27use datatypes::schema::{ColumnSchema, FulltextBackend, Schema, SchemaRef};
28use datatypes::value::Value;
29use datatypes::vectors::{ConstantVector, StringVector, StringVectorBuilder, UInt32VectorBuilder};
30use futures_util::TryStreamExt;
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::{ScanRequest, TableId};
33
34use crate::error::{
35    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
36};
37use crate::system_schema::information_schema::{InformationTable, Predicates, KEY_COLUMN_USAGE};
38use crate::CatalogManager;
39
/// Name of the `constraint_schema` column.
pub const CONSTRAINT_SCHEMA: &str = "constraint_schema";
/// Name of the `constraint_name` column.
pub const CONSTRAINT_NAME: &str = "constraint_name";
/// Name of the `table_catalog` column. It's always `def` in MySQL.
pub const TABLE_CATALOG: &str = "table_catalog";
/// Name of the `real_table_catalog` column, which holds the real catalog name
/// for this key column.
pub const REAL_TABLE_CATALOG: &str = "real_table_catalog";
/// Name of the `table_schema` column.
pub const TABLE_SCHEMA: &str = "table_schema";
/// Name of the `table_name` column.
pub const TABLE_NAME: &str = "table_name";
/// Name of the `column_name` column.
pub const COLUMN_NAME: &str = "column_name";
/// Name of the `ordinal_position` column.
pub const ORDINAL_POSITION: &str = "ordinal_position";
/// Name of the `greptime_index_type` column, which holds the type of the index.
pub const GREPTIME_INDEX_TYPE: &str = "greptime_index_type";
/// Initial capacity reserved for each column vector builder.
const INIT_CAPACITY: usize = 42;

/// Time index constraint name
pub(crate) const CONSTRAINT_NAME_TIME_INDEX: &str = "TIME INDEX";

/// Primary key constraint name
pub(crate) const CONSTRAINT_NAME_PRI: &str = "PRIMARY";
/// Primary key index type
pub(crate) const INDEX_TYPE_PRI: &str = "greptime-primary-key-v1";

/// Inverted index constraint name
pub(crate) const CONSTRAINT_NAME_INVERTED_INDEX: &str = "INVERTED INDEX";
/// Inverted index type
pub(crate) const INDEX_TYPE_INVERTED_INDEX: &str = "greptime-inverted-index-v1";

/// Fulltext index constraint name
pub(crate) const CONSTRAINT_NAME_FULLTEXT_INDEX: &str = "FULLTEXT INDEX";
/// Fulltext index type reported for the Tantivy backend (fulltext index v1)
pub(crate) const INDEX_TYPE_FULLTEXT_TANTIVY: &str = "greptime-fulltext-index-v1";
/// Fulltext index type reported for the Bloom backend
pub(crate) const INDEX_TYPE_FULLTEXT_BLOOM: &str = "greptime-fulltext-index-bloom";

/// Skipping index constraint name
pub(crate) const CONSTRAINT_NAME_SKIPPING_INDEX: &str = "SKIPPING INDEX";
/// Skipping index type
pub(crate) const INDEX_TYPE_SKIPPING_INDEX: &str = "greptime-bloom-filter-v1";
78
/// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`.
///
/// Provides an extra column `greptime_index_type` for the index type of the key column.
#[derive(Debug)]
pub(super) struct InformationSchemaKeyColumnUsage {
    /// Schema of the virtual table.
    schema: SchemaRef,
    /// Name of the catalog whose schemas and tables are listed.
    catalog_name: String,
    /// Weak handle to the catalog manager; it is upgraded when the table
    /// content is built.
    catalog_manager: Weak<dyn CatalogManager>,
}
88
89impl InformationSchemaKeyColumnUsage {
90    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
91        Self {
92            schema: Self::schema(),
93            catalog_name,
94            catalog_manager,
95        }
96    }
97
98    pub(crate) fn schema() -> SchemaRef {
99        Arc::new(Schema::new(vec![
100            ColumnSchema::new(
101                "constraint_catalog",
102                ConcreteDataType::string_datatype(),
103                false,
104            ),
105            ColumnSchema::new(
106                CONSTRAINT_SCHEMA,
107                ConcreteDataType::string_datatype(),
108                false,
109            ),
110            ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false),
111            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
112            ColumnSchema::new(
113                REAL_TABLE_CATALOG,
114                ConcreteDataType::string_datatype(),
115                false,
116            ),
117            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
118            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
119            ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
120            ColumnSchema::new(ORDINAL_POSITION, ConcreteDataType::uint32_datatype(), false),
121            ColumnSchema::new(
122                "position_in_unique_constraint",
123                ConcreteDataType::uint32_datatype(),
124                true,
125            ),
126            ColumnSchema::new(
127                "referenced_table_schema",
128                ConcreteDataType::string_datatype(),
129                true,
130            ),
131            ColumnSchema::new(
132                "referenced_table_name",
133                ConcreteDataType::string_datatype(),
134                true,
135            ),
136            ColumnSchema::new(
137                "referenced_column_name",
138                ConcreteDataType::string_datatype(),
139                true,
140            ),
141            ColumnSchema::new(
142                GREPTIME_INDEX_TYPE,
143                ConcreteDataType::string_datatype(),
144                true,
145            ),
146        ]))
147    }
148
149    fn builder(&self) -> InformationSchemaKeyColumnUsageBuilder {
150        InformationSchemaKeyColumnUsageBuilder::new(
151            self.schema.clone(),
152            self.catalog_name.clone(),
153            self.catalog_manager.clone(),
154        )
155    }
156}
157
158impl InformationTable for InformationSchemaKeyColumnUsage {
159    fn table_id(&self) -> TableId {
160        INFORMATION_SCHEMA_KEY_COLUMN_USAGE_TABLE_ID
161    }
162
163    fn table_name(&self) -> &'static str {
164        KEY_COLUMN_USAGE
165    }
166
167    fn schema(&self) -> SchemaRef {
168        self.schema.clone()
169    }
170
171    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
172        let schema = self.schema.arrow_schema().clone();
173        let mut builder = self.builder();
174        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
175            schema,
176            futures::stream::once(async move {
177                builder
178                    .make_key_column_usage(Some(request))
179                    .await
180                    .map(|x| x.into_df_record_batch())
181                    .map_err(Into::into)
182            }),
183        ));
184        Ok(Box::pin(
185            RecordBatchStreamAdapter::try_new(stream)
186                .map_err(BoxedError::new)
187                .context(InternalSnafu)?,
188        ))
189    }
190}
191
/// Builds the `information_schema.KEY_COLUMN_USAGE` table row by row
///
/// Columns are based on <https://dev.mysql.com/doc/refman/8.2/en/information-schema-key-column-usage-table.html>
struct InformationSchemaKeyColumnUsageBuilder {
    /// Schema of the record batch produced by the builder.
    schema: SchemaRef,
    /// Name of the catalog whose schemas and tables are listed.
    catalog_name: String,
    /// Weak handle to the catalog manager; upgraded when the table is built.
    catalog_manager: Weak<dyn CatalogManager>,

    // One vector builder per materialized column; every accepted row pushes
    // exactly one value into each of them.
    constraint_catalog: StringVectorBuilder,
    constraint_schema: StringVectorBuilder,
    constraint_name: StringVectorBuilder,
    table_catalog: StringVectorBuilder,
    real_table_catalog: StringVectorBuilder,
    table_schema: StringVectorBuilder,
    table_name: StringVectorBuilder,
    column_name: StringVectorBuilder,
    ordinal_position: UInt32VectorBuilder,
    position_in_unique_constraint: UInt32VectorBuilder,
    greptime_index_type: StringVectorBuilder,
}
212
213impl InformationSchemaKeyColumnUsageBuilder {
214    fn new(
215        schema: SchemaRef,
216        catalog_name: String,
217        catalog_manager: Weak<dyn CatalogManager>,
218    ) -> Self {
219        Self {
220            schema,
221            catalog_name,
222            catalog_manager,
223            constraint_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY),
224            constraint_schema: StringVectorBuilder::with_capacity(INIT_CAPACITY),
225            constraint_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
226            table_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY),
227            real_table_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY),
228            table_schema: StringVectorBuilder::with_capacity(INIT_CAPACITY),
229            table_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
230            column_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
231            ordinal_position: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
232            position_in_unique_constraint: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
233            greptime_index_type: StringVectorBuilder::with_capacity(INIT_CAPACITY),
234        }
235    }
236
237    /// Construct the `information_schema.KEY_COLUMN_USAGE` virtual table
238    async fn make_key_column_usage(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
239        let catalog_name = self.catalog_name.clone();
240        let catalog_manager = self
241            .catalog_manager
242            .upgrade()
243            .context(UpgradeWeakCatalogManagerRefSnafu)?;
244        let predicates = Predicates::from_scan_request(&request);
245
246        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
247            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
248
249            while let Some(table) = stream.try_next().await? {
250                let table_info = table.table_info();
251                let table_name = &table_info.name;
252                let keys = &table_info.meta.primary_key_indices;
253                let schema = table.schema();
254
255                for (idx, column) in schema.column_schemas().iter().enumerate() {
256                    let mut constraints = vec![];
257                    let mut greptime_index_type = vec![];
258                    if column.is_time_index() {
259                        self.add_key_column_usage(
260                            &predicates,
261                            &schema_name,
262                            CONSTRAINT_NAME_TIME_INDEX,
263                            &catalog_name,
264                            &schema_name,
265                            table_name,
266                            &column.name,
267                            1, //always 1 for time index
268                            "",
269                        );
270                    }
271                    // TODO(dimbtp): foreign key constraint not supported yet
272                    if keys.contains(&idx) {
273                        constraints.push(CONSTRAINT_NAME_PRI);
274                        greptime_index_type.push(INDEX_TYPE_PRI);
275                    }
276                    if column.is_inverted_indexed() {
277                        constraints.push(CONSTRAINT_NAME_INVERTED_INDEX);
278                        greptime_index_type.push(INDEX_TYPE_INVERTED_INDEX);
279                    }
280                    if let Ok(Some(options)) = column.fulltext_options() {
281                        if options.enable {
282                            constraints.push(CONSTRAINT_NAME_FULLTEXT_INDEX);
283                            let index_type = match options.backend {
284                                FulltextBackend::Bloom => INDEX_TYPE_FULLTEXT_BLOOM,
285                                FulltextBackend::Tantivy => INDEX_TYPE_FULLTEXT_TANTIVY,
286                            };
287                            greptime_index_type.push(index_type);
288                        }
289                    }
290                    if column.is_skipping_indexed() {
291                        constraints.push(CONSTRAINT_NAME_SKIPPING_INDEX);
292                        greptime_index_type.push(INDEX_TYPE_SKIPPING_INDEX);
293                    }
294
295                    if !constraints.is_empty() {
296                        let aggregated_constraints = constraints.join(", ");
297                        let aggregated_index_types = greptime_index_type.join(", ");
298                        self.add_key_column_usage(
299                            &predicates,
300                            &schema_name,
301                            &aggregated_constraints,
302                            &catalog_name,
303                            &schema_name,
304                            table_name,
305                            &column.name,
306                            idx as u32 + 1,
307                            &aggregated_index_types,
308                        );
309                    }
310                }
311            }
312        }
313
314        self.finish()
315    }
316
317    // TODO(dimbtp): Foreign key constraint has not `None` value for last 4
318    // fields, but it is not supported yet.
319    #[allow(clippy::too_many_arguments)]
320    fn add_key_column_usage(
321        &mut self,
322        predicates: &Predicates,
323        constraint_schema: &str,
324        constraint_name: &str,
325        table_catalog: &str,
326        table_schema: &str,
327        table_name: &str,
328        column_name: &str,
329        ordinal_position: u32,
330        index_types: &str,
331    ) {
332        let row = [
333            (CONSTRAINT_SCHEMA, &Value::from(constraint_schema)),
334            (CONSTRAINT_NAME, &Value::from(constraint_name)),
335            (REAL_TABLE_CATALOG, &Value::from(table_catalog)),
336            (TABLE_SCHEMA, &Value::from(table_schema)),
337            (TABLE_NAME, &Value::from(table_name)),
338            (COLUMN_NAME, &Value::from(column_name)),
339            (ORDINAL_POSITION, &Value::from(ordinal_position)),
340            (GREPTIME_INDEX_TYPE, &Value::from(index_types)),
341        ];
342
343        if !predicates.eval(&row) {
344            return;
345        }
346
347        self.constraint_catalog.push(Some("def"));
348        self.constraint_schema.push(Some(constraint_schema));
349        self.constraint_name.push(Some(constraint_name));
350        self.table_catalog.push(Some("def"));
351        self.real_table_catalog.push(Some(table_catalog));
352        self.table_schema.push(Some(table_schema));
353        self.table_name.push(Some(table_name));
354        self.column_name.push(Some(column_name));
355        self.ordinal_position.push(Some(ordinal_position));
356        self.position_in_unique_constraint.push(None);
357        self.greptime_index_type.push(Some(index_types));
358    }
359
360    fn finish(&mut self) -> Result<RecordBatch> {
361        let rows_num = self.table_catalog.len();
362
363        let null_string_vector = Arc::new(ConstantVector::new(
364            Arc::new(StringVector::from(vec![None as Option<&str>])),
365            rows_num,
366        ));
367        let columns: Vec<VectorRef> = vec![
368            Arc::new(self.constraint_catalog.finish()),
369            Arc::new(self.constraint_schema.finish()),
370            Arc::new(self.constraint_name.finish()),
371            Arc::new(self.table_catalog.finish()),
372            Arc::new(self.real_table_catalog.finish()),
373            Arc::new(self.table_schema.finish()),
374            Arc::new(self.table_name.finish()),
375            Arc::new(self.column_name.finish()),
376            Arc::new(self.ordinal_position.finish()),
377            Arc::new(self.position_in_unique_constraint.finish()),
378            null_string_vector.clone(),
379            null_string_vector.clone(),
380            null_string_vector,
381            Arc::new(self.greptime_index_type.finish()),
382        ];
383        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
384    }
385}
386
387impl DfPartitionStream for InformationSchemaKeyColumnUsage {
388    fn schema(&self) -> &ArrowSchemaRef {
389        self.schema.arrow_schema()
390    }
391
392    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
393        let schema = self.schema.arrow_schema().clone();
394        let mut builder = self.builder();
395        Box::pin(DfRecordBatchStreamAdapter::new(
396            schema,
397            futures::stream::once(async move {
398                builder
399                    .make_key_column_usage(None)
400                    .await
401                    .map(|x| x.into_df_record_batch())
402                    .map_err(Into::into)
403            }),
404        ))
405    }
406}