// catalog/src/system_schema/information_schema/region_statistics.rs

use std::sync::{Arc, Weak};
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID;
19use common_error::ext::BoxedError;
20use common_meta::datanode::RegionStat;
21use common_recordbatch::adapter::RecordBatchStreamAdapter;
22use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
23use datafusion::execution::TaskContext;
24use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
25use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
26use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
27use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
28use datatypes::value::Value;
29use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder};
30use snafu::ResultExt;
31use store_api::storage::{ScanRequest, TableId};
32
33use crate::CatalogManager;
34use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
35use crate::information_schema::Predicates;
36use crate::system_schema::information_schema::{InformationTable, REGION_STATISTICS};
37use crate::system_schema::utils;
38
39const REGION_ID: &str = "region_id";
40const TABLE_ID: &str = "table_id";
41const REGION_NUMBER: &str = "region_number";
42const REGION_ROWS: &str = "region_rows";
43const WRITTEN_BYTES: &str = "written_bytes_since_open";
44const DISK_SIZE: &str = "disk_size";
45const MEMTABLE_SIZE: &str = "memtable_size";
46const MANIFEST_SIZE: &str = "manifest_size";
47const SST_SIZE: &str = "sst_size";
48const SST_NUM: &str = "sst_num";
49const INDEX_SIZE: &str = "index_size";
50const ENGINE: &str = "engine";
51const REGION_ROLE: &str = "region_role";
52
53const INIT_CAPACITY: usize = 42;
54
55#[derive(Debug)]
70pub(super) struct InformationSchemaRegionStatistics {
71 schema: SchemaRef,
72 catalog_manager: Weak<dyn CatalogManager>,
73}
74
75impl InformationSchemaRegionStatistics {
76 pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
77 Self {
78 schema: Self::schema(),
79 catalog_manager,
80 }
81 }
82
83 pub(crate) fn schema() -> SchemaRef {
84 Arc::new(Schema::new(vec![
85 ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
86 ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
87 ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
88 ColumnSchema::new(REGION_ROWS, ConcreteDataType::uint64_datatype(), true),
89 ColumnSchema::new(WRITTEN_BYTES, ConcreteDataType::uint64_datatype(), true),
90 ColumnSchema::new(DISK_SIZE, ConcreteDataType::uint64_datatype(), true),
91 ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
92 ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
93 ColumnSchema::new(SST_SIZE, ConcreteDataType::uint64_datatype(), true),
94 ColumnSchema::new(SST_NUM, ConcreteDataType::uint64_datatype(), true),
95 ColumnSchema::new(INDEX_SIZE, ConcreteDataType::uint64_datatype(), true),
96 ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
97 ColumnSchema::new(REGION_ROLE, ConcreteDataType::string_datatype(), true),
98 ]))
99 }
100
101 fn builder(&self) -> InformationSchemaRegionStatisticsBuilder {
102 InformationSchemaRegionStatisticsBuilder::new(
103 self.schema.clone(),
104 self.catalog_manager.clone(),
105 )
106 }
107}
108
109impl InformationTable for InformationSchemaRegionStatistics {
110 fn table_id(&self) -> TableId {
111 INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID
112 }
113
114 fn table_name(&self) -> &'static str {
115 REGION_STATISTICS
116 }
117
118 fn schema(&self) -> SchemaRef {
119 self.schema.clone()
120 }
121
122 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
123 let schema = self.schema.arrow_schema().clone();
124 let mut builder = self.builder();
125
126 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
127 schema,
128 futures::stream::once(async move {
129 builder
130 .make_region_statistics(Some(request))
131 .await
132 .map(|x| x.into_df_record_batch())
133 .map_err(Into::into)
134 }),
135 ));
136
137 Ok(Box::pin(
138 RecordBatchStreamAdapter::try_new(stream)
139 .map_err(BoxedError::new)
140 .context(InternalSnafu)?,
141 ))
142 }
143}
144
145struct InformationSchemaRegionStatisticsBuilder {
146 schema: SchemaRef,
147 catalog_manager: Weak<dyn CatalogManager>,
148
149 region_ids: UInt64VectorBuilder,
150 table_ids: UInt32VectorBuilder,
151 region_numbers: UInt32VectorBuilder,
152 region_rows: UInt64VectorBuilder,
153 written_bytes: UInt64VectorBuilder,
154 disk_sizes: UInt64VectorBuilder,
155 memtable_sizes: UInt64VectorBuilder,
156 manifest_sizes: UInt64VectorBuilder,
157 sst_sizes: UInt64VectorBuilder,
158 sst_nums: UInt64VectorBuilder,
159 index_sizes: UInt64VectorBuilder,
160 engines: StringVectorBuilder,
161 region_roles: StringVectorBuilder,
162}
163
164impl InformationSchemaRegionStatisticsBuilder {
165 fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
166 Self {
167 schema,
168 catalog_manager,
169 region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
170 table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
171 region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
172 region_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
173 written_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
174 disk_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
175 memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
176 manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
177 sst_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
178 sst_nums: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
179 index_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
180 engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
181 region_roles: StringVectorBuilder::with_capacity(INIT_CAPACITY),
182 }
183 }
184
185 async fn make_region_statistics(
187 &mut self,
188 request: Option<ScanRequest>,
189 ) -> Result<RecordBatch> {
190 let predicates = Predicates::from_scan_request(&request);
191 let information_extension = utils::information_extension(&self.catalog_manager)?;
192 let region_stats = information_extension.region_stats().await?;
193 for region_stat in region_stats {
194 self.add_region_statistic(&predicates, region_stat);
195 }
196 self.finish()
197 }
198
199 fn add_region_statistic(&mut self, predicate: &Predicates, region_stat: RegionStat) {
200 let row = [
201 (REGION_ID, &Value::from(region_stat.id.as_u64())),
202 (TABLE_ID, &Value::from(region_stat.id.table_id())),
203 (REGION_NUMBER, &Value::from(region_stat.id.region_number())),
204 (REGION_ROWS, &Value::from(region_stat.num_rows)),
205 (WRITTEN_BYTES, &Value::from(region_stat.written_bytes)),
206 (DISK_SIZE, &Value::from(region_stat.approximate_bytes)),
207 (MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
208 (MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
209 (SST_SIZE, &Value::from(region_stat.sst_size)),
210 (SST_NUM, &Value::from(region_stat.sst_num)),
211 (INDEX_SIZE, &Value::from(region_stat.index_size)),
212 (ENGINE, &Value::from(region_stat.engine.as_str())),
213 (REGION_ROLE, &Value::from(region_stat.role.to_string())),
214 ];
215
216 if !predicate.eval(&row) {
217 return;
218 }
219
220 self.region_ids.push(Some(region_stat.id.as_u64()));
221 self.table_ids.push(Some(region_stat.id.table_id()));
222 self.region_numbers
223 .push(Some(region_stat.id.region_number()));
224 self.region_rows.push(Some(region_stat.num_rows));
225 self.written_bytes.push(Some(region_stat.written_bytes));
226 self.disk_sizes.push(Some(region_stat.approximate_bytes));
227 self.memtable_sizes.push(Some(region_stat.memtable_size));
228 self.manifest_sizes.push(Some(region_stat.manifest_size));
229 self.sst_sizes.push(Some(region_stat.sst_size));
230 self.sst_nums.push(Some(region_stat.sst_num));
231 self.index_sizes.push(Some(region_stat.index_size));
232 self.engines.push(Some(®ion_stat.engine));
233 self.region_roles.push(Some(®ion_stat.role.to_string()));
234 }
235
236 fn finish(&mut self) -> Result<RecordBatch> {
237 let columns: Vec<VectorRef> = vec![
238 Arc::new(self.region_ids.finish()),
239 Arc::new(self.table_ids.finish()),
240 Arc::new(self.region_numbers.finish()),
241 Arc::new(self.region_rows.finish()),
242 Arc::new(self.written_bytes.finish()),
243 Arc::new(self.disk_sizes.finish()),
244 Arc::new(self.memtable_sizes.finish()),
245 Arc::new(self.manifest_sizes.finish()),
246 Arc::new(self.sst_sizes.finish()),
247 Arc::new(self.sst_nums.finish()),
248 Arc::new(self.index_sizes.finish()),
249 Arc::new(self.engines.finish()),
250 Arc::new(self.region_roles.finish()),
251 ];
252
253 RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
254 }
255}
256
257impl DfPartitionStream for InformationSchemaRegionStatistics {
258 fn schema(&self) -> &ArrowSchemaRef {
259 self.schema.arrow_schema()
260 }
261
262 fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
263 let schema = self.schema.arrow_schema().clone();
264 let mut builder = self.builder();
265 Box::pin(DfRecordBatchStreamAdapter::new(
266 schema,
267 futures::stream::once(async move {
268 builder
269 .make_region_statistics(None)
270 .await
271 .map(|x| x.into_df_record_batch())
272 .map_err(Into::into)
273 }),
274 ))
275 }
276}