catalog/system_schema/information_schema/
tables.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::{Arc, Weak};
17
18use arrow_schema::SchemaRef as ArrowSchemaRef;
19use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE};
20use common_error::ext::BoxedError;
21use common_meta::datanode::RegionStat;
22use common_recordbatch::adapter::RecordBatchStreamAdapter;
23use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
24use common_telemetry::error;
25use datafusion::execution::TaskContext;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
27use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
28use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
29use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
30use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
31use datatypes::value::Value;
32use datatypes::vectors::{
33    StringVectorBuilder, TimestampMicrosecondVectorBuilder, UInt32VectorBuilder,
34    UInt64VectorBuilder,
35};
36use futures::TryStreamExt;
37use snafu::{OptionExt, ResultExt};
38use store_api::storage::{RegionId, ScanRequest, TableId};
39use table::metadata::{TableInfo, TableType};
40
41use crate::error::{
42    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
43};
44use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
45use crate::system_schema::utils;
46use crate::CatalogManager;
47
// Column names of the `information_schema.tables` table.
// Every name below (including the private `TABLE_ID`) is used as a column
// name when the table schema is built.
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
pub const VERSION: &str = "version";
pub const ROW_FORMAT: &str = "row_format";
pub const TABLE_ROWS: &str = "table_rows";
pub const DATA_LENGTH: &str = "data_length";
pub const INDEX_LENGTH: &str = "index_length";
pub const MAX_DATA_LENGTH: &str = "max_data_length";
pub const AVG_ROW_LENGTH: &str = "avg_row_length";
pub const DATA_FREE: &str = "data_free";
pub const AUTO_INCREMENT: &str = "auto_increment";
pub const CREATE_TIME: &str = "create_time";
pub const UPDATE_TIME: &str = "update_time";
pub const CHECK_TIME: &str = "check_time";
pub const TABLE_COLLATION: &str = "table_collation";
pub const CHECKSUM: &str = "checksum";
pub const CREATE_OPTIONS: &str = "create_options";
pub const TABLE_COMMENT: &str = "table_comment";
pub const MAX_INDEX_LENGTH: &str = "max_index_length";
pub const TEMPORARY: &str = "temporary";
// Unlike the other column names, `table_id` is not exported from this module.
const TABLE_ID: &str = "table_id";
pub const ENGINE: &str = "engine";
// Initial capacity handed to every column vector builder.
const INIT_CAPACITY: usize = 42;
73
/// The `information_schema.tables` system table for a single catalog.
///
/// Holds the table's schema, the name of the catalog it reports on and a weak
/// handle to the catalog manager that is upgraded whenever the table is scanned.
#[derive(Debug)]
pub(super) struct InformationSchemaTables {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}
80
81impl InformationSchemaTables {
82    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
83        Self {
84            schema: Self::schema(),
85            catalog_name,
86            catalog_manager,
87        }
88    }
89
90    pub(crate) fn schema() -> SchemaRef {
91        Arc::new(Schema::new(vec![
92            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
93            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
94            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
95            ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
96            ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
97            ColumnSchema::new(DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
98            ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
99            ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
100            ColumnSchema::new(MAX_INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
101            ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::uint64_datatype(), true),
102            ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
103            ColumnSchema::new(VERSION, ConcreteDataType::uint64_datatype(), true),
104            ColumnSchema::new(ROW_FORMAT, ConcreteDataType::string_datatype(), true),
105            ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
106            ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
107            ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
108            ColumnSchema::new(
109                CREATE_TIME,
110                ConcreteDataType::timestamp_microsecond_datatype(),
111                true,
112            ),
113            ColumnSchema::new(
114                UPDATE_TIME,
115                ConcreteDataType::timestamp_microsecond_datatype(),
116                true,
117            ),
118            ColumnSchema::new(
119                CHECK_TIME,
120                ConcreteDataType::timestamp_microsecond_datatype(),
121                true,
122            ),
123            ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
124            ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
125            ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
126            ColumnSchema::new(TABLE_COMMENT, ConcreteDataType::string_datatype(), true),
127            ColumnSchema::new(TEMPORARY, ConcreteDataType::string_datatype(), true),
128        ]))
129    }
130
131    fn builder(&self) -> InformationSchemaTablesBuilder {
132        InformationSchemaTablesBuilder::new(
133            self.schema.clone(),
134            self.catalog_name.clone(),
135            self.catalog_manager.clone(),
136        )
137    }
138}
139
140impl InformationTable for InformationSchemaTables {
141    fn table_id(&self) -> TableId {
142        INFORMATION_SCHEMA_TABLES_TABLE_ID
143    }
144
145    fn table_name(&self) -> &'static str {
146        TABLES
147    }
148
149    fn schema(&self) -> SchemaRef {
150        self.schema.clone()
151    }
152
153    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
154        let schema = self.schema.arrow_schema().clone();
155        let mut builder = self.builder();
156        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
157            schema,
158            futures::stream::once(async move {
159                builder
160                    .make_tables(Some(request))
161                    .await
162                    .map(|x| x.into_df_record_batch())
163                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
164            }),
165        ));
166        Ok(Box::pin(
167            RecordBatchStreamAdapter::try_new(stream)
168                .map_err(BoxedError::new)
169                .context(InternalSnafu)?,
170        ))
171    }
172}
173
174/// Builds the `information_schema.TABLE` table row by row
175///
176/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
177struct InformationSchemaTablesBuilder {
178    schema: SchemaRef,
179    catalog_name: String,
180    catalog_manager: Weak<dyn CatalogManager>,
181
182    catalog_names: StringVectorBuilder,
183    schema_names: StringVectorBuilder,
184    table_names: StringVectorBuilder,
185    table_types: StringVectorBuilder,
186    table_ids: UInt32VectorBuilder,
187    version: UInt64VectorBuilder,
188    row_format: StringVectorBuilder,
189    table_rows: UInt64VectorBuilder,
190    data_length: UInt64VectorBuilder,
191    max_data_length: UInt64VectorBuilder,
192    index_length: UInt64VectorBuilder,
193    avg_row_length: UInt64VectorBuilder,
194    max_index_length: UInt64VectorBuilder,
195    data_free: UInt64VectorBuilder,
196    auto_increment: UInt64VectorBuilder,
197    create_time: TimestampMicrosecondVectorBuilder,
198    update_time: TimestampMicrosecondVectorBuilder,
199    check_time: TimestampMicrosecondVectorBuilder,
200    table_collation: StringVectorBuilder,
201    checksum: UInt64VectorBuilder,
202    create_options: StringVectorBuilder,
203    table_comment: StringVectorBuilder,
204    engines: StringVectorBuilder,
205    temporary: StringVectorBuilder,
206}
207
208impl InformationSchemaTablesBuilder {
209    fn new(
210        schema: SchemaRef,
211        catalog_name: String,
212        catalog_manager: Weak<dyn CatalogManager>,
213    ) -> Self {
214        Self {
215            schema,
216            catalog_name,
217            catalog_manager,
218            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
219            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
220            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
221            table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
222            table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
223            data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
224            max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
225            index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
226            avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
227            engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
228            version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
229            row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
230            table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
231            max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
232            data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
233            auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
234            create_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
235            update_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
236            check_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
237            table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
238            checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
239            create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
240            table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
241            temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
242        }
243    }
244
245    /// Construct the `information_schema.tables` virtual table
246    async fn make_tables(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
247        let catalog_name = self.catalog_name.clone();
248        let catalog_manager = self
249            .catalog_manager
250            .upgrade()
251            .context(UpgradeWeakCatalogManagerRefSnafu)?;
252        let predicates = Predicates::from_scan_request(&request);
253
254        let information_extension = utils::information_extension(&self.catalog_manager)?;
255
256        // TODO(dennis): `region_stats` API is not stable in distributed cluster because of network issue etc.
257        // But we don't want the statements such as `show tables` fail,
258        // so using `unwrap_or_else` here instead of `?` operator.
259        let region_stats = information_extension
260            .region_stats()
261            .await
262            .map_err(|e| {
263                error!(e; "Failed to call region_stats");
264                e
265            })
266            .unwrap_or_else(|_| vec![]);
267
268        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
269            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
270
271            while let Some(table) = stream.try_next().await? {
272                let table_info = table.table_info();
273
274                // TODO(dennis): make it working for metric engine
275                let table_region_stats =
276                    if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
277                        let region_ids = table_info
278                            .meta
279                            .region_numbers
280                            .iter()
281                            .map(|n| RegionId::new(table_info.ident.table_id, *n))
282                            .collect::<HashSet<_>>();
283
284                        region_stats
285                            .iter()
286                            .filter(|stat| region_ids.contains(&stat.id))
287                            .collect::<Vec<_>>()
288                    } else {
289                        vec![]
290                    };
291
292                self.add_table(
293                    &predicates,
294                    &catalog_name,
295                    &schema_name,
296                    table_info,
297                    table.table_type(),
298                    &table_region_stats,
299                );
300            }
301        }
302
303        self.finish()
304    }
305
306    #[allow(clippy::too_many_arguments)]
307    fn add_table(
308        &mut self,
309        predicates: &Predicates,
310        catalog_name: &str,
311        schema_name: &str,
312        table_info: Arc<TableInfo>,
313        table_type: TableType,
314        region_stats: &[&RegionStat],
315    ) {
316        let table_name = table_info.name.as_ref();
317        let table_id = table_info.table_id();
318        let engine = table_info.meta.engine.as_ref();
319
320        let table_type_text = match table_type {
321            TableType::Base => "BASE TABLE",
322            TableType::View => "VIEW",
323            TableType::Temporary => "LOCAL TEMPORARY",
324        };
325
326        let row = [
327            (TABLE_CATALOG, &Value::from(catalog_name)),
328            (TABLE_ID, &Value::from(table_id)),
329            (TABLE_SCHEMA, &Value::from(schema_name)),
330            (ENGINE, &Value::from(engine)),
331            (TABLE_NAME, &Value::from(table_name)),
332            (TABLE_TYPE, &Value::from(table_type_text)),
333        ];
334
335        if !predicates.eval(&row) {
336            return;
337        }
338
339        self.catalog_names.push(Some(catalog_name));
340        self.schema_names.push(Some(schema_name));
341        self.table_names.push(Some(table_name));
342        self.table_types.push(Some(table_type_text));
343        self.table_ids.push(Some(table_id));
344
345        let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
346        let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
347        let index_length = region_stats.iter().map(|stat| stat.index_size).sum();
348
349        // It's not precise, but it is acceptable for long-term data storage.
350        let avg_row_length = if table_rows > 0 {
351            let total_data_length = data_length
352                + region_stats
353                    .iter()
354                    .map(|stat| stat.memtable_size)
355                    .sum::<u64>();
356
357            total_data_length / table_rows
358        } else {
359            0
360        };
361
362        self.data_length.push(Some(data_length));
363        self.index_length.push(Some(index_length));
364        self.table_rows.push(Some(table_rows));
365        self.avg_row_length.push(Some(avg_row_length));
366
367        // TODO(sunng87): use real data for these fields
368        self.max_data_length.push(Some(0));
369        self.checksum.push(Some(0));
370        self.max_index_length.push(Some(0));
371        self.data_free.push(Some(0));
372        self.auto_increment.push(Some(0));
373        self.row_format.push(Some("Fixed"));
374        self.table_collation.push(Some("utf8_bin"));
375        self.update_time.push(None);
376        self.check_time.push(None);
377        // use mariadb default table version number here
378        self.version.push(Some(11));
379        self.table_comment.push(table_info.desc.as_deref());
380        self.create_options
381            .push(Some(table_info.meta.options.to_string().as_ref()));
382        self.create_time
383            .push(Some(table_info.meta.created_on.timestamp_millis().into()));
384
385        self.temporary
386            .push(if matches!(table_type, TableType::Temporary) {
387                Some("Y")
388            } else {
389                Some("N")
390            });
391        self.engines.push(Some(engine));
392    }
393
394    fn finish(&mut self) -> Result<RecordBatch> {
395        let columns: Vec<VectorRef> = vec![
396            Arc::new(self.catalog_names.finish()),
397            Arc::new(self.schema_names.finish()),
398            Arc::new(self.table_names.finish()),
399            Arc::new(self.table_types.finish()),
400            Arc::new(self.table_ids.finish()),
401            Arc::new(self.data_length.finish()),
402            Arc::new(self.max_data_length.finish()),
403            Arc::new(self.index_length.finish()),
404            Arc::new(self.max_index_length.finish()),
405            Arc::new(self.avg_row_length.finish()),
406            Arc::new(self.engines.finish()),
407            Arc::new(self.version.finish()),
408            Arc::new(self.row_format.finish()),
409            Arc::new(self.table_rows.finish()),
410            Arc::new(self.data_free.finish()),
411            Arc::new(self.auto_increment.finish()),
412            Arc::new(self.create_time.finish()),
413            Arc::new(self.update_time.finish()),
414            Arc::new(self.check_time.finish()),
415            Arc::new(self.table_collation.finish()),
416            Arc::new(self.checksum.finish()),
417            Arc::new(self.create_options.finish()),
418            Arc::new(self.table_comment.finish()),
419            Arc::new(self.temporary.finish()),
420        ];
421        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
422    }
423}
424
425impl DfPartitionStream for InformationSchemaTables {
426    fn schema(&self) -> &ArrowSchemaRef {
427        self.schema.arrow_schema()
428    }
429
430    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
431        let schema = self.schema.arrow_schema().clone();
432        let mut builder = self.builder();
433        Box::pin(DfRecordBatchStreamAdapter::new(
434            schema,
435            futures::stream::once(async move {
436                builder
437                    .make_tables(None)
438                    .await
439                    .map(|x| x.into_df_record_batch())
440                    .map_err(Into::into)
441            }),
442        ))
443    }
444}