// catalog/system_schema/information_schema/region_statistics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID;
19use common_error::ext::BoxedError;
20use common_meta::datanode::RegionStat;
21use common_recordbatch::adapter::RecordBatchStreamAdapter;
22use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
23use datafusion::execution::TaskContext;
24use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
25use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
26use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
27use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
28use datatypes::value::Value;
29use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder};
30use snafu::ResultExt;
31use store_api::storage::{ScanRequest, TableId};
32
33use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
34use crate::information_schema::Predicates;
35use crate::system_schema::information_schema::{InformationTable, REGION_STATISTICS};
36use crate::system_schema::utils;
37use crate::CatalogManager;
38
39const REGION_ID: &str = "region_id";
40const TABLE_ID: &str = "table_id";
41const REGION_NUMBER: &str = "region_number";
42const REGION_ROWS: &str = "region_rows";
43const DISK_SIZE: &str = "disk_size";
44const MEMTABLE_SIZE: &str = "memtable_size";
45const MANIFEST_SIZE: &str = "manifest_size";
46const SST_SIZE: &str = "sst_size";
47const SST_NUM: &str = "sst_num";
48const INDEX_SIZE: &str = "index_size";
49const ENGINE: &str = "engine";
50const REGION_ROLE: &str = "region_role";
51
52const INIT_CAPACITY: usize = 42;
53
/// The `REGION_STATISTICS` table provides information about the region statistics. Including fields:
///
/// - `region_id`: The region id.
/// - `table_id`: The table id.
/// - `region_number`: The region number.
/// - `region_rows`: The number of rows in region.
/// - `memtable_size`: The memtable size in bytes.
/// - `disk_size`: The approximate disk size in bytes.
/// - `manifest_size`: The manifest size in bytes.
/// - `sst_size`: The sst data files size in bytes.
/// - `sst_num`: The number of sst files.
/// - `index_size`: The sst index files size in bytes.
/// - `engine`: The engine type.
/// - `region_role`: The region role.
#[derive(Debug)]
pub(super) struct InformationSchemaRegionStatistics {
    // Column schema of this table, built once by `Self::schema()`.
    schema: SchemaRef,
    // Weak reference so the table does not keep the catalog manager alive.
    catalog_manager: Weak<dyn CatalogManager>,
}
72
73impl InformationSchemaRegionStatistics {
74    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
75        Self {
76            schema: Self::schema(),
77            catalog_manager,
78        }
79    }
80
81    pub(crate) fn schema() -> SchemaRef {
82        Arc::new(Schema::new(vec![
83            ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
84            ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
85            ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
86            ColumnSchema::new(REGION_ROWS, ConcreteDataType::uint64_datatype(), true),
87            ColumnSchema::new(DISK_SIZE, ConcreteDataType::uint64_datatype(), true),
88            ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
89            ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
90            ColumnSchema::new(SST_SIZE, ConcreteDataType::uint64_datatype(), true),
91            ColumnSchema::new(SST_NUM, ConcreteDataType::uint64_datatype(), true),
92            ColumnSchema::new(INDEX_SIZE, ConcreteDataType::uint64_datatype(), true),
93            ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
94            ColumnSchema::new(REGION_ROLE, ConcreteDataType::string_datatype(), true),
95        ]))
96    }
97
98    fn builder(&self) -> InformationSchemaRegionStatisticsBuilder {
99        InformationSchemaRegionStatisticsBuilder::new(
100            self.schema.clone(),
101            self.catalog_manager.clone(),
102        )
103    }
104}
105
106impl InformationTable for InformationSchemaRegionStatistics {
107    fn table_id(&self) -> TableId {
108        INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID
109    }
110
111    fn table_name(&self) -> &'static str {
112        REGION_STATISTICS
113    }
114
115    fn schema(&self) -> SchemaRef {
116        self.schema.clone()
117    }
118
119    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
120        let schema = self.schema.arrow_schema().clone();
121        let mut builder = self.builder();
122
123        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
124            schema,
125            futures::stream::once(async move {
126                builder
127                    .make_region_statistics(Some(request))
128                    .await
129                    .map(|x| x.into_df_record_batch())
130                    .map_err(Into::into)
131            }),
132        ));
133
134        Ok(Box::pin(
135            RecordBatchStreamAdapter::try_new(stream)
136                .map_err(BoxedError::new)
137                .context(InternalSnafu)?,
138        ))
139    }
140}
141
// Row builder for the `REGION_STATISTICS` table: accumulates one column
// vector per schema column while region statistics are scanned.
struct InformationSchemaRegionStatisticsBuilder {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,

