catalog/system_schema/information_schema/
region_statistics.rs1use std::sync::{Arc, Weak};
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID;
19use common_error::ext::BoxedError;
20use common_meta::datanode::RegionStat;
21use common_recordbatch::adapter::RecordBatchStreamAdapter;
22use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
23use datafusion::execution::TaskContext;
24use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
25use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
26use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
27use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
28use datatypes::value::Value;
29use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder};
30use snafu::ResultExt;
31use store_api::storage::{ScanRequest, TableId};
32
33use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
34use crate::information_schema::Predicates;
35use crate::system_schema::information_schema::{InformationTable, REGION_STATISTICS};
36use crate::system_schema::utils;
37use crate::CatalogManager;
38
// Column names of the `information_schema.region_statistics` table.
const REGION_ID: &str = "region_id";
const TABLE_ID: &str = "table_id";
const REGION_NUMBER: &str = "region_number";
const REGION_ROWS: &str = "region_rows";
const DISK_SIZE: &str = "disk_size";
const MEMTABLE_SIZE: &str = "memtable_size";
const MANIFEST_SIZE: &str = "manifest_size";
const SST_SIZE: &str = "sst_size";
const INDEX_SIZE: &str = "index_size";
const ENGINE: &str = "engine";
const REGION_ROLE: &str = "region_role";

// Initial capacity hint for the per-column vector builders below.
const INIT_CAPACITY: usize = 42;
52
/// The `information_schema.region_statistics` virtual table.
///
/// Exposes per-region statistics (row count; disk, memtable, manifest, SST and
/// index sizes; engine name and region role) fetched through the catalog
/// manager's information extension.
#[derive(Debug)]
pub(super) struct InformationSchemaRegionStatistics {
    // Table schema, built once by `Self::schema()` in the constructor.
    schema: SchemaRef,
    // Weak handle to the catalog manager; upgraded on demand when building rows.
    catalog_manager: Weak<dyn CatalogManager>,
}
71
72impl InformationSchemaRegionStatistics {
73 pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
74 Self {
75 schema: Self::schema(),
76 catalog_manager,
77 }
78 }
79
80 pub(crate) fn schema() -> SchemaRef {
81 Arc::new(Schema::new(vec![
82 ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
83 ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
84 ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
85 ColumnSchema::new(REGION_ROWS, ConcreteDataType::uint64_datatype(), true),
86 ColumnSchema::new(DISK_SIZE, ConcreteDataType::uint64_datatype(), true),
87 ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
88 ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
89 ColumnSchema::new(SST_SIZE, ConcreteDataType::uint64_datatype(), true),
90 ColumnSchema::new(INDEX_SIZE, ConcreteDataType::uint64_datatype(), true),
91 ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
92 ColumnSchema::new(REGION_ROLE, ConcreteDataType::string_datatype(), true),
93 ]))
94 }
95
96 fn builder(&self) -> InformationSchemaRegionStatisticsBuilder {
97 InformationSchemaRegionStatisticsBuilder::new(
98 self.schema.clone(),
99 self.catalog_manager.clone(),
100 )
101 }
102}
103
104impl InformationTable for InformationSchemaRegionStatistics {
105 fn table_id(&self) -> TableId {
106 INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID
107 }
108
109 fn table_name(&self) -> &'static str {
110 REGION_STATISTICS
111 }
112
113 fn schema(&self) -> SchemaRef {
114 self.schema.clone()
115 }
116
117 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
118 let schema = self.schema.arrow_schema().clone();
119 let mut builder = self.builder();
120
121 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
122 schema,
123 futures::stream::once(async move {
124 builder
125 .make_region_statistics(Some(request))
126 .await
127 .map(|x| x.into_df_record_batch())
128 .map_err(Into::into)
129 }),
130 ));
131
132 Ok(Box::pin(
133 RecordBatchStreamAdapter::try_new(stream)
134 .map_err(BoxedError::new)
135 .context(InternalSnafu)?,
136 ))
137 }
138}
139
/// Accumulates `region_statistics` rows into per-column vector builders and
/// finishes them into a single `RecordBatch`.
struct InformationSchemaRegionStatisticsBuilder {
    // Schema of the resulting record batch (must match the column order in `finish`).
    schema: SchemaRef,
    // Used to reach the information extension that supplies region stats.
    catalog_manager: Weak<dyn CatalogManager>,

