use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_KEY_COLUMN_USAGE_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, FulltextBackend, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{ConstantVector, StringVector, StringVectorBuilder, UInt32VectorBuilder};
use futures_util::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates, KEY_COLUMN_USAGE};
use crate::CatalogManager;
pub const CONSTRAINT_SCHEMA: &str = "constraint_schema";
pub const CONSTRAINT_NAME: &str = "constraint_name";
/// Always `"def"` in the output, for MySQL compatibility.
pub const TABLE_CATALOG: &str = "table_catalog";
/// The actual catalog name of the table, since `table_catalog` is fixed to `"def"`.
pub const REAL_TABLE_CATALOG: &str = "real_table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const COLUMN_NAME: &str = "column_name";
pub const ORDINAL_POSITION: &str = "ordinal_position";
pub const GREPTIME_INDEX_TYPE: &str = "greptime_index_type";
/// Initial capacity of the per-column vector builders.
const INIT_CAPACITY: usize = 42;

/// Constraint name of the time index column.
pub(crate) const CONSTRAINT_NAME_TIME_INDEX: &str = "TIME INDEX";

/// Constraint name of the primary key.
pub(crate) const CONSTRAINT_NAME_PRI: &str = "PRIMARY";
/// Index type of the primary key.
pub(crate) const INDEX_TYPE_PRI: &str = "greptime-primary-key-v1";

/// Constraint name of the inverted index.
pub(crate) const CONSTRAINT_NAME_INVERTED_INDEX: &str = "INVERTED INDEX";
/// Index type of the inverted index.
pub(crate) const INDEX_TYPE_INVERTED_INDEX: &str = "greptime-inverted-index-v1";

/// Constraint name of the fulltext index.
pub(crate) const CONSTRAINT_NAME_FULLTEXT_INDEX: &str = "FULLTEXT INDEX";
/// Index type of the fulltext index with the Tantivy backend.
pub(crate) const INDEX_TYPE_FULLTEXT_TANTIVY: &str = "greptime-fulltext-index-v1";
/// Index type of the fulltext index with the Bloom backend.
pub(crate) const INDEX_TYPE_FULLTEXT_BLOOM: &str = "greptime-fulltext-index-bloom";

/// Constraint name of the skipping index.
pub(crate) const CONSTRAINT_NAME_SKIPPING_INDEX: &str = "SKIPPING INDEX";
/// Index type of the skipping index (bloom-filter based).
pub(crate) const INDEX_TYPE_SKIPPING_INDEX: &str = "greptime-bloom-filter-v1";
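/// The `information_schema.key_column_usage` virtual table.
///
/// Lists every column that participates in a key constraint or index,
/// together with the GreptimeDB-specific index type. An illustrative query
/// (the table name `monitor` is a made-up example, not from this file):
///
/// ```sql
/// SELECT constraint_name, column_name, greptime_index_type
/// FROM information_schema.key_column_usage
/// WHERE table_name = 'monitor';
/// ```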
#[derive(Debug)]
pub(super) struct InformationSchemaKeyColumnUsage {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaKeyColumnUsage {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

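    /// Builds the table schema: the standard MySQL `KEY_COLUMN_USAGE` columns
    /// plus the GreptimeDB-specific `real_table_catalog` and
    /// `greptime_index_type` columns.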
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(
                "constraint_catalog",
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                CONSTRAINT_SCHEMA,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                REAL_TABLE_CATALOG,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(ORDINAL_POSITION, ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new(
                "position_in_unique_constraint",
                ConcreteDataType::uint32_datatype(),
                true,
            ),
            ColumnSchema::new(
                "referenced_table_schema",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "referenced_table_name",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "referenced_column_name",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                GREPTIME_INDEX_TYPE,
                ConcreteDataType::string_datatype(),
                true,
            ),
        ]))
    }

    fn builder(&self) -> InformationSchemaKeyColumnUsageBuilder {
        InformationSchemaKeyColumnUsageBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaKeyColumnUsage {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_KEY_COLUMN_USAGE_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        KEY_COLUMN_USAGE
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

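    /// Builds the rows lazily: the single record batch is only constructed
    /// when the returned stream is polled, and the scan request's predicates
    /// are evaluated while rows are built, so filtered-out rows are never
    /// materialized.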
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_key_column_usage(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

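/// Accumulates the `key_column_usage` rows; each `*Builder` field below holds
/// the values of one output column, and `finish` assembles them into a
/// [`RecordBatch`].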
struct InformationSchemaKeyColumnUsageBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    constraint_catalog: StringVectorBuilder,
    constraint_schema: StringVectorBuilder,
    constraint_name: StringVectorBuilder,
    table_catalog: StringVectorBuilder,
    real_table_catalog: StringVectorBuilder,
    table_schema: StringVectorBuilder,
    table_name: StringVectorBuilder,
    column_name: StringVectorBuilder,
    ordinal_position: UInt32VectorBuilder,
    position_in_unique_constraint: UInt32VectorBuilder,
    greptime_index_type: StringVectorBuilder,
}

impl InformationSchemaKeyColumnUsageBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            constraint_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            constraint_schema: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            constraint_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            real_table_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_schema: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            ordinal_position: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            position_in_unique_constraint: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            greptime_index_type: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

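    /// Constructs the `information_schema.key_column_usage` record batch by
    /// walking every schema and table visible to the catalog manager.
    /// Constraints on the same column (other than the time index) are
    /// aggregated into one row: a primary-key column that also carries an
    /// inverted index yields `constraint_name = "PRIMARY, INVERTED INDEX"`
    /// with the corresponding comma-joined `greptime_index_type`.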
    async fn make_key_column_usage(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

            while let Some(table) = stream.try_next().await? {
                let table_info = table.table_info();
                let table_name = &table_info.name;
                let keys = &table_info.meta.primary_key_indices;
                let schema = table.schema();

                for (idx, column) in schema.column_schemas().iter().enumerate() {
                    let mut constraints = vec![];
                    let mut greptime_index_type = vec![];
                    if column.is_time_index() {
                        // The time index gets its own row, with ordinal
                        // position 1 and no index type.
                        self.add_key_column_usage(
                            &predicates,
                            &schema_name,
                            CONSTRAINT_NAME_TIME_INDEX,
                            &catalog_name,
                            &schema_name,
                            table_name,
                            &column.name,
                            1,
                            "",
                        );
                    }
                    if keys.contains(&idx) {
                        constraints.push(CONSTRAINT_NAME_PRI);
                        greptime_index_type.push(INDEX_TYPE_PRI);
                    }
                    if column.is_inverted_indexed() {
                        constraints.push(CONSTRAINT_NAME_INVERTED_INDEX);
                        greptime_index_type.push(INDEX_TYPE_INVERTED_INDEX);
                    }
                    if let Ok(Some(options)) = column.fulltext_options() {
                        if options.enable {
                            constraints.push(CONSTRAINT_NAME_FULLTEXT_INDEX);
                            let index_type = match options.backend {
                                FulltextBackend::Bloom => INDEX_TYPE_FULLTEXT_BLOOM,
                                FulltextBackend::Tantivy => INDEX_TYPE_FULLTEXT_TANTIVY,
                            };
                            greptime_index_type.push(index_type);
                        }
                    }
                    if column.is_skipping_indexed() {
                        constraints.push(CONSTRAINT_NAME_SKIPPING_INDEX);
                        greptime_index_type.push(INDEX_TYPE_SKIPPING_INDEX);
                    }

                    if !constraints.is_empty() {
                        let aggregated_constraints = constraints.join(", ");
                        let aggregated_index_types = greptime_index_type.join(", ");
                        self.add_key_column_usage(
                            &predicates,
                            &schema_name,
                            &aggregated_constraints,
                            &catalog_name,
                            &schema_name,
                            table_name,
                            &column.name,
                            idx as u32 + 1,
                            &aggregated_index_types,
                        );
                    }
                }
            }
        }

        self.finish()
    }

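    /// Appends one row to the builders, provided the row survives the scan
    /// predicates; rows rejected by the predicates are silently skipped.
    /// `constraint_catalog` and `table_catalog` are hard-coded to `"def"` for
    /// MySQL compatibility, while the actual catalog name goes into
    /// `real_table_catalog`.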
    #[allow(clippy::too_many_arguments)]
    fn add_key_column_usage(
        &mut self,
        predicates: &Predicates,
        constraint_schema: &str,
        constraint_name: &str,
        table_catalog: &str,
        table_schema: &str,
        table_name: &str,
        column_name: &str,
        ordinal_position: u32,
        index_types: &str,
    ) {
        let row = [
            (CONSTRAINT_SCHEMA, &Value::from(constraint_schema)),
            (CONSTRAINT_NAME, &Value::from(constraint_name)),
            (REAL_TABLE_CATALOG, &Value::from(table_catalog)),
            (TABLE_SCHEMA, &Value::from(table_schema)),
            (TABLE_NAME, &Value::from(table_name)),
            (COLUMN_NAME, &Value::from(column_name)),
            (ORDINAL_POSITION, &Value::from(ordinal_position)),
            (GREPTIME_INDEX_TYPE, &Value::from(index_types)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.constraint_catalog.push(Some("def"));
        self.constraint_schema.push(Some(constraint_schema));
        self.constraint_name.push(Some(constraint_name));
        self.table_catalog.push(Some("def"));
        self.real_table_catalog.push(Some(table_catalog));
        self.table_schema.push(Some(table_schema));
        self.table_name.push(Some(table_name));
        self.column_name.push(Some(column_name));
        self.ordinal_position.push(Some(ordinal_position));
        self.position_in_unique_constraint.push(None);
        self.greptime_index_type.push(Some(index_types));
    }

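    /// Consumes the builders and assembles the final [`RecordBatch`]. The
    /// `referenced_table_schema`, `referenced_table_name`, and
    /// `referenced_column_name` columns are filled with an all-NULL constant
    /// vector, since referenced (foreign-key style) columns are not tracked
    /// here.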
    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.table_catalog.len();

        let null_string_vector = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![None as Option<&str>])),
            rows_num,
        ));
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.constraint_catalog.finish()),
            Arc::new(self.constraint_schema.finish()),
            Arc::new(self.constraint_name.finish()),
            Arc::new(self.table_catalog.finish()),
            Arc::new(self.real_table_catalog.finish()),
            Arc::new(self.table_schema.finish()),
            Arc::new(self.table_name.finish()),
            Arc::new(self.column_name.finish()),
            Arc::new(self.ordinal_position.finish()),
            Arc::new(self.position_in_unique_constraint.finish()),
            null_string_vector.clone(),
            null_string_vector.clone(),
            null_string_vector,
            Arc::new(self.greptime_index_type.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

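// DataFusion integration: exposes the same row-building logic as a partition
// stream. Unlike `to_stream`, no scan request is available here, so the rows
// are built without predicate pushdown.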
impl DfPartitionStream for InformationSchemaKeyColumnUsage {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_key_column_usage(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}
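
// A minimal sanity-check sketch, not part of the original file: it only
// asserts invariants that are visible above, namely that `schema()` declares
// the same 14 columns (in the same order) that `finish()` assembles.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_key_column_usage_schema() {
        let schema = InformationSchemaKeyColumnUsage::schema();
        // 14 columns: the 12 standard KEY_COLUMN_USAGE columns plus
        // `real_table_catalog` and `greptime_index_type`.
        assert_eq!(14, schema.column_schemas().len());
        assert_eq!(CONSTRAINT_NAME, schema.column_schemas()[2].name);
        assert_eq!(GREPTIME_INDEX_TYPE, schema.column_schemas()[13].name);
    }
}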