catalog/system_schema/information_schema/
tables.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::{Arc, Weak};
17
18use arrow_schema::SchemaRef as ArrowSchemaRef;
19use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE};
20use common_error::ext::BoxedError;
21use common_meta::datanode::RegionStat;
22use common_recordbatch::adapter::RecordBatchStreamAdapter;
23use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
24use common_telemetry::error;
25use datafusion::execution::TaskContext;
26use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
27use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
28use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
29use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
30use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
31use datatypes::value::Value;
32use datatypes::vectors::{
33    StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
34};
35use futures::TryStreamExt;
36use snafu::{OptionExt, ResultExt};
37use store_api::storage::{RegionId, ScanRequest, TableId};
38use table::metadata::{TableInfo, TableType};
39
40use crate::CatalogManager;
41use crate::error::{
42    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
43};
44use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
45use crate::system_schema::utils;
46
47pub const TABLE_CATALOG: &str = "table_catalog";
48pub const TABLE_SCHEMA: &str = "table_schema";
49pub const TABLE_NAME: &str = "table_name";
50pub const TABLE_TYPE: &str = "table_type";
51pub const VERSION: &str = "version";
52pub const ROW_FORMAT: &str = "row_format";
53pub const TABLE_ROWS: &str = "table_rows";
54pub const DATA_LENGTH: &str = "data_length";
55pub const INDEX_LENGTH: &str = "index_length";
56pub const MAX_DATA_LENGTH: &str = "max_data_length";
57pub const AVG_ROW_LENGTH: &str = "avg_row_length";
58pub const DATA_FREE: &str = "data_free";
59pub const AUTO_INCREMENT: &str = "auto_increment";
60pub const CREATE_TIME: &str = "create_time";
61pub const UPDATE_TIME: &str = "update_time";
62pub const CHECK_TIME: &str = "check_time";
63pub const TABLE_COLLATION: &str = "table_collation";
64pub const CHECKSUM: &str = "checksum";
65pub const CREATE_OPTIONS: &str = "create_options";
66pub const TABLE_COMMENT: &str = "table_comment";
67pub const MAX_INDEX_LENGTH: &str = "max_index_length";
68pub const TEMPORARY: &str = "temporary";
69const TABLE_ID: &str = "table_id";
70pub const ENGINE: &str = "engine";
71const INIT_CAPACITY: usize = 42;
72
73#[derive(Debug)]
74pub(super) struct InformationSchemaTables {
75    schema: SchemaRef,
76    catalog_name: String,
77    catalog_manager: Weak<dyn CatalogManager>,
78}
79
80impl InformationSchemaTables {
81    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
82        Self {
83            schema: Self::schema(),
84            catalog_name,
85            catalog_manager,
86        }
87    }
88
89    pub(crate) fn schema() -> SchemaRef {
90        Arc::new(Schema::new(vec![
91            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
92            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
93            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
94            ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
95            ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
96            ColumnSchema::new(DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
97            ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
98            ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
99            ColumnSchema::new(MAX_INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
100            ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::uint64_datatype(), true),
101            ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
102            ColumnSchema::new(VERSION, ConcreteDataType::uint64_datatype(), true),
103            ColumnSchema::new(ROW_FORMAT, ConcreteDataType::string_datatype(), true),
104            ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
105            ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
106            ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
107            ColumnSchema::new(
108                CREATE_TIME,
109                ConcreteDataType::timestamp_second_datatype(),
110                true,
111            ),
112            ColumnSchema::new(
113                UPDATE_TIME,
114                ConcreteDataType::timestamp_second_datatype(),
115                true,
116            ),
117            ColumnSchema::new(
118                CHECK_TIME,
119                ConcreteDataType::timestamp_second_datatype(),
120                true,
121            ),
122            ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
123            ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
124            ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
125            ColumnSchema::new(TABLE_COMMENT, ConcreteDataType::string_datatype(), true),
126            ColumnSchema::new(TEMPORARY, ConcreteDataType::string_datatype(), true),
127        ]))
128    }
129
130    fn builder(&self) -> InformationSchemaTablesBuilder {
131        InformationSchemaTablesBuilder::new(
132            self.schema.clone(),
133            self.catalog_name.clone(),
134            self.catalog_manager.clone(),
135        )
136    }
137}
138
139impl InformationTable for InformationSchemaTables {
140    fn table_id(&self) -> TableId {
141        INFORMATION_SCHEMA_TABLES_TABLE_ID
142    }
143
144    fn table_name(&self) -> &'static str {
145        TABLES
146    }
147
148    fn schema(&self) -> SchemaRef {
149        self.schema.clone()
150    }
151
152    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
153        let schema = self.schema.arrow_schema().clone();
154        let mut builder = self.builder();
155        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
156            schema,
157            futures::stream::once(async move {
158                builder
159                    .make_tables(Some(request))
160                    .await
161                    .map(|x| x.into_df_record_batch())
162                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
163            }),
164        ));
165        Ok(Box::pin(
166            RecordBatchStreamAdapter::try_new(stream)
167                .map_err(BoxedError::new)
168                .context(InternalSnafu)?,
169        ))
170    }
171}
172
173/// Builds the `information_schema.TABLE` table row by row
174///
175/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
176struct InformationSchemaTablesBuilder {
177    schema: SchemaRef,
178    catalog_name: String,
179    catalog_manager: Weak<dyn CatalogManager>,
180
181    catalog_names: StringVectorBuilder,
182    schema_names: StringVectorBuilder,
183    table_names: StringVectorBuilder,
184    table_types: StringVectorBuilder,
185    table_ids: UInt32VectorBuilder,
186    version: UInt64VectorBuilder,
187    row_format: StringVectorBuilder,
188    table_rows: UInt64VectorBuilder,
189    data_length: UInt64VectorBuilder,
190    max_data_length: UInt64VectorBuilder,
191    index_length: UInt64VectorBuilder,
192    avg_row_length: UInt64VectorBuilder,
193    max_index_length: UInt64VectorBuilder,
194    data_free: UInt64VectorBuilder,
195    auto_increment: UInt64VectorBuilder,
196    create_time: TimestampSecondVectorBuilder,
197    update_time: TimestampSecondVectorBuilder,
198    check_time: TimestampSecondVectorBuilder,
199    table_collation: StringVectorBuilder,
200    checksum: UInt64VectorBuilder,
201    create_options: StringVectorBuilder,
202    table_comment: StringVectorBuilder,
203    engines: StringVectorBuilder,
204    temporary: StringVectorBuilder,
205}
206
207impl InformationSchemaTablesBuilder {
208    fn new(
209        schema: SchemaRef,
210        catalog_name: String,
211        catalog_manager: Weak<dyn CatalogManager>,
212    ) -> Self {
213        Self {
214            schema,
215            catalog_name,
216            catalog_manager,
217            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
218            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
219            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
220            table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
221            table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
222            data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
223            max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
224            index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
225            avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
226            engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
227            version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
228            row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
229            table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
230            max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
231            data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
232            auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
233            create_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
234            update_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
235            check_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
236            table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
237            checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
238            create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
239            table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
240            temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
241        }
242    }
243
244    /// Construct the `information_schema.tables` virtual table
245    async fn make_tables(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
246        let catalog_name = self.catalog_name.clone();
247        let catalog_manager = self
248            .catalog_manager
249            .upgrade()
250            .context(UpgradeWeakCatalogManagerRefSnafu)?;
251        let predicates = Predicates::from_scan_request(&request);
252
253        let information_extension = utils::information_extension(&self.catalog_manager)?;
254
255        // TODO(dennis): `region_stats` API is not stable in distributed cluster because of network issue etc.
256        // But we don't want the statements such as `show tables` fail,
257        // so using `unwrap_or_else` here instead of `?` operator.
258        let region_stats = information_extension
259            .region_stats()
260            .await
261            .map_err(|e| {
262                error!(e; "Failed to call region_stats");
263                e
264            })
265            .unwrap_or_else(|_| vec![]);
266
267        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
268            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
269
270            while let Some(table) = stream.try_next().await? {
271                let table_info = table.table_info();
272
273                // TODO(dennis): make it working for metric engine
274                let table_region_stats =
275                    if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
276                        let region_ids = table_info
277                            .meta
278                            .region_numbers
279                            .iter()
280                            .map(|n| RegionId::new(table_info.ident.table_id, *n))
281                            .collect::<HashSet<_>>();
282
283                        region_stats
284                            .iter()
285                            .filter(|stat| region_ids.contains(&stat.id))
286                            .collect::<Vec<_>>()
287                    } else {
288                        vec![]
289                    };
290
291                self.add_table(
292                    &predicates,
293                    &catalog_name,
294                    &schema_name,
295                    table_info,
296                    table.table_type(),
297                    &table_region_stats,
298                );
299            }
300        }
301
302        self.finish()
303    }
304
305    #[allow(clippy::too_many_arguments)]
306    fn add_table(
307        &mut self,
308        predicates: &Predicates,
309        catalog_name: &str,
310        schema_name: &str,
311        table_info: Arc<TableInfo>,
312        table_type: TableType,
313        region_stats: &[&RegionStat],
314    ) {
315        let table_name = table_info.name.as_ref();
316        let table_id = table_info.table_id();
317        let engine = table_info.meta.engine.as_ref();
318
319        let table_type_text = match table_type {
320            TableType::Base => "BASE TABLE",
321            TableType::View => "VIEW",
322            TableType::Temporary => "LOCAL TEMPORARY",
323        };
324
325        let row = [
326            (TABLE_CATALOG, &Value::from(catalog_name)),
327            (TABLE_ID, &Value::from(table_id)),
328            (TABLE_SCHEMA, &Value::from(schema_name)),
329            (ENGINE, &Value::from(engine)),
330            (TABLE_NAME, &Value::from(table_name)),
331            (TABLE_TYPE, &Value::from(table_type_text)),
332        ];
333
334        if !predicates.eval(&row) {
335            return;
336        }
337
338        self.catalog_names.push(Some(catalog_name));
339        self.schema_names.push(Some(schema_name));
340        self.table_names.push(Some(table_name));
341        self.table_types.push(Some(table_type_text));
342        self.table_ids.push(Some(table_id));
343
344        let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
345        let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
346        let index_length = region_stats.iter().map(|stat| stat.index_size).sum();
347
348        // It's not precise, but it is acceptable for long-term data storage.
349        let avg_row_length = if table_rows > 0 {
350            let total_data_length = data_length
351                + region_stats
352                    .iter()
353                    .map(|stat| stat.memtable_size)
354                    .sum::<u64>();
355
356            total_data_length / table_rows
357        } else {
358            0
359        };
360
361        self.data_length.push(Some(data_length));
362        self.index_length.push(Some(index_length));
363        self.table_rows.push(Some(table_rows));
364        self.avg_row_length.push(Some(avg_row_length));
365
366        // TODO(sunng87): use real data for these fields
367        self.max_data_length.push(Some(0));
368        self.checksum.push(Some(0));
369        self.max_index_length.push(Some(0));
370        self.data_free.push(Some(0));
371        self.auto_increment.push(Some(0));
372        self.row_format.push(Some("Fixed"));
373        self.table_collation.push(Some("utf8_bin"));
374        self.update_time.push(None);
375        self.check_time.push(None);
376        // use mariadb default table version number here
377        self.version.push(Some(11));
378        self.table_comment.push(table_info.desc.as_deref());
379        self.create_options
380            .push(Some(table_info.meta.options.to_string().as_ref()));
381        self.create_time
382            .push(Some(table_info.meta.created_on.timestamp().into()));
383
384        self.temporary
385            .push(if matches!(table_type, TableType::Temporary) {
386                Some("Y")
387            } else {
388                Some("N")
389            });
390        self.engines.push(Some(engine));
391    }
392
393    fn finish(&mut self) -> Result<RecordBatch> {
394        let columns: Vec<VectorRef> = vec![
395            Arc::new(self.catalog_names.finish()),
396            Arc::new(self.schema_names.finish()),
397            Arc::new(self.table_names.finish()),
398            Arc::new(self.table_types.finish()),
399            Arc::new(self.table_ids.finish()),
400            Arc::new(self.data_length.finish()),
401            Arc::new(self.max_data_length.finish()),
402            Arc::new(self.index_length.finish()),
403            Arc::new(self.max_index_length.finish()),
404            Arc::new(self.avg_row_length.finish()),
405            Arc::new(self.engines.finish()),
406            Arc::new(self.version.finish()),
407            Arc::new(self.row_format.finish()),
408            Arc::new(self.table_rows.finish()),
409            Arc::new(self.data_free.finish()),
410            Arc::new(self.auto_increment.finish()),
411            Arc::new(self.create_time.finish()),
412            Arc::new(self.update_time.finish()),
413            Arc::new(self.check_time.finish()),
414            Arc::new(self.table_collation.finish()),
415            Arc::new(self.checksum.finish()),
416            Arc::new(self.create_options.finish()),
417            Arc::new(self.table_comment.finish()),
418            Arc::new(self.temporary.finish()),
419        ];
420        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
421    }
422}
423
424impl DfPartitionStream for InformationSchemaTables {
425    fn schema(&self) -> &ArrowSchemaRef {
426        self.schema.arrow_schema()
427    }
428
429    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
430        let schema = self.schema.arrow_schema().clone();
431        let mut builder = self.builder();
432        Box::pin(DfRecordBatchStreamAdapter::new(
433            schema,
434            futures::stream::once(async move {
435                builder
436                    .make_tables(None)
437                    .await
438                    .map(|x| x.into_df_record_batch())
439                    .map_err(Into::into)
440            }),
441        ))
442    }
443}