// catalog/system_schema/information_schema/tables.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE};
19use common_error::ext::BoxedError;
20use common_meta::datanode::RegionStat;
21use common_recordbatch::adapter::RecordBatchStreamAdapter;
22use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
23use common_telemetry::error;
24use datafusion::execution::TaskContext;
25use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
27use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
28use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
29use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
30use datatypes::value::Value;
31use datatypes::vectors::{
32    StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
33};
34use futures::TryStreamExt;
35use snafu::{OptionExt, ResultExt};
36use store_api::storage::{RegionId, ScanRequest, TableId};
37use table::metadata::{TableInfo, TableType};
38
39use crate::CatalogManager;
40use crate::error::{
41    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
42};
43use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
44use crate::system_schema::utils;
45
// Column names of the `information_schema.tables` virtual table.
// The layout mirrors the MySQL/MariaDB-style `information_schema.TABLES`
// columns (see the mariadb version note in `add_table` below).
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
pub const VERSION: &str = "version";
pub const ROW_FORMAT: &str = "row_format";
pub const TABLE_ROWS: &str = "table_rows";
pub const DATA_LENGTH: &str = "data_length";
pub const INDEX_LENGTH: &str = "index_length";
pub const MAX_DATA_LENGTH: &str = "max_data_length";
pub const AVG_ROW_LENGTH: &str = "avg_row_length";
pub const DATA_FREE: &str = "data_free";
pub const AUTO_INCREMENT: &str = "auto_increment";
pub const CREATE_TIME: &str = "create_time";
pub const UPDATE_TIME: &str = "update_time";
pub const CHECK_TIME: &str = "check_time";
pub const TABLE_COLLATION: &str = "table_collation";
pub const CHECKSUM: &str = "checksum";
pub const CREATE_OPTIONS: &str = "create_options";
pub const TABLE_COMMENT: &str = "table_comment";
pub const MAX_INDEX_LENGTH: &str = "max_index_length";
pub const TEMPORARY: &str = "temporary";
// Crate-private: used for schema/predicate keys but not re-exported.
const TABLE_ID: &str = "table_id";
pub const ENGINE: &str = "engine";
// Initial capacity of each per-column vector builder in
// `InformationSchemaTablesBuilder::new`.
const INIT_CAPACITY: usize = 42;
71
/// The `information_schema.tables` virtual table, scoped to a single catalog.
#[derive(Debug)]
pub(super) struct InformationSchemaTables {
    /// Schema of the virtual table; built once by [`InformationSchemaTables::schema`].
    schema: SchemaRef,
    /// Name of the catalog whose tables this instance lists.
    catalog_name: String,
    /// Catalog manager, held weakly and upgraded on each scan.
    catalog_manager: Weak<dyn CatalogManager>,
}
78
impl InformationSchemaTables {
    /// Creates a new `information_schema.tables` virtual table for `catalog_name`.
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

    /// Builds the schema of `information_schema.tables`.
    ///
    /// NOTE: the column order here must stay in sync with the order of the
    /// vectors produced by `InformationSchemaTablesBuilder::finish`.
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
            ColumnSchema::new(DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MAX_INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(VERSION, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ROW_FORMAT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(
                CREATE_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                UPDATE_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                CHECK_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TABLE_COMMENT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TEMPORARY, ConcreteDataType::string_datatype(), true),
        ]))
    }

    /// Creates a row builder bound to this table's schema, catalog name and
    /// catalog manager.
    fn builder(&self) -> InformationSchemaTablesBuilder {
        InformationSchemaTablesBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}
