// catalog/system_schema/information_schema/table_constraints.rs

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{ConstantVector, StringVector, StringVectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::key_column_usage::{
    CONSTRAINT_NAME_PRI, CONSTRAINT_NAME_TIME_INDEX,
};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{InformationTable, TABLE_CONSTRAINTS};
use crate::CatalogManager;

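/// The `information_schema.table_constraints` virtual table.
///
/// Emits one row per table constraint; only `TIME INDEX` and `PRIMARY KEY`
/// constraints are reported.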
#[derive(Debug)]
pub(super) struct InformationSchemaTableConstraints {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

const CONSTRAINT_CATALOG: &str = "constraint_catalog";
const CONSTRAINT_SCHEMA: &str = "constraint_schema";
const CONSTRAINT_NAME: &str = "constraint_name";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const CONSTRAINT_TYPE: &str = "constraint_type";
const ENFORCED: &str = "enforced";

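/// Initial capacity for the per-column string vector builders.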
const INIT_CAPACITY: usize = 42;

const TIME_INDEX_CONSTRAINT_TYPE: &str = "TIME INDEX";
const PRI_KEY_CONSTRAINT_TYPE: &str = "PRIMARY KEY";

impl InformationSchemaTableConstraints {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

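    /// Builds the table schema: seven non-nullable string columns, matching
    /// the column set of MySQL's `TABLE_CONSTRAINTS`.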
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(
                CONSTRAINT_CATALOG,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                CONSTRAINT_SCHEMA,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(CONSTRAINT_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(ENFORCED, ConcreteDataType::string_datatype(), false),
        ]))
    }

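    /// Returns a fresh row builder holding clones of this table's schema and
    /// catalog handles.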
    fn builder(&self) -> InformationSchemaTableConstraintsBuilder {
        InformationSchemaTableConstraintsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaTableConstraints {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        TABLE_CONSTRAINTS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
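        // Build the batch lazily: the catalog scan runs inside a one-shot
        // stream the first time the caller polls it.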
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_table_constraints(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

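/// Accumulates `table_constraints` rows, one `StringVectorBuilder` per
/// non-constant column.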
struct InformationSchemaTableConstraintsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    constraint_schemas: StringVectorBuilder,
    constraint_names: StringVectorBuilder,
    table_schemas: StringVectorBuilder,
    table_names: StringVectorBuilder,
    constraint_types: StringVectorBuilder,
}

impl InformationSchemaTableConstraintsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            constraint_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            constraint_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            constraint_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

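    /// Walks every schema and table in the catalog, emitting a `TIME INDEX`
    /// constraint row for each table with a timestamp index and a
    /// `PRIMARY KEY` row for each table with primary-key columns.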
    async fn make_table_constraints(
        &mut self,
        request: Option<ScanRequest>,
    ) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

            while let Some(table) = stream.try_next().await? {
                let keys = &table.table_info().meta.primary_key_indices;
                let schema = table.schema();

                if schema.timestamp_index().is_some() {
                    self.add_table_constraint(
                        &predicates,
                        &schema_name,
                        CONSTRAINT_NAME_TIME_INDEX,
                        &schema_name,
                        &table.table_info().name,
                        TIME_INDEX_CONSTRAINT_TYPE,
                    );
                }

                if !keys.is_empty() {
                    self.add_table_constraint(
                        &predicates,
                        &schema_name,
                        CONSTRAINT_NAME_PRI,
                        &schema_name,
                        &table.table_info().name,
                        PRI_KEY_CONSTRAINT_TYPE,
                    );
                }
            }
        }

        self.finish()
    }

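    /// Appends one constraint row, unless the pushed-down predicates reject
    /// it.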
    fn add_table_constraint(
        &mut self,
        predicates: &Predicates,
        constraint_schema: &str,
        constraint_name: &str,
        table_schema: &str,
        table_name: &str,
        constraint_type: &str,
    ) {
        let row = [
            (CONSTRAINT_SCHEMA, &Value::from(constraint_schema)),
            (CONSTRAINT_NAME, &Value::from(constraint_name)),
            (TABLE_SCHEMA, &Value::from(table_schema)),
            (TABLE_NAME, &Value::from(table_name)),
            (CONSTRAINT_TYPE, &Value::from(constraint_type)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.constraint_schemas.push(Some(constraint_schema));
        self.constraint_names.push(Some(constraint_name));
        self.table_schemas.push(Some(table_schema));
        self.table_names.push(Some(table_name));
        self.constraint_types.push(Some(constraint_type));
    }

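    /// Assembles the accumulated columns into a `RecordBatch`.
    /// `constraint_catalog` is always `"def"` and `enforced` is always
    /// `"YES"`, so both are emitted as constant vectors.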
    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.constraint_names.len();

        let constraint_catalogs = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec!["def"])),
            rows_num,
        ));
        let enforceds = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec!["YES"])),
            rows_num,
        ));

        let columns: Vec<VectorRef> = vec![
            constraint_catalogs,
            Arc::new(self.constraint_schemas.finish()),
            Arc::new(self.constraint_names.finish()),
            Arc::new(self.table_schemas.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.constraint_types.finish()),
            enforceds,
        ];

        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

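// DataFusion may also drive this table directly as a partition stream. No
// `ScanRequest` is available on this path, so `make_table_constraints` runs
// without predicate pushdown.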
impl DfPartitionStream for InformationSchemaTableConstraints {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_table_constraints(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}