catalog/system_schema/information_schema/partitions.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampSecond;
use datatypes::value::Value;
use datatypes::vectors::{
    ConstantVector, Int64Vector, Int64VectorBuilder, MutableVector, StringVector,
    StringVectorBuilder, TimestampSecondVector, TimestampSecondVectorBuilder, UInt64VectorBuilder,
};
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use crate::CatalogManager;
use crate::error::{
    CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu,
    Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, PARTITIONS, Predicates};

const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const PARTITION_NAME: &str = "partition_name";
const PARTITION_EXPRESSION: &str = "partition_expression";
/// The region id
const GREPTIME_PARTITION_ID: &str = "greptime_partition_id";
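/// Initial capacity for the row vector builders below.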
const INIT_CAPACITY: usize = 42;

/// The `PARTITIONS` table provides information about partitioned tables.
/// See <https://dev.mysql.com/doc/refman/8.0/en/information-schema-partitions-table.html>.
/// We provide an extra column `greptime_partition_id` for the GreptimeDB region id.
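///
/// For example, a query like this (illustrative; `monitor` is a hypothetical table)
/// lists a table's partitions and their backing regions:
///
/// ```sql
/// SELECT partition_name, partition_expression, greptime_partition_id
/// FROM information_schema.partitions
/// WHERE table_name = 'monitor';
/// ```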
#[derive(Debug)]
pub(super) struct InformationSchemaPartitions {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaPartitions {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

    pub(crate) fn schema() -> SchemaRef {
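        // The column set and order mirror MySQL's `information_schema.PARTITIONS`,
        // with `greptime_partition_id` appended at the end.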
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PARTITION_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                "subpartition_name",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                PARTITION_EXPRESSION,
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_expression",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_description",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("table_rows", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("avg_row_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("max_data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("index_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_free", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "create_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                "update_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                "check_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new("checksum", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "partition_comment",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("nodegroup", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("tablespace_name", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(
                GREPTIME_PARTITION_ID,
                ConcreteDataType::uint64_datatype(),
                true,
            ),
        ]))
    }

    fn builder(&self) -> InformationSchemaPartitionsBuilder {
        InformationSchemaPartitionsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaPartitions {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_PARTITIONS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PARTITIONS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
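        // The batch is built lazily: `make_partitions` runs on the first poll of the stream.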
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

struct InformationSchemaPartitionsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    partition_names: StringVectorBuilder,
    partition_ordinal_positions: Int64VectorBuilder,
    partition_expressions: StringVectorBuilder,
    partition_descriptions: StringVectorBuilder,
    create_times: TimestampSecondVectorBuilder,
    partition_ids: UInt64VectorBuilder,
}

impl InformationSchemaPartitionsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_descriptions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            create_times: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Builds the rows of the `information_schema.partitions` virtual table,
    /// optionally pruned by the scan request's predicates.
    async fn make_partitions(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;

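        // Partition metadata is only available through `KvBackendCatalogManager`;
        // other catalog manager implementations cannot serve this table.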
        let partition_manager = catalog_manager
            .as_any()
            .downcast_ref::<KvBackendCatalogManager>()
            .map(|catalog_manager| catalog_manager.partition_manager())
            .context(PartitionManagerNotFoundSnafu)?;

        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
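            // Skip temporary tables, which have no partitions to report.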
            let table_info_stream = catalog_manager
                .tables(&catalog_name, &schema_name, None)
                .try_filter_map(|t| async move {
                    let table_info = t.table_info();
                    if table_info.table_type == TableType::Temporary {
                        Ok(None)
                    } else {
                        Ok(Some(table_info))
                    }
                });

            const BATCH_SIZE: usize = 128;

            // Split the table infos into chunks so partition metadata is fetched
            // in batches rather than once per table.
            let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));

            while let Some(table_infos) = table_info_chunks.next().await {
                let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
                let table_ids: Vec<TableId> =
                    table_infos.iter().map(|info| info.ident.table_id).collect();

                let mut table_partitions = partition_manager
                    .batch_find_table_partitions(&table_ids)
                    .await
                    .context(FindPartitionsSnafu)?;

                for table_info in table_infos {
                    let partitions = table_partitions
                        .remove(&table_info.ident.table_id)
                        .unwrap_or_default();

                    self.add_partitions(
                        &predicates,
                        &table_info,
                        &catalog_name,
                        &schema_name,
                        &table_info.name,
                        &partitions,
                    );
                }
            }
        }

        self.finish()
    }

    #[allow(clippy::too_many_arguments)]
    fn add_partitions(
        &mut self,
        predicates: &Predicates,
        table_info: &TableInfo,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        partitions: &[PartitionInfo],
    ) {
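        // Evaluate the scan predicates against the table-level columns first,
        // so a non-matching table is skipped before any rows are built.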
        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (TABLE_NAME, &Value::from(table_name)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        // Get partition column names (shared by all partitions)
        // In MySQL, PARTITION_EXPRESSION is the partitioning function expression (e.g., column name)
        let partition_columns: String = table_info
            .meta
            .partition_column_names()
            .cloned()
            .collect::<Vec<_>>()
            .join(", ");

        let partition_expr_str = if partition_columns.is_empty() {
            None
        } else {
            Some(partition_columns)
        };

        for (index, partition) in partitions.iter().enumerate() {
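            // Synthesize MySQL-style partition names ("p0", "p1", ...) from the
            // partition index.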
            let partition_name = format!("p{index}");

            self.catalog_names.push(Some(catalog_name));
            self.schema_names.push(Some(schema_name));
            self.table_names.push(Some(table_name));
            self.partition_names.push(Some(&partition_name));
            self.partition_ordinal_positions
                .push(Some((index + 1) as i64));
            // PARTITION_EXPRESSION: partition column names (same for all partitions)
            self.partition_expressions
                .push(partition_expr_str.as_deref());
            // PARTITION_DESCRIPTION: partition boundary expression (different for each partition)
            let description = partition.partition_expr.as_ref().map(|e| e.to_string());
            self.partition_descriptions.push(description.as_deref());
            self.create_times.push(Some(TimestampSecond::from(
                table_info.meta.created_on.timestamp(),
            )));
            self.partition_ids.push(Some(partition.id.as_u64()));
        }
    }

    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.catalog_names.len();

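        // Columns without collected values are filled with constant null vectors,
        // which represent `rows_num` nulls without per-row allocation.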
        let null_string_vector = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![None as Option<&str>])),
            rows_num,
        ));
        let null_i64_vector = Arc::new(ConstantVector::new(
            Arc::new(Int64Vector::from(vec![None])),
            rows_num,
        ));
        let null_timestamp_second_vector = Arc::new(ConstantVector::new(
            Arc::new(TimestampSecondVector::from(vec![None])),
            rows_num,
        ));
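        // GreptimeDB partitioning is range-based, so PARTITION_METHOD is always "RANGE".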
        let partition_methods = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![Some("RANGE")])),
            rows_num,
        ));

        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.partition_names.finish()),
            null_string_vector.clone(),
            Arc::new(self.partition_ordinal_positions.finish()),
            null_i64_vector.clone(),
            partition_methods,
            null_string_vector.clone(),
            Arc::new(self.partition_expressions.finish()),
            null_string_vector.clone(),
            Arc::new(self.partition_descriptions.finish()),
            // TODO(dennis): rows and index statistics info
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            Arc::new(self.create_times.finish()),
            // TODO(dennis): support update_time
            null_timestamp_second_vector.clone(),
            null_timestamp_second_vector,
            null_i64_vector,
            null_string_vector.clone(),
            null_string_vector.clone(),
            null_string_vector,
            Arc::new(self.partition_ids.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

impl DfPartitionStream for InformationSchemaPartitions {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
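        // This DataFusion entry point has no ScanRequest, so `make_partitions`
        // runs without predicate pruning.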
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}