catalog/system_schema/information_schema/table_constraints.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{ConstantVector, StringVector, StringVectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::key_column_usage::{
    CONSTRAINT_NAME_PRI, CONSTRAINT_NAME_TIME_INDEX,
};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{InformationTable, TABLE_CONSTRAINTS};
use crate::CatalogManager;

/// The `TABLE_CONSTRAINTS` table describes which tables have constraints.
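///
/// Each row reports a single constraint on a single table: a `TIME INDEX`
/// row when the table has a timestamp index, and a `PRIMARY KEY` row when
/// it has primary key columns. The column layout follows MySQL's
/// `information_schema.TABLE_CONSTRAINTS`; `constraint_catalog` is always
/// `"def"` and `enforced` is always `"YES"`.
///
/// An illustrative query (actual rows depend on the tables in the catalog):
///
/// ```sql
/// SELECT constraint_name, table_schema, table_name, constraint_type
/// FROM information_schema.table_constraints;
/// ```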
#[derive(Debug)]
pub(super) struct InformationSchemaTableConstraints {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

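// Column names, matching MySQL's `information_schema.TABLE_CONSTRAINTS` layout.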
const CONSTRAINT_CATALOG: &str = "constraint_catalog";
const CONSTRAINT_SCHEMA: &str = "constraint_schema";
const CONSTRAINT_NAME: &str = "constraint_name";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const CONSTRAINT_TYPE: &str = "constraint_type";
const ENFORCED: &str = "enforced";

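// Initial capacity for the per-column vector builders; a rough guess, since
// the final row count is unknown until the catalog has been walked.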
const INIT_CAPACITY: usize = 42;

const TIME_INDEX_CONSTRAINT_TYPE: &str = "TIME INDEX";
const PRI_KEY_CONSTRAINT_TYPE: &str = "PRIMARY KEY";

impl InformationSchemaTableConstraints {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

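    /// Construct the table schema: seven non-nullable string columns.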
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(
                CONSTRAINT_CATALOG,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                CONSTRAINT_SCHEMA,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(CONSTRAINT_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(ENFORCED, ConcreteDataType::string_datatype(), false),
        ]))
    }

    fn builder(&self) -> InformationSchemaTableConstraintsBuilder {
        InformationSchemaTableConstraintsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaTableConstraints {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        TABLE_CONSTRAINTS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
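        // Defer the work: the record batch is built only when the one-shot
        // stream below is polled for the first time.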
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_table_constraints(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

struct InformationSchemaTableConstraintsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    constraint_schemas: StringVectorBuilder,
    constraint_names: StringVectorBuilder,
    table_schemas: StringVectorBuilder,
    table_names: StringVectorBuilder,
    constraint_types: StringVectorBuilder,
}

impl InformationSchemaTableConstraintsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            constraint_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            constraint_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            constraint_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Construct the `information_schema.table_constraints` virtual table
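    /// by walking every schema and table visible through the catalog manager,
    /// emitting one row per time index and one row per non-empty primary key.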
    async fn make_table_constraints(
        &mut self,
        request: Option<ScanRequest>,
    ) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

            while let Some(table) = stream.try_next().await? {
                // Fetch the table info once and reuse it for both constraint rows.
                let table_info = table.table_info();
                let keys = &table_info.meta.primary_key_indices;
                let schema = table.schema();

                if schema.timestamp_index().is_some() {
                    self.add_table_constraint(
                        &predicates,
                        &schema_name,
                        CONSTRAINT_NAME_TIME_INDEX,
                        &schema_name,
                        &table_info.name,
                        TIME_INDEX_CONSTRAINT_TYPE,
                    );
                }

                if !keys.is_empty() {
                    self.add_table_constraint(
                        &predicates,
                        &schema_name,
                        CONSTRAINT_NAME_PRI,
                        &schema_name,
                        &table_info.name,
                        PRI_KEY_CONSTRAINT_TYPE,
                    );
                }
            }
        }

        self.finish()
    }

    fn add_table_constraint(
        &mut self,
        predicates: &Predicates,
        constraint_schema: &str,
        constraint_name: &str,
        table_schema: &str,
        table_name: &str,
        constraint_type: &str,
    ) {
        let row = [
            (CONSTRAINT_SCHEMA, &Value::from(constraint_schema)),
            (CONSTRAINT_NAME, &Value::from(constraint_name)),
            (TABLE_SCHEMA, &Value::from(table_schema)),
            (TABLE_NAME, &Value::from(table_name)),
            (CONSTRAINT_TYPE, &Value::from(constraint_type)),
        ];

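        // Honor pushed-down predicates: skip rows that the scan filters rule out.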
        if !predicates.eval(&row) {
            return;
        }

        self.constraint_schemas.push(Some(constraint_schema));
        self.constraint_names.push(Some(constraint_name));
        self.table_schemas.push(Some(table_schema));
        self.table_names.push(Some(table_name));
        self.constraint_types.push(Some(constraint_type));
    }

    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.constraint_names.len();

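        // `constraint_catalog` is always "def" and `enforced` is always "YES",
        // so both are represented as constant vectors rather than materialized
        // `rows_num` times.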
        let constraint_catalogs = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec!["def"])),
            rows_num,
        ));
        let enforceds = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec!["YES"])),
            rows_num,
        ));

        let columns: Vec<VectorRef> = vec![
            constraint_catalogs,
            Arc::new(self.constraint_schemas.finish()),
            Arc::new(self.constraint_names.finish()),
            Arc::new(self.table_schemas.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.constraint_types.finish()),
            enforceds,
        ];

        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

impl DfPartitionStream for InformationSchemaTableConstraints {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
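        // Same one-shot construction as `to_stream`, but without a scan request,
        // so no predicates are pushed down and every row is produced.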
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_table_constraints(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}