catalog/system_schema/information_schema/tables.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE};
use common_error::ext::BoxedError;
use common_meta::datanode::RegionStat;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::error;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
    StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
};
use futures::StreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use crate::CatalogManager;
use crate::error::{
    CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
    UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
use crate::system_schema::utils;

pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
pub const VERSION: &str = "version";
pub const ROW_FORMAT: &str = "row_format";
pub const TABLE_ROWS: &str = "table_rows";
pub const DATA_LENGTH: &str = "data_length";
pub const INDEX_LENGTH: &str = "index_length";
pub const MAX_DATA_LENGTH: &str = "max_data_length";
pub const AVG_ROW_LENGTH: &str = "avg_row_length";
pub const DATA_FREE: &str = "data_free";
pub const AUTO_INCREMENT: &str = "auto_increment";
pub const CREATE_TIME: &str = "create_time";
pub const UPDATE_TIME: &str = "update_time";
pub const CHECK_TIME: &str = "check_time";
pub const TABLE_COLLATION: &str = "table_collation";
pub const CHECKSUM: &str = "checksum";
pub const CREATE_OPTIONS: &str = "create_options";
pub const TABLE_COMMENT: &str = "table_comment";
pub const MAX_INDEX_LENGTH: &str = "max_index_length";
pub const TEMPORARY: &str = "temporary";
const TABLE_ID: &str = "table_id";
pub const ENGINE: &str = "engine";
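/// Initial capacity for the per-column vector builders below.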
const INIT_CAPACITY: usize = 42;

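/// The `information_schema.tables` virtual table, backed by the catalog
/// manager rather than by stored data.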
#[derive(Debug)]
pub(super) struct InformationSchemaTables {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaTables {
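    /// Creates a provider for `catalog_name`, holding a weak reference to the
    /// catalog manager.
    ///
    /// A minimal usage sketch (hypothetical: assumes an
    /// `Arc<dyn CatalogManager>` named `catalog_manager` is in scope and that
    /// a default `ScanRequest` is acceptable):
    ///
    /// ```ignore
    /// let tables = InformationSchemaTables::new(
    ///     "greptime".to_string(),
    ///     Arc::downgrade(&catalog_manager),
    /// );
    /// let stream = tables.to_stream(ScanRequest::default())?;
    /// ```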
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

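    /// Builds the schema of `information_schema.tables`; the column order here
    /// must match the order in which `finish()` assembles the columns.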
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
            ColumnSchema::new(DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MAX_INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(VERSION, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ROW_FORMAT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(
                CREATE_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                UPDATE_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                CHECK_TIME,
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TABLE_COMMENT, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(TEMPORARY, ConcreteDataType::string_datatype(), true),
        ]))
    }

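    /// Returns a builder that collects the rows for one scan of this table.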
    fn builder(&self) -> InformationSchemaTablesBuilder {
        InformationSchemaTablesBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaTables {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_TABLES_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        TABLES
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

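    /// Builds the rows lazily inside a one-shot stream; the scan request is
    /// forwarded so predicates can prune rows while the table is assembled.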
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_tables(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

/// Builds the `information_schema.TABLES` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaTablesBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    table_types: StringVectorBuilder,
    table_ids: UInt32VectorBuilder,
    version: UInt64VectorBuilder,
    row_format: StringVectorBuilder,
    table_rows: UInt64VectorBuilder,
    data_length: UInt64VectorBuilder,
    max_data_length: UInt64VectorBuilder,
    index_length: UInt64VectorBuilder,
    avg_row_length: UInt64VectorBuilder,
    max_index_length: UInt64VectorBuilder,
    data_free: UInt64VectorBuilder,
    auto_increment: UInt64VectorBuilder,
    create_time: TimestampSecondVectorBuilder,
    update_time: TimestampSecondVectorBuilder,
    check_time: TimestampSecondVectorBuilder,
    table_collation: StringVectorBuilder,
    checksum: UInt64VectorBuilder,
    create_options: StringVectorBuilder,
    table_comment: StringVectorBuilder,
    engines: StringVectorBuilder,
    temporary: StringVectorBuilder,
}

impl InformationSchemaTablesBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            create_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            update_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            check_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Construct the `information_schema.tables` virtual table
    async fn make_tables(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let partition_manager = catalog_manager
            .as_any()
            .downcast_ref::<KvBackendCatalogManager>()
            .map(|catalog_manager| catalog_manager.partition_manager());
        let predicates = Predicates::from_scan_request(&request);

        let information_extension = utils::information_extension(&self.catalog_manager)?;

        // TODO(dennis): the `region_stats` API is not stable in a distributed
        // cluster because of network issues etc. We don't want statements such
        // as `SHOW TABLES` to fail because of that, so we use `unwrap_or_else`
        // here instead of the `?` operator.
        let region_stats = {
            let mut x = information_extension
                .region_stats()
                .await
                .unwrap_or_else(|e| {
                    error!(e; "Failed to find region stats in information_schema, falling back to empty stats");
                    vec![]
                });
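            // Sort by region id so the per-table lookup below can use binary search.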
            x.sort_unstable_by_key(|x| x.id);
            x
        };

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let table_stream = catalog_manager.tables(&catalog_name, &schema_name, None);

            const BATCH_SIZE: usize = 128;
            // Split tables into chunks
            let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));

            while let Some(tables) = table_chunks.next().await {
                let tables = tables.into_iter().collect::<Result<Vec<_>>>()?;
                let mito_or_physical_table_ids = tables
                    .iter()
                    .filter(|table| {
                        table.table_info().meta.engine == MITO_ENGINE
                            || table.table_info().is_physical_table()
                    })
                    .map(|table| table.table_info().ident.table_id)
                    .collect::<Vec<_>>();

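                // Resolve region routes for the collected table ids; when no
                // partition manager is available, fall back to empty routes.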
                let table_routes = if let Some(partition_manager) = &partition_manager {
                    partition_manager
                        .batch_find_region_routes(&mito_or_physical_table_ids)
                        .await
                        .context(FindRegionRoutesSnafu)?
                } else {
                    mito_or_physical_table_ids
                        .into_iter()
                        .map(|id| (id, vec![]))
                        .collect()
                };

                for table in tables {
                    let table_region_stats =
                        match table_routes.get(&table.table_info().ident.table_id) {
                            Some(routes) => routes
                                .iter()
                                .flat_map(|route| {
                                    let region_id = route.region.id;
                                    region_stats
                                        .binary_search_by_key(&region_id, |x| x.id)
                                        .map(|i| &region_stats[i])
                                })
                                .collect::<Vec<_>>(),
                            None => vec![],
                        };

                    self.add_table(
                        &predicates,
                        &catalog_name,
                        &schema_name,
                        table.table_info(),
                        table.table_type(),
                        &table_region_stats,
                    );
                }
            }
        }

        self.finish()
    }

    #[allow(clippy::too_many_arguments)]
    fn add_table(
        &mut self,
        predicates: &Predicates,
        catalog_name: &str,
        schema_name: &str,
        table_info: Arc<TableInfo>,
        table_type: TableType,
        region_stats: &[&RegionStat],
    ) {
        let table_name = table_info.name.as_ref();
        let table_id = table_info.table_id();
        let engine = table_info.meta.engine.as_ref();

        let table_type_text = match table_type {
            TableType::Base => "BASE TABLE",
            TableType::View => "VIEW",
            TableType::Temporary => "LOCAL TEMPORARY",
        };

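        // Evaluate the pushed-down predicates against the cheap columns first
        // so non-matching tables are skipped before any stats are computed.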
        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_ID, &Value::from(table_id)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (ENGINE, &Value::from(engine)),
            (TABLE_NAME, &Value::from(table_name)),
            (TABLE_TYPE, &Value::from(table_type_text)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.catalog_names.push(Some(catalog_name));
        self.schema_names.push(Some(schema_name));
        self.table_names.push(Some(table_name));
        self.table_types.push(Some(table_type_text));
        self.table_ids.push(Some(table_id));

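        // Aggregate the per-region statistics into table-level totals.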
        let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
        let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
        let index_length = region_stats.iter().map(|stat| stat.index_size).sum();

        // This estimate is not precise, but it is acceptable for long-term data storage.
        let avg_row_length = if table_rows > 0 {
            let total_data_length = data_length
                + region_stats
                    .iter()
                    .map(|stat| stat.memtable_size)
                    .sum::<u64>();

            total_data_length / table_rows
        } else {
            0
        };

        self.data_length.push(Some(data_length));
        self.index_length.push(Some(index_length));
        self.table_rows.push(Some(table_rows));
        self.avg_row_length.push(Some(avg_row_length));

        // TODO(sunng87): use real data for these fields
        self.max_data_length.push(Some(0));
        self.checksum.push(Some(0));
        self.max_index_length.push(Some(0));
        self.data_free.push(Some(0));
        self.auto_increment.push(Some(0));
        self.row_format.push(Some("Fixed"));
        self.table_collation.push(Some("utf8_bin"));
        self.update_time
            .push(Some(table_info.meta.updated_on.timestamp().into()));
        self.check_time.push(None);
        // Use the MariaDB default table version number here.
        self.version.push(Some(11));
        self.table_comment.push(table_info.desc.as_deref());
        self.create_options
            .push(Some(table_info.meta.options.to_string().as_ref()));
        self.create_time
            .push(Some(table_info.meta.created_on.timestamp().into()));

        self.temporary
            .push(if matches!(table_type, TableType::Temporary) {
                Some("Y")
            } else {
                Some("N")
            });
        self.engines.push(Some(engine));
    }

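    /// Finishes every column builder and assembles the record batch; the
    /// column order here must match `InformationSchemaTables::schema()`.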
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.table_types.finish()),
            Arc::new(self.table_ids.finish()),
            Arc::new(self.data_length.finish()),
            Arc::new(self.max_data_length.finish()),
            Arc::new(self.index_length.finish()),
            Arc::new(self.max_index_length.finish()),
            Arc::new(self.avg_row_length.finish()),
            Arc::new(self.engines.finish()),
            Arc::new(self.version.finish()),
            Arc::new(self.row_format.finish()),
            Arc::new(self.table_rows.finish()),
            Arc::new(self.data_free.finish()),
            Arc::new(self.auto_increment.finish()),
            Arc::new(self.create_time.finish()),
            Arc::new(self.update_time.finish()),
            Arc::new(self.check_time.finish()),
            Arc::new(self.table_collation.finish()),
            Arc::new(self.checksum.finish()),
            Arc::new(self.create_options.finish()),
            Arc::new(self.table_comment.finish()),
            Arc::new(self.temporary.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

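/// Lets DataFusion scan this table as a partition stream; no scan request is
/// available in this path, so the rows are built without predicate pruning.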
impl DfPartitionStream for InformationSchemaTables {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_tables(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}