catalog/system_schema/information_schema/
region_statistics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID;
19use common_error::ext::BoxedError;
20use common_meta::datanode::RegionStat;
21use common_recordbatch::adapter::RecordBatchStreamAdapter;
22use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
23use datafusion::execution::TaskContext;
24use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
25use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
26use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
27use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
28use datatypes::value::Value;
29use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder};
30use snafu::ResultExt;
31use store_api::storage::{ScanRequest, TableId};
32
33use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
34use crate::information_schema::Predicates;
35use crate::system_schema::information_schema::{InformationTable, REGION_STATISTICS};
36use crate::system_schema::utils;
37use crate::CatalogManager;
38
// Column names of the `information_schema.region_statistics` table.
const REGION_ID: &str = "region_id";
const TABLE_ID: &str = "table_id";
const REGION_NUMBER: &str = "region_number";
const REGION_ROWS: &str = "region_rows";
const DISK_SIZE: &str = "disk_size";
const MEMTABLE_SIZE: &str = "memtable_size";
const MANIFEST_SIZE: &str = "manifest_size";
const SST_SIZE: &str = "sst_size";
const INDEX_SIZE: &str = "index_size";
const ENGINE: &str = "engine";
const REGION_ROLE: &str = "region_role";

// Initial capacity hint for the per-column vector builders; the builders
// grow as needed if more regions are reported.
const INIT_CAPACITY: usize = 42;
52
/// The `REGION_STATISTICS` table provides information about the region statistics. Including fields:
///
/// - `region_id`: The region id.
/// - `table_id`: The table id.
/// - `region_number`: The region number.
/// - `region_rows`: The number of rows in region.
/// - `memtable_size`: The memtable size in bytes.
/// - `disk_size`: The approximate disk size in bytes.
/// - `manifest_size`: The manifest size in bytes.
/// - `sst_size`: The sst data files size in bytes.
/// - `index_size`: The sst index files size in bytes.
/// - `engine`: The engine type.
/// - `region_role`: The region role.
#[derive(Debug)]
pub(super) struct InformationSchemaRegionStatistics {
    // Cached schema of the table; built once in `new` via `Self::schema()`.
    schema: SchemaRef,
    // Weak reference so this table does not keep the catalog manager alive;
    // upgraded on demand when statistics are fetched.
    catalog_manager: Weak<dyn CatalogManager>,
}
71
72impl InformationSchemaRegionStatistics {
73    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
74        Self {
75            schema: Self::schema(),
76            catalog_manager,
77        }
78    }
79
80    pub(crate) fn schema() -> SchemaRef {
81        Arc::new(Schema::new(vec![
82            ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
83            ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
84            ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
85            ColumnSchema::new(REGION_ROWS, ConcreteDataType::uint64_datatype(), true),
86            ColumnSchema::new(DISK_SIZE, ConcreteDataType::uint64_datatype(), true),
87            ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
88            ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
89            ColumnSchema::new(SST_SIZE, ConcreteDataType::uint64_datatype(), true),
90            ColumnSchema::new(INDEX_SIZE, ConcreteDataType::uint64_datatype(), true),
91            ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
92            ColumnSchema::new(REGION_ROLE, ConcreteDataType::string_datatype(), true),
93        ]))
94    }
95
96    fn builder(&self) -> InformationSchemaRegionStatisticsBuilder {
97        InformationSchemaRegionStatisticsBuilder::new(
98            self.schema.clone(),
99            self.catalog_manager.clone(),
100        )
101    }
102}
103
104impl InformationTable for InformationSchemaRegionStatistics {
105    fn table_id(&self) -> TableId {
106        INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID
107    }
108
109    fn table_name(&self) -> &'static str {
110        REGION_STATISTICS
111    }
112
113    fn schema(&self) -> SchemaRef {
114        self.schema.clone()
115    }
116
117    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
118        let schema = self.schema.arrow_schema().clone();
119        let mut builder = self.builder();
120
121        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
122            schema,
123            futures::stream::once(async move {
124                builder
125                    .make_region_statistics(Some(request))
126                    .await
127                    .map(|x| x.into_df_record_batch())
128                    .map_err(Into::into)
129            }),
130        ));
131
132        Ok(Box::pin(
133            RecordBatchStreamAdapter::try_new(stream)
134                .map_err(BoxedError::new)
135                .context(InternalSnafu)?,
136        ))
137    }
138}
139
// Accumulates region statistics column-by-column; `finish` assembles the
// builders into a single `RecordBatch`.
struct InformationSchemaRegionStatisticsBuilder {
    // Target schema of the emitted record batch.
    schema: SchemaRef,
    // Weak handle used to reach the information extension that supplies
    // the region statistics.
    catalog_manager: Weak<dyn CatalogManager>,

