catalog/system_schema/information_schema/partitions.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMicrosecond;
use datatypes::value::Value;
use datatypes::vectors::{
    ConstantVector, Int64Vector, Int64VectorBuilder, MutableVector, StringVector,
    StringVectorBuilder, TimestampMicrosecondVector, TimestampMicrosecondVectorBuilder,
    UInt64VectorBuilder,
};
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use crate::error::{
    CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu,
    Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, PARTITIONS};
use crate::CatalogManager;

const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const PARTITION_NAME: &str = "partition_name";
const PARTITION_EXPRESSION: &str = "partition_expression";
/// The region id of the partition.
const GREPTIME_PARTITION_ID: &str = "greptime_partition_id";
const INIT_CAPACITY: usize = 42;

/// The `PARTITIONS` table provides information about partitioned tables.
/// See <https://dev.mysql.com/doc/refman/8.0/en/information-schema-partitions-table.html>.
/// We provide an extra column, `greptime_partition_id`, for the GreptimeDB region id.
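///
/// For illustration only, a typical query against this table (assuming the
/// default `public` schema; the column list is a subset):
///
/// ```sql
/// SELECT table_name, partition_name, partition_expression, greptime_partition_id
/// FROM information_schema.partitions
/// WHERE table_schema = 'public';
/// ```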
#[derive(Debug)]
pub(super) struct InformationSchemaPartitions {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaPartitions {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PARTITION_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                "subpartition_name",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_ordinal_position",
                ConcreteDataType::int64_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_method",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                PARTITION_EXPRESSION,
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "subpartition_expression",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "partition_description",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("table_rows", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("avg_row_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("max_data_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("index_length", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new("data_free", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "create_time",
                ConcreteDataType::timestamp_microsecond_datatype(),
                true,
            ),
            ColumnSchema::new(
                "update_time",
                ConcreteDataType::timestamp_microsecond_datatype(),
                true,
            ),
            ColumnSchema::new(
                "check_time",
                ConcreteDataType::timestamp_microsecond_datatype(),
                true,
            ),
            ColumnSchema::new("checksum", ConcreteDataType::int64_datatype(), true),
            ColumnSchema::new(
                "partition_comment",
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new("nodegroup", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("tablespace_name", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(
                GREPTIME_PARTITION_ID,
                ConcreteDataType::uint64_datatype(),
                true,
            ),
        ]))
    }

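    /// Creates a row builder that carries clones of this table's schema,
    /// catalog name, and weak catalog-manager handle.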
    fn builder(&self) -> InformationSchemaPartitionsBuilder {
        InformationSchemaPartitionsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaPartitions {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_PARTITIONS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PARTITIONS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

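    // The batch is built lazily: `make_partitions` only runs when the
    // returned stream is first polled, not when `to_stream` is called.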
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

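/// Builds the rows of the `PARTITIONS` table. Only the columns GreptimeDB can
/// populate get dedicated vector builders; the remaining MySQL-compatible
/// columns are emitted as constant NULL vectors in `finish`.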
struct InformationSchemaPartitionsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    catalog_names: StringVectorBuilder,
    schema_names: StringVectorBuilder,
    table_names: StringVectorBuilder,
    partition_names: StringVectorBuilder,
    partition_ordinal_positions: Int64VectorBuilder,
    partition_expressions: StringVectorBuilder,
    create_times: TimestampMicrosecondVectorBuilder,
    partition_ids: UInt64VectorBuilder,
}

impl InformationSchemaPartitionsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            create_times: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
            partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Constructs the `information_schema.partitions` virtual table.
    async fn make_partitions(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;

        let partition_manager = catalog_manager
            .as_any()
            .downcast_ref::<KvBackendCatalogManager>()
            .map(|catalog_manager| catalog_manager.partition_manager())
            .context(PartitionManagerNotFoundSnafu)?;

        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let table_info_stream = catalog_manager
                .tables(&catalog_name, &schema_name, None)
                .try_filter_map(|t| async move {
                    let table_info = t.table_info();
                    if table_info.table_type == TableType::Temporary {
                        Ok(None)
                    } else {
                        Ok(Some(table_info))
                    }
                });

            const BATCH_SIZE: usize = 128;

            // Split the table info stream into chunks so that partition
            // metadata can be looked up in batches.
            let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));

            while let Some(table_infos) = table_info_chunks.next().await {
                let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
                let table_ids: Vec<TableId> =
                    table_infos.iter().map(|info| info.ident.table_id).collect();

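                // Fetch the partition metadata for the whole chunk in one
                // call; the result is keyed by table id.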
                let mut table_partitions = partition_manager
                    .batch_find_table_partitions(&table_ids)
                    .await
                    .context(FindPartitionsSnafu)?;

                for table_info in table_infos {
                    let partitions = table_partitions
                        .remove(&table_info.ident.table_id)
                        .unwrap_or_default();

                    self.add_partitions(
                        &predicates,
                        &table_info,
                        &catalog_name,
                        &schema_name,
                        &table_info.name,
                        &partitions,
                    );
                }
            }
        }

        self.finish()
    }

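    /// Appends one row per partition of `table_info`, provided the table's
    /// identifying columns pass the pushed-down predicates.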
    #[allow(clippy::too_many_arguments)]
    fn add_partitions(
        &mut self,
        predicates: &Predicates,
        table_info: &TableInfo,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        partitions: &[PartitionInfo],
    ) {
        let row = [
            (TABLE_CATALOG, &Value::from(catalog_name)),
            (TABLE_SCHEMA, &Value::from(schema_name)),
            (TABLE_NAME, &Value::from(table_name)),
        ];

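        // Skip the whole table early if its catalog/schema/table names cannot
        // match the scan predicates.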
        if !predicates.eval(&row) {
            return;
        }

        for (index, partition) in partitions.iter().enumerate() {
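            // Synthesize partition names as `p0`, `p1`, ..., in the style of
            // MySQL's auto-generated partition names.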
            let partition_name = format!("p{index}");

            self.catalog_names.push(Some(catalog_name));
            self.schema_names.push(Some(schema_name));
            self.table_names.push(Some(table_name));
            self.partition_names.push(Some(&partition_name));
            self.partition_ordinal_positions
                .push(Some((index + 1) as i64));
            let expressions = if partition.partition.partition_columns().is_empty() {
                None
            } else {
                Some(partition.partition.to_string())
            };

            self.partition_expressions.push(expressions.as_deref());
            // The `create_time` column is microsecond-precision, so the
            // creation time must be pushed in microseconds, not milliseconds.
            self.create_times.push(Some(TimestampMicrosecond::from(
                table_info.meta.created_on.timestamp_micros(),
            )));
            self.partition_ids.push(Some(partition.id.as_u64()));
        }
    }

    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.catalog_names.len();

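        // Columns that GreptimeDB does not track are emitted as constant
        // vectors: a single NULL (or constant) value logically repeated
        // `rows_num` times, so each costs O(1) memory.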
        let null_string_vector = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![None as Option<&str>])),
            rows_num,
        ));
        let null_i64_vector = Arc::new(ConstantVector::new(
            Arc::new(Int64Vector::from(vec![None])),
            rows_num,
        ));
        let null_timestampmicrosecond_vector = Arc::new(ConstantVector::new(
            Arc::new(TimestampMicrosecondVector::from(vec![None])),
            rows_num,
        ));
        let partition_methods = Arc::new(ConstantVector::new(
            Arc::new(StringVector::from(vec![Some("RANGE")])),
            rows_num,
        ));

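        // The column order below must match `schema()` exactly; each NULL
        // constant is annotated with the schema column it fills.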
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.catalog_names.finish()),
            Arc::new(self.schema_names.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.partition_names.finish()),
            null_string_vector.clone(), // subpartition_name
            Arc::new(self.partition_ordinal_positions.finish()),
            null_i64_vector.clone(), // subpartition_ordinal_position
            partition_methods,
            null_string_vector.clone(), // subpartition_method
            Arc::new(self.partition_expressions.finish()),
            null_string_vector.clone(), // subpartition_expression
            null_string_vector.clone(), // partition_description
            // TODO(dennis): rows and index statistics info
            null_i64_vector.clone(), // table_rows
            null_i64_vector.clone(), // avg_row_length
            null_i64_vector.clone(), // data_length
            null_i64_vector.clone(), // max_data_length
            null_i64_vector.clone(), // index_length
            null_i64_vector.clone(), // data_free
            Arc::new(self.create_times.finish()),
            // TODO(dennis): support update_time
            null_timestampmicrosecond_vector.clone(), // update_time
            null_timestampmicrosecond_vector,         // check_time
            null_i64_vector,                          // checksum
            null_string_vector.clone(),               // partition_comment
            null_string_vector.clone(),               // nodegroup
            null_string_vector,                       // tablespace_name
            Arc::new(self.partition_ids.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

impl DfPartitionStream for InformationSchemaPartitions {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

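    // Unlike `to_stream`, this path receives no `ScanRequest`, so all
    // partitions are built without predicate pruning.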
    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_partitions(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}