catalog/system_schema/information_schema/partitions.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampSecond;
use datatypes::value::Value;
use datatypes::vectors::{
    ConstantVector, Int64Vector, Int64VectorBuilder, MutableVector, StringVector,
    StringVectorBuilder, TimestampSecondVector, TimestampSecondVectorBuilder, UInt64VectorBuilder,
};
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use crate::CatalogManager;
use crate::error::{
    CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu,
    Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, PARTITIONS, Predicates};

const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const PARTITION_NAME: &str = "partition_name";
const PARTITION_EXPRESSION: &str = "partition_expression";
/// The region id
const GREPTIME_PARTITION_ID: &str = "greptime_partition_id";
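/// Initial capacity of the vector builders.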
const INIT_CAPACITY: usize = 42;

/// The `PARTITIONS` table provides information about partitioned tables.
/// See <https://dev.mysql.com/doc/refman/8.0/en/information-schema-partitions-table.html>.
/// We provide an extra column, `greptime_partition_id`, for the GreptimeDB region id.
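///
/// For example, a query like the following (illustrative; `monitor` is a
/// hypothetical table name) shows which region serves each partition:
///
/// ```sql
/// SELECT table_name, partition_name, partition_expression, greptime_partition_id
/// FROM information_schema.partitions
/// WHERE table_name = 'monitor';
/// ```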
#[derive(Debug)]
pub(super) struct InformationSchemaPartitions {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaPartitions {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

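    /// Builds the schema of the `PARTITIONS` table: the MySQL-compatible
    /// columns plus the GreptimeDB-specific `greptime_partition_id` column.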
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PARTITION_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                "subpartition_name",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                PARTITION_EXPRESSION,
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_expression",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_description",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("table_rows", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("avg_row_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("max_data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("index_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_free", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "create_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                "update_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new(
                "check_time",
                ConcreteDataType::timestamp_second_datatype(),
                true,
            ),
            ColumnSchema::new("checksum", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "partition_comment",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("nodegroup", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("tablespace_name", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(
                GREPTIME_PARTITION_ID,
                ConcreteDataType::uint64_datatype(),
                true,
            ),
        ]))
    }

    fn builder(&self) -> InformationSchemaPartitionsBuilder {
        InformationSchemaPartitionsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaPartitions {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_PARTITIONS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PARTITIONS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

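    // The record batch is built lazily: `make_partitions` runs only when the
    // returned stream is polled.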
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

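/// Builds the rows of the `information_schema.partitions` table.
///
/// Only the columns that GreptimeDB can fill have dedicated vector builders;
/// the remaining MySQL-compatible columns are emitted as constant nulls in
/// `finish`.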
struct InformationSchemaPartitionsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    partition_names: StringVectorBuilder,
    partition_ordinal_positions: Int64VectorBuilder,
    partition_expressions: StringVectorBuilder,
    create_times: TimestampSecondVectorBuilder,
    partition_ids: UInt64VectorBuilder,
}

impl InformationSchemaPartitionsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            create_times: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Constructs the `information_schema.partitions` virtual table.
    async fn make_partitions(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;

        let partition_manager = catalog_manager
            .as_any()
            .downcast_ref::<KvBackendCatalogManager>()
            .map(|catalog_manager| catalog_manager.partition_manager())
            .context(PartitionManagerNotFoundSnafu)?;

        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let table_info_stream = catalog_manager
                .tables(&catalog_name, &schema_name, None)
                .try_filter_map(|t| async move {
                    let table_info = t.table_info();
                    if table_info.table_type == TableType::Temporary {
                        Ok(None)
                    } else {
                        Ok(Some(table_info))
                    }
                });

            const BATCH_SIZE: usize = 128;

            // Split table infos into chunks
            let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));

            while let Some(table_infos) = table_info_chunks.next().await {
                let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
                let table_ids: Vec<TableId> =
                    table_infos.iter().map(|info| info.ident.table_id).collect();

                let mut table_partitions = partition_manager
                    .batch_find_table_partitions(&table_ids)
                    .await
                    .context(FindPartitionsSnafu)?;

                for table_info in table_infos {
                    let partitions = table_partitions
                        .remove(&table_info.ident.table_id)
                        .unwrap_or(vec![]);

                    self.add_partitions(
                        &predicates,
                        &table_info,
                        &catalog_name,
                        &schema_name,
                        &table_info.name,
                        &partitions,
                    );
                }
            }
        }

        self.finish()
    }

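    /// Appends one row per partition of `table_info`, skipping tables that
    /// don't match the pushed-down predicates.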
    #[allow(clippy::too_many_arguments)]
    fn add_partitions(
        &mut self,
        predicates: &Predicates,
        table_info: &TableInfo,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        partitions: &[PartitionInfo],
    ) {
        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (TABLE_NAME, &Value::from(table_name)),
        ];

        if !predicates.eval(&row) {
            return;
        }

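        // Synthesize partition names as `p0`, `p1`, ...; ordinal positions
        // are 1-based, as in MySQL.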
        for (index, partition) in partitions.iter().enumerate() {
            let partition_name = format!("p{index}");

            self.catalog_names.push(Some(catalog_name));
            self.schema_names.push(Some(schema_name));
            self.table_names.push(Some(table_name));
            self.partition_names.push(Some(&partition_name));
            self.partition_ordinal_positions
                .push(Some((index + 1) as i64));
            let expression = partition.partition_expr.as_ref().map(|e| e.to_string());
            self.partition_expressions.push(expression.as_deref());
            self.create_times.push(Some(TimestampSecond::from(
                table_info.meta.created_on.timestamp(),
            )));
            self.partition_ids.push(Some(partition.id.as_u64()));
        }
    }

    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.catalog_names.len();

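        // Columns that GreptimeDB doesn't track yet are filled with constant
        // vectors: one value (null, or "RANGE" for `partition_method`)
        // logically repeated `rows_num` times.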
        let null_string_vector = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![None as Option<&str>])),
            rows_num,
        ));
        let null_i64_vector = Arc::new(ConstantVector::new(
            Arc::new(Int64Vector::from(vec![None])),
            rows_num,
        ));
        let null_timestamp_second_vector = Arc::new(ConstantVector::new(
            Arc::new(TimestampSecondVector::from(vec![None])),
            rows_num,
        ));
        let partition_methods = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![Some("RANGE")])),
            rows_num,
        ));

        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.partition_names.finish()),
            null_string_vector.clone(),
            Arc::new(self.partition_ordinal_positions.finish()),
            null_i64_vector.clone(),
            partition_methods,
            null_string_vector.clone(),
            Arc::new(self.partition_expressions.finish()),
            null_string_vector.clone(),
            null_string_vector.clone(),
            // TODO(dennis): rows and index statistics info
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            null_i64_vector.clone(),
            Arc::new(self.create_times.finish()),
            // TODO(dennis): support update_time
            null_timestamp_second_vector.clone(),
            null_timestamp_second_vector,
            null_i64_vector,
            null_string_vector.clone(),
            null_string_vector.clone(),
            null_string_vector,
            Arc::new(self.partition_ids.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

impl DfPartitionStream for InformationSchemaPartitions {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

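    // Full scan: without a `ScanRequest` there are no predicates to push
    // down, so every row is built.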
    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}