137
138impl InformationTable for InformationSchemaTables {
139    fn table_id(&self) -> TableId {
140        INFORMATION_SCHEMA_TABLES_TABLE_ID
141    }
142
143    fn table_name(&self) -> &'static str {
144        TABLES
145    }
146
147    fn schema(&self) -> SchemaRef {
148        self.schema.clone()
149    }
150
151    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
152        let schema = self.schema.arrow_schema().clone();
153        let mut builder = self.builder();
154        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
155            schema,
156            futures::stream::once(async move {
157                builder
158                    .make_tables(Some(request))
159                    .await
160                    .map(|x| x.into_df_record_batch())
161                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
162            }),
163        ));
164        Ok(Box::pin(
165            RecordBatchStreamAdapter::try_new(stream)
166                .map_err(BoxedError::new)
167                .context(InternalSnafu)?,
168        ))
169    }
170}
171
/// Builds the `information_schema.tables` virtual table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaTablesBuilder {
    /// Target schema; its column order must match the vector order in `finish`.
    schema: SchemaRef,
    /// Catalog whose tables are listed.
    catalog_name: String,
    /// Catalog manager, upgraded inside `make_tables`.
    catalog_manager: Weak<dyn CatalogManager>,

    // One vector builder per output column. Every builder below must be
    // pushed exactly once per row in `add_table`, otherwise the columns end
    // up with different lengths and `finish` fails.
    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    table_types: StringVectorBuilder,
    table_ids: UInt32VectorBuilder,
    version: UInt64VectorBuilder,
    row_format: StringVectorBuilder,
    table_rows: UInt64VectorBuilder,
    data_length: UInt64VectorBuilder,
    max_data_length: UInt64VectorBuilder,
    index_length: UInt64VectorBuilder,
    avg_row_length: UInt64VectorBuilder,
    max_index_length: UInt64VectorBuilder,
    data_free: UInt64VectorBuilder,
    auto_increment: UInt64VectorBuilder,
    create_time: TimestampSecondVectorBuilder,
    update_time: TimestampSecondVectorBuilder,
    check_time: TimestampSecondVectorBuilder,
    table_collation: StringVectorBuilder,
    checksum: UInt64VectorBuilder,
    create_options: StringVectorBuilder,
    table_comment: StringVectorBuilder,
    engines: StringVectorBuilder,
    temporary: StringVectorBuilder,
}
205
impl InformationSchemaTablesBuilder {
    /// Creates a builder with every per-column vector pre-allocated to
    /// `INIT_CAPACITY`.
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            create_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            update_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            check_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Construct the `information_schema.tables` virtual table.
    ///
    /// Iterates every schema and table visible through the catalog manager,
    /// attaches per-region statistics where available, and emits one row per
    /// table (subject to the predicates derived from `request`).
    async fn make_tables(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        // Predicates from the scan request are used in `add_table` to skip
        // rows the caller filtered out.
        let predicates = Predicates::from_scan_request(&request);

        let information_extension = utils::information_extension(&self.catalog_manager)?;

        // TODO(dennis): `region_stats` API is not stable in distributed cluster because of network issue etc.
        // But we don't want the statements such as `show tables` fail,
        // so using `unwrap_or_else` here instead of `?` operator.
        let region_stats = {
            let mut x = information_extension
                .region_stats()
                .await
                .unwrap_or_else(|e| {
                    error!(e; "Failed to find region stats in information_schema, fallback to all empty");
                    vec![]
                });
            // Sorted by region id so the per-table lookup below can use
            // binary search instead of a linear scan.
            x.sort_unstable_by_key(|x| x.id);
            x
        };

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

            while let Some(table) = stream.try_next().await? {
                let table_info = table.table_info();

                // TODO(dennis): make it working for metric engine
                // Collect the stats of every region of this table; regions
                // missing from `region_stats` are silently skipped.
                let table_region_stats =
                    if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
                        table_info
                            .meta
                            .region_numbers
                            .iter()
                            .map(|n| RegionId::new(table_info.ident.table_id, *n))
                            .flat_map(|region_id| {
                                region_stats
                                    .binary_search_by_key(&region_id, |x| x.id)
                                    .map(|i| &region_stats[i])
                            })
                            .collect::<Vec<_>>()
                    } else {
                        vec![]
                    };

                self.add_table(
                    &predicates,
                    &catalog_name,
                    &schema_name,
                    table_info,
                    table.table_type(),
                    &table_region_stats,
                );
            }
        }

        self.finish()
    }

    /// Appends one row describing `table_info` to every column builder,
    /// unless `predicates` rejects the row.
    ///
    /// `region_stats` holds the stats of this table's regions (possibly
    /// empty); size/row columns are aggregated from it.
    #[allow(clippy::too_many_arguments)]
    fn add_table(
        &mut self,
        predicates: &Predicates,
        catalog_name: &str,
        schema_name: &str,
        table_info: Arc<TableInfo>,
        table_type: TableType,
        region_stats: &[&RegionStat],
    ) {
        let table_name = table_info.name.as_ref();
        let table_id = table_info.table_id();
        let engine = table_info.meta.engine.as_ref();

        let table_type_text = match table_type {
            TableType::Base => "BASE TABLE",
            TableType::View => "VIEW",
            TableType::Temporary => "LOCAL TEMPORARY",
        };

        // Only the columns below participate in predicate pruning.
        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_ID, &Value::from(table_id)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (ENGINE, &Value::from(engine)),
            (TABLE_NAME, &Value::from(table_name)),
            (TABLE_TYPE, &Value::from(table_type_text)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.catalog_names.push(Some(catalog_name));
        self.schema_names.push(Some(schema_name));
        self.table_names.push(Some(table_name));
        self.table_types.push(Some(table_type_text));
        self.table_ids.push(Some(table_id));

        // Aggregate sizes/rows across all known regions of the table.
        let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
        let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
        let index_length = region_stats.iter().map(|stat| stat.index_size).sum();

        // It's not precise, but it is acceptable for long-term data storage.
        // (Integer division; memtable size is included here even though it is
        // excluded from `data_length` itself.)
        let avg_row_length = if table_rows > 0 {
            let total_data_length = data_length
                + region_stats
                    .iter()
                    .map(|stat| stat.memtable_size)
                    .sum::<u64>();

            total_data_length / table_rows
        } else {
            0
        };

        self.data_length.push(Some(data_length));
        self.index_length.push(Some(index_length));
        self.table_rows.push(Some(table_rows));
        self.avg_row_length.push(Some(avg_row_length));

        // TODO(sunng87): use real data for these fields
        self.max_data_length.push(Some(0));
        self.checksum.push(Some(0));
        self.max_index_length.push(Some(0));
        self.data_free.push(Some(0));
        self.auto_increment.push(Some(0));
        self.row_format.push(Some("Fixed"));
        self.table_collation.push(Some("utf8_bin"));
        self.update_time
            .push(Some(table_info.meta.updated_on.timestamp().into()));
        // No check-time tracking; always NULL.
        self.check_time.push(None);
        // use mariadb default table version number here
        self.version.push(Some(11));
        self.table_comment.push(table_info.desc.as_deref());
        self.create_options
            .push(Some(table_info.meta.options.to_string().as_ref()));
        self.create_time
            .push(Some(table_info.meta.created_on.timestamp().into()));

        self.temporary
            .push(if matches!(table_type, TableType::Temporary) {
                Some("Y")
            } else {
                Some("N")
            });
        self.engines.push(Some(engine));
    }

    /// Finalizes every column builder into a [`RecordBatch`].
    ///
    /// NOTE: the vector order below must match the column order declared in
    /// `InformationSchemaTables::schema`.
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.table_types.finish()),
            Arc::new(self.table_ids.finish()),
            Arc::new(self.data_length.finish()),
            Arc::new(self.max_data_length.finish()),
            Arc::new(self.index_length.finish()),
            Arc::new(self.max_index_length.finish()),
            Arc::new(self.avg_row_length.finish()),
            Arc::new(self.engines.finish()),
            Arc::new(self.version.finish()),
            Arc::new(self.row_format.finish()),
            Arc::new(self.table_rows.finish()),
            Arc::new(self.data_free.finish()),
            Arc::new(self.auto_increment.finish()),
            Arc::new(self.create_time.finish()),
            Arc::new(self.update_time.finish()),
            Arc::new(self.check_time.finish()),
            Arc::new(self.table_collation.finish()),
            Arc::new(self.checksum.finish()),
            Arc::new(self.create_options.finish()),
            Arc::new(self.table_comment.finish()),
            Arc::new(self.temporary.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}
426
427impl DfPartitionStream for InformationSchemaTables {
428    fn schema(&self) -> &ArrowSchemaRef {
429        self.schema.arrow_schema()
430    }
431
432    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
433        let schema = self.schema.arrow_schema().clone();
434        let mut builder = self.builder();
435        Box::pin(DfRecordBatchStreamAdapter::new(
436            schema,
437            futures::stream::once(async move {
438                builder
439                    .make_tables(None)
440                    .await
441                    .map(|x| x.into_df_record_batch())
442                    .map_err(Into::into)
443            }),
444        ))
445    }
446}