    // One vector builder per output column, in schema order.
    region_ids: UInt64VectorBuilder,
    table_ids: UInt32VectorBuilder,
    region_numbers: UInt32VectorBuilder,
    region_rows: UInt64VectorBuilder,
    disk_sizes: UInt64VectorBuilder,
    memtable_sizes: UInt64VectorBuilder,
    manifest_sizes: UInt64VectorBuilder,
    sst_sizes: UInt64VectorBuilder,
    index_sizes: UInt64VectorBuilder,
    engines: StringVectorBuilder,
    region_roles: StringVectorBuilder,
}
156
157impl InformationSchemaRegionStatisticsBuilder {
158    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
159        Self {
160            schema,
161            catalog_manager,
162            region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
163            table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
164            region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
165            region_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
166            disk_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
167            memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
168            manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
169            sst_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
170            index_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
171            engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
172            region_roles: StringVectorBuilder::with_capacity(INIT_CAPACITY),
173        }
174    }
175
176    /// Construct a new `InformationSchemaRegionStatistics` from the collected data.
177    async fn make_region_statistics(
178        &mut self,
179        request: Option<ScanRequest>,
180    ) -> Result<RecordBatch> {
181        let predicates = Predicates::from_scan_request(&request);
182        let information_extension = utils::information_extension(&self.catalog_manager)?;
183        let region_stats = information_extension.region_stats().await?;
184        for region_stat in region_stats {
185            self.add_region_statistic(&predicates, region_stat);
186        }
187        self.finish()
188    }
189
190    fn add_region_statistic(&mut self, predicate: &Predicates, region_stat: RegionStat) {
191        let row = [
192            (REGION_ID, &Value::from(region_stat.id.as_u64())),
193            (TABLE_ID, &Value::from(region_stat.id.table_id())),
194            (REGION_NUMBER, &Value::from(region_stat.id.region_number())),
195            (REGION_ROWS, &Value::from(region_stat.num_rows)),
196            (DISK_SIZE, &Value::from(region_stat.approximate_bytes)),
197            (MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
198            (MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
199            (SST_SIZE, &Value::from(region_stat.sst_size)),
200            (INDEX_SIZE, &Value::from(region_stat.index_size)),
201            (ENGINE, &Value::from(region_stat.engine.as_str())),
202            (REGION_ROLE, &Value::from(region_stat.role.to_string())),
203        ];
204
205        if !predicate.eval(&row) {
206            return;
207        }
208
209        self.region_ids.push(Some(region_stat.id.as_u64()));
210        self.table_ids.push(Some(region_stat.id.table_id()));
211        self.region_numbers
212            .push(Some(region_stat.id.region_number()));
213        self.region_rows.push(Some(region_stat.num_rows));
214        self.disk_sizes.push(Some(region_stat.approximate_bytes));
215        self.memtable_sizes.push(Some(region_stat.memtable_size));
216        self.manifest_sizes.push(Some(region_stat.manifest_size));
217        self.sst_sizes.push(Some(region_stat.sst_size));
218        self.index_sizes.push(Some(region_stat.index_size));
219        self.engines.push(Some(&region_stat.engine));
220        self.region_roles.push(Some(&region_stat.role.to_string()));
221    }
222
223    fn finish(&mut self) -> Result<RecordBatch> {
224        let columns: Vec<VectorRef> = vec![
225            Arc::new(self.region_ids.finish()),
226            Arc::new(self.table_ids.finish()),
227            Arc::new(self.region_numbers.finish()),
228            Arc::new(self.region_rows.finish()),
229            Arc::new(self.disk_sizes.finish()),
230            Arc::new(self.memtable_sizes.finish()),
231            Arc::new(self.manifest_sizes.finish()),
232            Arc::new(self.sst_sizes.finish()),
233            Arc::new(self.index_sizes.finish()),
234            Arc::new(self.engines.finish()),
235            Arc::new(self.region_roles.finish()),
236        ];
237
238        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
239    }
240}
241
242impl DfPartitionStream for InformationSchemaRegionStatistics {
243    fn schema(&self) -> &ArrowSchemaRef {
244        self.schema.arrow_schema()
245    }
246
247    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
248        let schema = self.schema.arrow_schema().clone();
249        let mut builder = self.builder();
250        Box::pin(DfRecordBatchStreamAdapter::new(
251            schema,
252            futures::stream::once(async move {
253                builder
254                    .make_region_statistics(None)
255                    .await
256                    .map(|x| x.into_df_record_batch())
257                    .map_err(Into::into)
258            }),
259        ))
260    }
261}