catalog/system_schema/information_schema/
region_statistics.rs1use std::sync::{Arc, Weak};
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID;
19use common_error::ext::BoxedError;
20use common_meta::datanode::RegionStat;
21use common_recordbatch::adapter::RecordBatchStreamAdapter;
22use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
23use datafusion::execution::TaskContext;
24use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
25use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
26use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
27use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
28use datatypes::value::Value;
29use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder};
30use snafu::ResultExt;
31use store_api::storage::{ScanRequest, TableId};
32
33use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
34use crate::information_schema::Predicates;
35use crate::system_schema::information_schema::{InformationTable, REGION_STATISTICS};
36use crate::system_schema::utils;
37use crate::CatalogManager;
38
// Column names of the `region_statistics` table, listed in schema order.

/// Column filled from `RegionStat::id.as_u64()`.
const REGION_ID: &str = "region_id";
/// Column filled from `RegionStat::id.table_id()`.
const TABLE_ID: &str = "table_id";
/// Column filled from `RegionStat::id.region_number()`.
const REGION_NUMBER: &str = "region_number";
/// Column filled from `RegionStat::num_rows`.
const REGION_ROWS: &str = "region_rows";
/// Column filled from `RegionStat::approximate_bytes`.
const DISK_SIZE: &str = "disk_size";
/// Column filled from `RegionStat::memtable_size`.
const MEMTABLE_SIZE: &str = "memtable_size";
/// Column filled from `RegionStat::manifest_size`.
const MANIFEST_SIZE: &str = "manifest_size";
/// Column filled from `RegionStat::sst_size`.
const SST_SIZE: &str = "sst_size";
/// Column filled from `RegionStat::sst_num`.
const SST_NUM: &str = "sst_num";
/// Column filled from `RegionStat::index_size`.
const INDEX_SIZE: &str = "index_size";
/// Column filled from `RegionStat::engine`.
const ENGINE: &str = "engine";
/// Column filled from the string form of `RegionStat::role`.
const REGION_ROLE: &str = "region_role";

/// Initial capacity handed to every column vector builder.
const INIT_CAPACITY: usize = 42;
53
/// The `REGION_STATISTICS` information-schema table.
///
/// Each row describes one region, as reported by the information extension
/// obtained through the catalog manager.
#[derive(Debug)]
pub(super) struct InformationSchemaRegionStatistics {
    /// Schema of the table; shared with every builder created from it.
    schema: SchemaRef,
    /// Weak handle to the catalog manager, used to reach the information
    /// extension when the table is scanned.
    catalog_manager: Weak<dyn CatalogManager>,
}
72
73impl InformationSchemaRegionStatistics {
74 pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
75 Self {
76 schema: Self::schema(),
77 catalog_manager,
78 }
79 }
80
81 pub(crate) fn schema() -> SchemaRef {
82 Arc::new(Schema::new(vec![
83 ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
84 ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
85 ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
86 ColumnSchema::new(REGION_ROWS, ConcreteDataType::uint64_datatype(), true),
87 ColumnSchema::new(DISK_SIZE, ConcreteDataType::uint64_datatype(), true),
88 ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
89 ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
90 ColumnSchema::new(SST_SIZE, ConcreteDataType::uint64_datatype(), true),
91 ColumnSchema::new(SST_NUM, ConcreteDataType::uint64_datatype(), true),
92 ColumnSchema::new(INDEX_SIZE, ConcreteDataType::uint64_datatype(), true),
93 ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
94 ColumnSchema::new(REGION_ROLE, ConcreteDataType::string_datatype(), true),
95 ]))
96 }
97
98 fn builder(&self) -> InformationSchemaRegionStatisticsBuilder {
99 InformationSchemaRegionStatisticsBuilder::new(
100 self.schema.clone(),
101 self.catalog_manager.clone(),
102 )
103 }
104}
105
106impl InformationTable for InformationSchemaRegionStatistics {
107 fn table_id(&self) -> TableId {
108 INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID
109 }
110
111 fn table_name(&self) -> &'static str {
112 REGION_STATISTICS
113 }
114
115 fn schema(&self) -> SchemaRef {
116 self.schema.clone()
117 }
118
119 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
120 let schema = self.schema.arrow_schema().clone();
121 let mut builder = self.builder();
122
123 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
124 schema,
125 futures::stream::once(async move {
126 builder
127 .make_region_statistics(Some(request))
128 .await
129 .map(|x| x.into_df_record_batch())
130 .map_err(Into::into)
131 }),
132 ));
133
134 Ok(Box::pin(
135 RecordBatchStreamAdapter::try_new(stream)
136 .map_err(BoxedError::new)
137 .context(InternalSnafu)?,
138 ))
139 }
140}
141
/// Accumulates the rows of the `region_statistics` table column by column
/// and turns them into a single `RecordBatch`.
struct InformationSchemaRegionStatisticsBuilder {
    /// Schema attached to the produced record batch.
    schema: SchemaRef,
    /// Weak handle to the catalog manager, used to look up the information
    /// extension that reports region stats.
    catalog_manager: Weak<dyn CatalogManager>,