    // One builder per output column, in schema order.
    region_ids: UInt64VectorBuilder,
    table_ids: UInt32VectorBuilder,
    region_numbers: UInt32VectorBuilder,
    region_rows: UInt64VectorBuilder,
    disk_sizes: UInt64VectorBuilder,
    memtable_sizes: UInt64VectorBuilder,
    manifest_sizes: UInt64VectorBuilder,
    sst_sizes: UInt64VectorBuilder,
    index_sizes: UInt64VectorBuilder,
    engines: StringVectorBuilder,
    region_roles: StringVectorBuilder,
}
156
157impl InformationSchemaRegionStatisticsBuilder {
158 fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
159 Self {
160 schema,
161 catalog_manager,
162 region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
163 table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
164 region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
165 region_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
166 disk_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
167 memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
168 manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
169 sst_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
170 index_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
171 engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
172 region_roles: StringVectorBuilder::with_capacity(INIT_CAPACITY),
173 }
174 }
175
176 async fn make_region_statistics(
178 &mut self,
179 request: Option<ScanRequest>,
180 ) -> Result<RecordBatch> {
181 let predicates = Predicates::from_scan_request(&request);
182 let information_extension = utils::information_extension(&self.catalog_manager)?;
183 let region_stats = information_extension.region_stats().await?;
184 for region_stat in region_stats {
185 self.add_region_statistic(&predicates, region_stat);
186 }
187 self.finish()
188 }
189
190 fn add_region_statistic(&mut self, predicate: &Predicates, region_stat: RegionStat) {
191 let row = [
192 (REGION_ID, &Value::from(region_stat.id.as_u64())),
193 (TABLE_ID, &Value::from(region_stat.id.table_id())),
194 (REGION_NUMBER, &Value::from(region_stat.id.region_number())),
195 (REGION_ROWS, &Value::from(region_stat.num_rows)),
196 (DISK_SIZE, &Value::from(region_stat.approximate_bytes)),
197 (MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
198 (MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
199 (SST_SIZE, &Value::from(region_stat.sst_size)),
200 (INDEX_SIZE, &Value::from(region_stat.index_size)),
201 (ENGINE, &Value::from(region_stat.engine.as_str())),
202 (REGION_ROLE, &Value::from(region_stat.role.to_string())),
203 ];
204
205 if !predicate.eval(&row) {
206 return;
207 }
208
209 self.region_ids.push(Some(region_stat.id.as_u64()));
210 self.table_ids.push(Some(region_stat.id.table_id()));
211 self.region_numbers
212 .push(Some(region_stat.id.region_number()));
213 self.region_rows.push(Some(region_stat.num_rows));
214 self.disk_sizes.push(Some(region_stat.approximate_bytes));
215 self.memtable_sizes.push(Some(region_stat.memtable_size));
216 self.manifest_sizes.push(Some(region_stat.manifest_size));
217 self.sst_sizes.push(Some(region_stat.sst_size));
218 self.index_sizes.push(Some(region_stat.index_size));
219 self.engines.push(Some(®ion_stat.engine));
220 self.region_roles.push(Some(®ion_stat.role.to_string()));
221 }
222
223 fn finish(&mut self) -> Result<RecordBatch> {
224 let columns: Vec<VectorRef> = vec![
225 Arc::new(self.region_ids.finish()),
226 Arc::new(self.table_ids.finish()),
227 Arc::new(self.region_numbers.finish()),
228 Arc::new(self.region_rows.finish()),
229 Arc::new(self.disk_sizes.finish()),
230 Arc::new(self.memtable_sizes.finish()),
231 Arc::new(self.manifest_sizes.finish()),
232 Arc::new(self.sst_sizes.finish()),
233 Arc::new(self.index_sizes.finish()),
234 Arc::new(self.engines.finish()),
235 Arc::new(self.region_roles.finish()),
236 ];
237
238 RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
239 }
240}
241
242impl DfPartitionStream for InformationSchemaRegionStatistics {
243 fn schema(&self) -> &ArrowSchemaRef {
244 self.schema.arrow_schema()
245 }
246
247 fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
248 let schema = self.schema.arrow_schema().clone();
249 let mut builder = self.builder();
250 Box::pin(DfRecordBatchStreamAdapter::new(
251 schema,
252 futures::stream::once(async move {
253 builder
254 .make_region_statistics(None)
255 .await
256 .map(|x| x.into_df_record_batch())
257 .map_err(Into::into)
258 }),
259 ))
260 }
261}