catalog/system_schema/information_schema/region_statistics.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::datanode::RegionStat;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder};
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use crate::CatalogManager;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{InformationTable, REGION_STATISTICS};
use crate::system_schema::utils;

const REGION_ID: &str = "region_id";
const TABLE_ID: &str = "table_id";
const REGION_NUMBER: &str = "region_number";
const REGION_ROWS: &str = "region_rows";
const WRITTEN_BYTES: &str = "written_bytes_since_open";
const DISK_SIZE: &str = "disk_size";
const MEMTABLE_SIZE: &str = "memtable_size";
const MANIFEST_SIZE: &str = "manifest_size";
const SST_SIZE: &str = "sst_size";
const SST_NUM: &str = "sst_num";
const INDEX_SIZE: &str = "index_size";
const ENGINE: &str = "engine";
const REGION_ROLE: &str = "region_role";

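/// Initial capacity hint for the per-column vector builders below; a sizing
/// heuristic only, not a hard limit on the number of regions returned.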
const INIT_CAPACITY: usize = 42;

/// The `REGION_STATISTICS` table provides statistics about regions, with the
/// following columns:
///
/// - `region_id`: The region id.
/// - `table_id`: The table id.
/// - `region_number`: The region number.
/// - `region_rows`: The number of rows in the region.
/// - `written_bytes_since_open`: The total number of bytes written to the region since it was opened.
/// - `disk_size`: The approximate disk size in bytes.
/// - `memtable_size`: The memtable size in bytes.
/// - `manifest_size`: The manifest size in bytes.
/// - `sst_size`: The SST data files size in bytes.
/// - `sst_num`: The number of SST files.
/// - `index_size`: The SST index files size in bytes.
/// - `engine`: The engine type.
/// - `region_role`: The region role.
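///
/// A query sketch for illustration (the `table_id` value is hypothetical):
///
/// ```sql
/// SELECT region_id, region_rows, disk_size, region_role
/// FROM information_schema.region_statistics
/// WHERE table_id = 1024;
/// ```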
#[derive(Debug)]
pub(super) struct InformationSchemaRegionStatistics {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaRegionStatistics {
    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_manager,
        }
    }

    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
            ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new(REGION_ROWS, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(WRITTEN_BYTES, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(DISK_SIZE, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(SST_SIZE, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(SST_NUM, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(INDEX_SIZE, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(REGION_ROLE, ConcreteDataType::string_datatype(), true),
        ]))
    }

    fn builder(&self) -> InformationSchemaRegionStatisticsBuilder {
        InformationSchemaRegionStatisticsBuilder::new(
            self.schema.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaRegionStatistics {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        REGION_STATISTICS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

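    // Builds the record batch lazily: the actual work runs inside a one-shot
    // `futures::stream::once` future when the stream is first polled, and the
    // DataFusion stream is then adapted back into a `SendableRecordBatchStream`.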
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();

        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_region_statistics(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));

        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

struct InformationSchemaRegionStatisticsBuilder {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,

    region_ids: UInt64VectorBuilder,
    table_ids: UInt32VectorBuilder,
    region_numbers: UInt32VectorBuilder,
    region_rows: UInt64VectorBuilder,
    written_bytes: UInt64VectorBuilder,
    disk_sizes: UInt64VectorBuilder,
    memtable_sizes: UInt64VectorBuilder,
    manifest_sizes: UInt64VectorBuilder,
    sst_sizes: UInt64VectorBuilder,
    sst_nums: UInt64VectorBuilder,
    index_sizes: UInt64VectorBuilder,
    engines: StringVectorBuilder,
    region_roles: StringVectorBuilder,
}

impl InformationSchemaRegionStatisticsBuilder {
    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema,
            catalog_manager,
            region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            region_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            written_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            disk_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            sst_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            sst_nums: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            index_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            region_roles: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Builds a `RecordBatch` of region statistics from the collected region
    /// stats, applying the predicates derived from the optional scan request.
    async fn make_region_statistics(
        &mut self,
        request: Option<ScanRequest>,
    ) -> Result<RecordBatch> {
        let predicates = Predicates::from_scan_request(&request);
        let information_extension = utils::information_extension(&self.catalog_manager)?;
        let region_stats = information_extension.region_stats().await?;
        for region_stat in region_stats {
            self.add_region_statistic(&predicates, region_stat);
        }
        self.finish()
    }

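    // Evaluates the scan predicates against a candidate row first, so rows
    // filtered out by a pushed-down `WHERE` clause are never appended to the
    // column builders.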
    fn add_region_statistic(&mut self, predicates: &Predicates, region_stat: RegionStat) {
        let row = [
            (REGION_ID, &Value::from(region_stat.id.as_u64())),
            (TABLE_ID, &Value::from(region_stat.id.table_id())),
            (REGION_NUMBER, &Value::from(region_stat.id.region_number())),
            (REGION_ROWS, &Value::from(region_stat.num_rows)),
            (WRITTEN_BYTES, &Value::from(region_stat.written_bytes)),
            (DISK_SIZE, &Value::from(region_stat.approximate_bytes)),
            (MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
            (MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
            (SST_SIZE, &Value::from(region_stat.sst_size)),
            (SST_NUM, &Value::from(region_stat.sst_num)),
            (INDEX_SIZE, &Value::from(region_stat.index_size)),
            (ENGINE, &Value::from(region_stat.engine.as_str())),
            (REGION_ROLE, &Value::from(region_stat.role.to_string())),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.region_ids.push(Some(region_stat.id.as_u64()));
        self.table_ids.push(Some(region_stat.id.table_id()));
        self.region_numbers
            .push(Some(region_stat.id.region_number()));
        self.region_rows.push(Some(region_stat.num_rows));
        self.written_bytes.push(Some(region_stat.written_bytes));
        self.disk_sizes.push(Some(region_stat.approximate_bytes));
        self.memtable_sizes.push(Some(region_stat.memtable_size));
        self.manifest_sizes.push(Some(region_stat.manifest_size));
        self.sst_sizes.push(Some(region_stat.sst_size));
        self.sst_nums.push(Some(region_stat.sst_num));
        self.index_sizes.push(Some(region_stat.index_size));
        self.engines.push(Some(&region_stat.engine));
        self.region_roles.push(Some(&region_stat.role.to_string()));
    }

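    // Consumes the accumulated column builders; the column order here must
    // stay in sync with the column order declared in `schema()`.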
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.region_ids.finish()),
            Arc::new(self.table_ids.finish()),
            Arc::new(self.region_numbers.finish()),
            Arc::new(self.region_rows.finish()),
            Arc::new(self.written_bytes.finish()),
            Arc::new(self.disk_sizes.finish()),
            Arc::new(self.memtable_sizes.finish()),
            Arc::new(self.manifest_sizes.finish()),
            Arc::new(self.sst_sizes.finish()),
            Arc::new(self.sst_nums.finish()),
            Arc::new(self.index_sizes.finish()),
            Arc::new(self.engines.finish()),
            Arc::new(self.region_roles.finish()),
        ];

        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

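// Lets DataFusion drive this table directly as a partition stream; unlike
// `to_stream`, no scan request is available here, so no predicates are applied.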
impl DfPartitionStream for InformationSchemaRegionStatistics {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_region_statistics(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}