catalog/system_schema/information_schema/columns.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{
    INFORMATION_SCHEMA_COLUMNS_TABLE_ID, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
    SEMANTIC_TYPE_TIME_INDEX,
};
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, DataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
    ConstantVector, Int64Vector, Int64VectorBuilder, StringVector, StringVectorBuilder, VectorRef,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use sql::statements;
use store_api::storage::{ScanRequest, TableId};

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{InformationTable, COLUMNS};
use crate::CatalogManager;

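/// The `information_schema.COLUMNS` virtual table: one row per column of every
/// table visible under `catalog_name`, built on demand from the catalog manager.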
#[derive(Debug)]
pub(super) struct InformationSchemaColumns {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

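// Column names of the `COLUMNS` table. `REGION_ID` and `PEER_ID` are not part of
// the schema below; they are exported for use elsewhere.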
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const COLUMN_NAME: &str = "column_name";
pub const REGION_ID: &str = "region_id";
pub const PEER_ID: &str = "peer_id";
const ORDINAL_POSITION: &str = "ordinal_position";
const CHARACTER_MAXIMUM_LENGTH: &str = "character_maximum_length";
const CHARACTER_OCTET_LENGTH: &str = "character_octet_length";
const NUMERIC_PRECISION: &str = "numeric_precision";
const NUMERIC_SCALE: &str = "numeric_scale";
const DATETIME_PRECISION: &str = "datetime_precision";
const CHARACTER_SET_NAME: &str = "character_set_name";
pub const COLLATION_NAME: &str = "collation_name";
pub const COLUMN_KEY: &str = "column_key";
pub const EXTRA: &str = "extra";
pub const PRIVILEGES: &str = "privileges";
const GENERATION_EXPRESSION: &str = "generation_expression";
// Extension column that keeps the original GreptimeDB data type name
pub const GREPTIME_DATA_TYPE: &str = "greptime_data_type";
pub const DATA_TYPE: &str = "data_type";
pub const SEMANTIC_TYPE: &str = "semantic_type";
pub const COLUMN_DEFAULT: &str = "column_default";
pub const IS_NULLABLE: &str = "is_nullable";
const COLUMN_TYPE: &str = "column_type";
pub const COLUMN_COMMENT: &str = "column_comment";
const SRS_ID: &str = "srs_id";
const INIT_CAPACITY: usize = 42;

// The maximum length reported for string types (i32::MAX)
const MAX_STRING_LENGTH: i64 = 2147483647;
const UTF8_CHARSET_NAME: &str = "utf8";
const UTF8_COLLATE_NAME: &str = "utf8_bin";
const PRI_COLUMN_KEY: &str = "PRI";
const TIME_INDEX_COLUMN_KEY: &str = "TIME INDEX";
const DEFAULT_PRIVILEGES: &str = "select,insert";
const EMPTY_STR: &str = "";

impl InformationSchemaColumns {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(ORDINAL_POSITION, ConcreteDataType::int64_datatype(), false),
            ColumnSchema::new(
                CHARACTER_MAXIMUM_LENGTH,
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                CHARACTER_OCTET_LENGTH,
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(NUMERIC_PRECISION, ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(NUMERIC_SCALE, ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(DATETIME_PRECISION, ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                CHARACTER_SET_NAME,
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(COLLATION_NAME, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(COLUMN_KEY, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(EXTRA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PRIVILEGES, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                GENERATION_EXPRESSION,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                GREPTIME_DATA_TYPE,
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(DATA_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(SEMANTIC_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_DEFAULT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(IS_NULLABLE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(COLUMN_COMMENT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(SRS_ID, ConcreteDataType::int64_datatype(), true),
        ]))
    }

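    /// Creates a row builder bound to this table's schema and catalog.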
    fn builder(&self) -> InformationSchemaColumnsBuilder {
        InformationSchemaColumnsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

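// Exposes `COLUMNS` as an information_schema virtual table: a fixed table id,
// name and schema, plus a record batch stream built lazily from the scan request.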
impl InformationTable for InformationSchemaColumns {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_COLUMNS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        COLUMNS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_columns(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

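/// Buffers the `COLUMNS` rows column by column: one vector builder per output
/// column, filled by `add_column` and turned into a record batch by `finish`.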
struct InformationSchemaColumnsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    column_names: StringVectorBuilder,
    ordinal_positions: Int64VectorBuilder,
    character_maximum_lengths: Int64VectorBuilder,
    character_octet_lengths: Int64VectorBuilder,
    numeric_precisions: Int64VectorBuilder,
    numeric_scales: Int64VectorBuilder,
    datetime_precisions: Int64VectorBuilder,
    character_set_names: StringVectorBuilder,
    collation_names: StringVectorBuilder,
    column_keys: StringVectorBuilder,
    greptime_data_types: StringVectorBuilder,
    data_types: StringVectorBuilder,
    semantic_types: StringVectorBuilder,
    column_defaults: StringVectorBuilder,
    is_nullables: StringVectorBuilder,
    column_types: StringVectorBuilder,
    column_comments: StringVectorBuilder,
}

impl InformationSchemaColumnsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            character_maximum_lengths: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            character_octet_lengths: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            numeric_precisions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            numeric_scales: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            datetime_precisions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            character_set_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            collation_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_keys: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            greptime_data_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            data_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            semantic_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_defaults: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            is_nullables: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            column_comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Constructs the rows of the `information_schema.columns` virtual table by
    /// scanning every table of every schema in the catalog.
    async fn make_columns(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

            while let Some(table) = stream.try_next().await? {
                let keys = &table.table_info().meta.primary_key_indices;
                let schema = table.schema();

                for (idx, column) in schema.column_schemas().iter().enumerate() {
                    let semantic_type = if column.is_time_index() {
                        SEMANTIC_TYPE_TIME_INDEX
                    } else if keys.contains(&idx) {
                        SEMANTIC_TYPE_PRIMARY_KEY
                    } else {
                        SEMANTIC_TYPE_FIELD
                    };

                    self.add_column(
                        &predicates,
                        idx,
                        &catalog_name,
                        &schema_name,
                        &table.table_info().name,
                        semantic_type,
                        column,
                    );
                }
            }
        }

        self.finish()
    }

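    /// Appends one row describing `column_schema`, unless the pushed-down
    /// predicates already rule the row out.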
    #[allow(clippy::too_many_arguments)]
    fn add_column(
        &mut self,
        predicates: &Predicates,
        index: usize,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        semantic_type: &str,
        column_schema: &ColumnSchema,
    ) {
        // Use the SQL data type name; fall back to the GreptimeDB type name if
        // the conversion fails.
        let data_type = statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
            .map(|dt| dt.to_string().to_lowercase())
            .unwrap_or_else(|_| column_schema.data_type.name());

        let column_key = match semantic_type {
            SEMANTIC_TYPE_PRIMARY_KEY => PRI_COLUMN_KEY,
            SEMANTIC_TYPE_TIME_INDEX => TIME_INDEX_COLUMN_KEY,
            _ => EMPTY_STR,
        };

        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (TABLE_NAME, &Value::from(table_name)),
            (COLUMN_NAME, &Value::from(column_schema.name.as_str())),
            (DATA_TYPE, &Value::from(data_type.as_str())),
            (SEMANTIC_TYPE, &Value::from(semantic_type)),
            (ORDINAL_POSITION, &Value::from((index + 1) as i64)),
            (COLUMN_KEY, &Value::from(column_key)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.catalog_names.push(Some(catalog_name));
        self.schema_names.push(Some(schema_name));
        self.table_names.push(Some(table_name));
        self.column_names.push(Some(&column_schema.name));
        // `ordinal_position` starts from 1
        self.ordinal_positions.push(Some((index + 1) as i64));

        if column_schema.data_type.is_string() {
            self.character_maximum_lengths.push(Some(MAX_STRING_LENGTH));
            self.character_octet_lengths.push(Some(MAX_STRING_LENGTH));
            self.numeric_precisions.push(None);
            self.numeric_scales.push(None);
            self.datetime_precisions.push(None);
            self.character_set_names.push(Some(UTF8_CHARSET_NAME));
            self.collation_names.push(Some(UTF8_COLLATE_NAME));
        } else if column_schema.data_type.is_numeric() || column_schema.data_type.is_decimal() {
            self.character_maximum_lengths.push(None);
            self.character_octet_lengths.push(None);

            self.numeric_precisions.push(
                column_schema
                    .data_type
                    .numeric_precision()
                    .map(|x| x as i64),
            );
            self.numeric_scales
                .push(column_schema.data_type.numeric_scale().map(|x| x as i64));

            self.datetime_precisions.push(None);
            self.character_set_names.push(None);
            self.collation_names.push(None);
        } else {
            self.character_maximum_lengths.push(None);
            self.character_octet_lengths.push(None);
            self.numeric_precisions.push(None);
            self.numeric_scales.push(None);

            match &column_schema.data_type {
                ConcreteDataType::Timestamp(ts_type) => {
                    self.datetime_precisions
                        .push(Some(ts_type.precision() as i64));
                }
                ConcreteDataType::Time(time_type) => {
                    self.datetime_precisions
                        .push(Some(time_type.precision() as i64));
                }
                _ => self.datetime_precisions.push(None),
            }

            self.character_set_names.push(None);
            self.collation_names.push(None);
        }

        self.column_keys.push(Some(column_key));
        self.greptime_data_types
            .push(Some(&column_schema.data_type.name()));
        self.data_types.push(Some(&data_type));
        self.semantic_types.push(Some(semantic_type));
        self.column_defaults.push(
            column_schema
                .default_constraint()
                .map(|s| format!("{}", s))
                .as_deref(),
        );
        if column_schema.is_nullable() {
            self.is_nullables.push(Some("Yes"));
        } else {
            self.is_nullables.push(Some("No"));
        }
        self.column_types.push(Some(&data_type));
        self.column_comments
            .push(column_schema.column_comment().map(|x| x.as_ref()));
    }

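    /// Finalizes the accumulated builders into a record batch, filling the
    /// `extra`, `privileges`, `generation_expression` and `srs_id` columns with
    /// constant vectors.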
    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.collation_names.len();

        let privileges = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![DEFAULT_PRIVILEGES])),
            rows_num,
        ));
        let empty_string = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![EMPTY_STR])),
            rows_num,
        ));
        let srs_ids = Arc::new(ConstantVector::new(
            Arc::new(Int64Vector::from(vec![None])),
            rows_num,
        ));

        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.column_names.finish()),
            Arc::new(self.ordinal_positions.finish()),
            Arc::new(self.character_maximum_lengths.finish()),
            Arc::new(self.character_octet_lengths.finish()),
            Arc::new(self.numeric_precisions.finish()),
            Arc::new(self.numeric_scales.finish()),
            Arc::new(self.datetime_precisions.finish()),
            Arc::new(self.character_set_names.finish()),
            Arc::new(self.collation_names.finish()),
            Arc::new(self.column_keys.finish()),
            empty_string.clone(),
            privileges,
            empty_string,
            Arc::new(self.greptime_data_types.finish()),
            Arc::new(self.data_types.finish()),
            Arc::new(self.semantic_types.finish()),
            Arc::new(self.column_defaults.finish()),
            Arc::new(self.is_nullables.finish()),
            Arc::new(self.column_types.finish()),
            Arc::new(self.column_comments.finish()),
            srs_ids,
        ];

        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

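// DataFusion integration: serves the whole table as a single partition stream,
// without predicate push-down (`make_columns(None)`).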
impl DfPartitionStream for InformationSchemaColumns {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_columns(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}