    // One vector builder per column, in schema order.
    region_ids: UInt64VectorBuilder,
    table_ids: UInt32VectorBuilder,
    region_numbers: UInt32VectorBuilder,
    region_rows: UInt64VectorBuilder,
    disk_sizes: UInt64VectorBuilder,
    memtable_sizes: UInt64VectorBuilder,
    manifest_sizes: UInt64VectorBuilder,
    sst_sizes: UInt64VectorBuilder,
    sst_nums: UInt64VectorBuilder,
    index_sizes: UInt64VectorBuilder,
    engines: StringVectorBuilder,
    region_roles: StringVectorBuilder,
}
159
160impl InformationSchemaRegionStatisticsBuilder {
161 fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
162 Self {
163 schema,
164 catalog_manager,
165 region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
166 table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
167 region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
168 region_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
169 disk_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
170 memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
171 manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
172 sst_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
173 sst_nums: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
174 index_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
175 engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
176 region_roles: StringVectorBuilder::with_capacity(INIT_CAPACITY),
177 }
178 }
179
180 async fn make_region_statistics(
182 &mut self,
183 request: Option<ScanRequest>,
184 ) -> Result<RecordBatch> {
185 let predicates = Predicates::from_scan_request(&request);
186 let information_extension = utils::information_extension(&self.catalog_manager)?;
187 let region_stats = information_extension.region_stats().await?;
188 for region_stat in region_stats {
189 self.add_region_statistic(&predicates, region_stat);
190 }
191 self.finish()
192 }
193
194 fn add_region_statistic(&mut self, predicate: &Predicates, region_stat: RegionStat) {
195 let row = [
196 (REGION_ID, &Value::from(region_stat.id.as_u64())),
197 (TABLE_ID, &Value::from(region_stat.id.table_id())),
198 (REGION_NUMBER, &Value::from(region_stat.id.region_number())),
199 (REGION_ROWS, &Value::from(region_stat.num_rows)),
200 (DISK_SIZE, &Value::from(region_stat.approximate_bytes)),
201 (MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
202 (MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
203 (SST_SIZE, &Value::from(region_stat.sst_size)),
204 (SST_NUM, &Value::from(region_stat.sst_num)),
205 (INDEX_SIZE, &Value::from(region_stat.index_size)),
206 (ENGINE, &Value::from(region_stat.engine.as_str())),
207 (REGION_ROLE, &Value::from(region_stat.role.to_string())),
208 ];
209
210 if !predicate.eval(&row) {
211 return;
212 }
213
214 self.region_ids.push(Some(region_stat.id.as_u64()));
215 self.table_ids.push(Some(region_stat.id.table_id()));
216 self.region_numbers
217 .push(Some(region_stat.id.region_number()));
218 self.region_rows.push(Some(region_stat.num_rows));
219 self.disk_sizes.push(Some(region_stat.approximate_bytes));
220 self.memtable_sizes.push(Some(region_stat.memtable_size));
221 self.manifest_sizes.push(Some(region_stat.manifest_size));
222 self.sst_sizes.push(Some(region_stat.sst_size));
223 self.sst_nums.push(Some(region_stat.sst_num));
224 self.index_sizes.push(Some(region_stat.index_size));
225 self.engines.push(Some(®ion_stat.engine));
226 self.region_roles.push(Some(®ion_stat.role.to_string()));
227 }
228
229 fn finish(&mut self) -> Result<RecordBatch> {
230 let columns: Vec<VectorRef> = vec![
231 Arc::new(self.region_ids.finish()),
232 Arc::new(self.table_ids.finish()),
233 Arc::new(self.region_numbers.finish()),
234 Arc::new(self.region_rows.finish()),
235 Arc::new(self.disk_sizes.finish()),
236 Arc::new(self.memtable_sizes.finish()),
237 Arc::new(self.manifest_sizes.finish()),
238 Arc::new(self.sst_sizes.finish()),
239 Arc::new(self.sst_nums.finish()),
240 Arc::new(self.index_sizes.finish()),
241 Arc::new(self.engines.finish()),
242 Arc::new(self.region_roles.finish()),
243 ];
244
245 RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
246 }
247}
248
249impl DfPartitionStream for InformationSchemaRegionStatistics {
250 fn schema(&self) -> &ArrowSchemaRef {
251 self.schema.arrow_schema()
252 }
253
254 fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
255 let schema = self.schema.arrow_schema().clone();
256 let mut builder = self.builder();
257 Box::pin(DfRecordBatchStreamAdapter::new(
258 schema,
259 futures::stream::once(async move {
260 builder
261 .make_region_statistics(None)
262 .await
263 .map(|x| x.into_df_record_batch())
264 .map_err(Into::into)
265 }),
266 ))
267 }
268}