    // One vector builder per output column, in the same order as the schema.
    region_ids: UInt64VectorBuilder,
    table_ids: UInt32VectorBuilder,
    region_numbers: UInt32VectorBuilder,
    region_rows: UInt64VectorBuilder,
    disk_sizes: UInt64VectorBuilder,
    memtable_sizes: UInt64VectorBuilder,
    manifest_sizes: UInt64VectorBuilder,
    sst_sizes: UInt64VectorBuilder,
    sst_nums: UInt64VectorBuilder,
    index_sizes: UInt64VectorBuilder,
    engines: StringVectorBuilder,
    region_roles: StringVectorBuilder,
}
159
160impl InformationSchemaRegionStatisticsBuilder {
161    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
162        Self {
163            schema,
164            catalog_manager,
165            region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
166            table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
167            region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
168            region_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
169            disk_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
170            memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
171            manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
172            sst_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
173            sst_nums: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
174            index_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
175            engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
176            region_roles: StringVectorBuilder::with_capacity(INIT_CAPACITY),
177        }
178    }
179
180    /// Construct a new `InformationSchemaRegionStatistics` from the collected data.
181    async fn make_region_statistics(
182        &mut self,
183        request: Option<ScanRequest>,
184    ) -> Result<RecordBatch> {
185        let predicates = Predicates::from_scan_request(&request);
186        let information_extension = utils::information_extension(&self.catalog_manager)?;
187        let region_stats = information_extension.region_stats().await?;
188        for region_stat in region_stats {
189            self.add_region_statistic(&predicates, region_stat);
190        }
191        self.finish()
192    }
193
194    fn add_region_statistic(&mut self, predicate: &Predicates, region_stat: RegionStat) {
195        let row = [
196            (REGION_ID, &Value::from(region_stat.id.as_u64())),
197            (TABLE_ID, &Value::from(region_stat.id.table_id())),
198            (REGION_NUMBER, &Value::from(region_stat.id.region_number())),
199            (REGION_ROWS, &Value::from(region_stat.num_rows)),
200            (DISK_SIZE, &Value::from(region_stat.approximate_bytes)),
201            (MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
202            (MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
203            (SST_SIZE, &Value::from(region_stat.sst_size)),
204            (SST_NUM, &Value::from(region_stat.sst_num)),
205            (INDEX_SIZE, &Value::from(region_stat.index_size)),
206            (ENGINE, &Value::from(region_stat.engine.as_str())),
207            (REGION_ROLE, &Value::from(region_stat.role.to_string())),
208        ];
209
210        if !predicate.eval(&row) {
211            return;
212        }
213
214        self.region_ids.push(Some(region_stat.id.as_u64()));
215        self.table_ids.push(Some(region_stat.id.table_id()));
216        self.region_numbers
217            .push(Some(region_stat.id.region_number()));
218        self.region_rows.push(Some(region_stat.num_rows));
219        self.disk_sizes.push(Some(region_stat.approximate_bytes));
220        self.memtable_sizes.push(Some(region_stat.memtable_size));
221        self.manifest_sizes.push(Some(region_stat.manifest_size));
222        self.sst_sizes.push(Some(region_stat.sst_size));
223        self.sst_nums.push(Some(region_stat.sst_num));
224        self.index_sizes.push(Some(region_stat.index_size));
225        self.engines.push(Some(&region_stat.engine));
226        self.region_roles.push(Some(&region_stat.role.to_string()));
227    }
228
229    fn finish(&mut self) -> Result<RecordBatch> {
230        let columns: Vec<VectorRef> = vec![
231            Arc::new(self.region_ids.finish()),
232            Arc::new(self.table_ids.finish()),
233            Arc::new(self.region_numbers.finish()),
234            Arc::new(self.region_rows.finish()),
235            Arc::new(self.disk_sizes.finish()),
236            Arc::new(self.memtable_sizes.finish()),
237            Arc::new(self.manifest_sizes.finish()),
238            Arc::new(self.sst_sizes.finish()),
239            Arc::new(self.sst_nums.finish()),
240            Arc::new(self.index_sizes.finish()),
241            Arc::new(self.engines.finish()),
242            Arc::new(self.region_roles.finish()),
243        ];
244
245        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
246    }
247}
248
249impl DfPartitionStream for InformationSchemaRegionStatistics {
250    fn schema(&self) -> &ArrowSchemaRef {
251        self.schema.arrow_schema()
252    }
253
254    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
255        let schema = self.schema.arrow_schema().clone();
256        let mut builder = self.builder();
257        Box::pin(DfRecordBatchStreamAdapter::new(
258            schema,
259            futures::stream::once(async move {
260                builder
261                    .make_region_statistics(None)
262                    .await
263                    .map(|x| x.into_df_record_batch())
264                    .map_err(Into::into)
265            }),
266        ))
267    }
268}