1use std::sync::Arc;
16
17use common_recordbatch::DfRecordBatch;
18use datafusion_common::DataFusionError;
19use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
20use datatypes::arrow::array::{
21 ArrayRef, BooleanArray, StringArray, UInt8Array, UInt32Array, UInt64Array,
22};
23use datatypes::arrow::error::ArrowError;
24use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
25use serde::{Deserialize, Serialize};
26
27use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};
28
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
31pub struct RegionInfoEntry {
32 pub region_id: RegionId,
34 pub table_id: TableId,
36 pub region_number: RegionNumber,
38 pub region_group: RegionGroup,
40 pub region_sequence: RegionSeq,
42 pub state: String,
44 pub role: String,
46 pub writable: bool,
48 pub committed_sequence: u64,
50 pub flushed_sequence: Option<u64>,
52 pub manifest_version: u64,
54 pub compaction_time_window: Option<String>,
56 pub region_options: String,
58 pub sst_format: String,
60 pub node_id: Option<u64>,
62}
63
64impl RegionInfoEntry {
65 pub fn schema() -> SchemaRef {
67 use datatypes::prelude::ConcreteDataType as Ty;
68 Arc::new(Schema::new(vec![
69 ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
70 ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
71 ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
72 ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
73 ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
74 ColumnSchema::new("state", Ty::string_datatype(), false),
75 ColumnSchema::new("role", Ty::string_datatype(), false),
76 ColumnSchema::new("writable", Ty::boolean_datatype(), false),
77 ColumnSchema::new("committed_sequence", Ty::uint64_datatype(), false),
78 ColumnSchema::new("flushed_sequence", Ty::uint64_datatype(), true),
79 ColumnSchema::new("manifest_version", Ty::uint64_datatype(), false),
80 ColumnSchema::new("compaction_time_window", Ty::string_datatype(), true),
81 ColumnSchema::new("region_options", Ty::string_datatype(), false),
82 ColumnSchema::new("sst_format", Ty::string_datatype(), false),
83 ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
84 ]))
85 }
86
87 pub fn to_record_batch(entries: &[Self]) -> Result<DfRecordBatch, ArrowError> {
89 let schema = Self::schema();
90 let region_ids = entries.iter().map(|e| e.region_id.as_u64());
91 let table_ids = entries.iter().map(|e| e.table_id);
92 let region_numbers = entries.iter().map(|e| e.region_number);
93 let region_groups = entries.iter().map(|e| e.region_group);
94 let region_sequences = entries.iter().map(|e| e.region_sequence);
95 let states = entries.iter().map(|e| e.state.as_str());
96 let roles = entries.iter().map(|e| e.role.as_str());
97 let writable = entries.iter().map(|e| e.writable);
98 let committed_sequences = entries.iter().map(|e| e.committed_sequence);
99 let flushed_sequences = entries.iter().map(|e| e.flushed_sequence);
100 let manifest_versions = entries.iter().map(|e| e.manifest_version);
101 let compaction_time_windows = entries.iter().map(|e| e.compaction_time_window.as_ref());
102 let region_options = entries.iter().map(|e| e.region_options.as_str());
103 let sst_formats = entries.iter().map(|e| e.sst_format.as_str());
104 let node_ids = entries.iter().map(|e| e.node_id);
105
106 let columns: Vec<ArrayRef> = vec![
107 Arc::new(UInt64Array::from_iter_values(region_ids)),
108 Arc::new(UInt32Array::from_iter_values(table_ids)),
109 Arc::new(UInt32Array::from_iter_values(region_numbers)),
110 Arc::new(UInt8Array::from_iter_values(region_groups)),
111 Arc::new(UInt32Array::from_iter_values(region_sequences)),
112 Arc::new(StringArray::from_iter_values(states)),
113 Arc::new(StringArray::from_iter_values(roles)),
114 Arc::new(BooleanArray::from_iter(writable)),
115 Arc::new(UInt64Array::from_iter_values(committed_sequences)),
116 Arc::new(UInt64Array::from_iter(flushed_sequences)),
117 Arc::new(UInt64Array::from_iter_values(manifest_versions)),
118 Arc::new(StringArray::from_iter(compaction_time_windows)),
119 Arc::new(StringArray::from_iter_values(region_options)),
120 Arc::new(StringArray::from_iter_values(sst_formats)),
121 Arc::new(UInt64Array::from_iter(node_ids)),
122 ];
123
124 DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
125 }
126
127 pub fn reserved_table_name_for_inspection() -> &'static str {
129 "__inspect/__mito/__region_info"
130 }
131
132 pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
134 let table_source = LogicalTableSource::new(Self::schema().arrow_schema().clone());
135
136 let projection = scan_request.projection_input.map(|input| input.projection);
137 let mut builder = LogicalPlanBuilder::scan(
138 Self::reserved_table_name_for_inspection(),
139 Arc::new(table_source),
140 projection,
141 )?;
142
143 for filter in scan_request.filters {
144 builder = builder.filter(filter)?;
145 }
146
147 if let Some(limit) = scan_request.limit {
148 builder = builder.limit(0, Some(limit))?;
149 }
150
151 builder.build()
152 }
153}
154
#[cfg(test)]
mod tests {
    use datafusion_common::TableReference;
    use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
    use datatypes::arrow::array::{
        Array, BooleanArray, StringArray, UInt8Array, UInt32Array, UInt64Array,
    };

    use super::*;
    use crate::storage::{RegionId, ScanRequest};

    /// Downcasts `array` to the concrete array type `T`, panicking on mismatch.
    fn downcast<T: 'static>(array: &dyn Array) -> &T {
        array.as_any().downcast_ref::<T>().unwrap()
    }

    /// The schema exposes the expected column names and nullability flags.
    #[test]
    fn test_region_info_schema() {
        let schema = RegionInfoEntry::schema();
        let columns = schema.column_schemas();

        let expected_names = vec![
            "region_id",
            "table_id",
            "region_number",
            "region_group",
            "region_sequence",
            "state",
            "role",
            "writable",
            "committed_sequence",
            "flushed_sequence",
            "manifest_version",
            "compaction_time_window",
            "region_options",
            "sst_format",
            "node_id",
        ];
        let actual_names: Vec<&str> = columns.iter().map(|column| column.name.as_str()).collect();
        assert_eq!(actual_names, expected_names);

        // (column index, expected nullability)
        let nullability = [(0, false), (8, false), (9, true), (11, true), (14, true)];
        for (index, nullable) in nullability {
            assert_eq!(columns[index].is_nullable(), nullable, "column {index}");
        }
    }

    /// Entries are converted row by row, with `None` fields becoming nulls.
    #[test]
    fn test_region_info_to_record_batch() {
        let leader_id = RegionId::with_group_and_seq(10, 1, 20);
        let follower_id = RegionId::with_group_and_seq(11, 0, 21);

        let leader = RegionInfoEntry {
            region_id: leader_id,
            table_id: leader_id.table_id(),
            region_number: leader_id.region_number(),
            region_group: leader_id.region_group(),
            region_sequence: leader_id.region_sequence(),
            state: "Leader(Writable)".to_string(),
            role: "Leader".to_string(),
            writable: true,
            committed_sequence: 42,
            flushed_sequence: Some(41),
            manifest_version: 7,
            compaction_time_window: Some("1h".to_string()),
            region_options: "{\"sst_format\":\"flat\"}".to_string(),
            sst_format: "flat".to_string(),
            node_id: Some(3),
        };
        let follower = RegionInfoEntry {
            region_id: follower_id,
            table_id: follower_id.table_id(),
            region_number: follower_id.region_number(),
            region_group: follower_id.region_group(),
            region_sequence: follower_id.region_sequence(),
            state: "Follower".to_string(),
            role: "Follower".to_string(),
            writable: false,
            committed_sequence: 9,
            flushed_sequence: None,
            manifest_version: 2,
            compaction_time_window: None,
            region_options: "{}".to_string(),
            sst_format: "primary_key".to_string(),
            node_id: None,
        };
        let entries = vec![leader, follower];

        let batch = RegionInfoEntry::to_record_batch(&entries).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let region_ids = downcast::<UInt64Array>(batch.column(0));
        assert_eq!(region_ids.value(0), leader_id.as_u64());
        assert_eq!(region_ids.value(1), follower_id.as_u64());

        let table_ids = downcast::<UInt32Array>(batch.column(1));
        assert_eq!(table_ids.value(0), 10);
        assert_eq!(table_ids.value(1), 11);

        let region_groups = downcast::<UInt8Array>(batch.column(3));
        assert_eq!(region_groups.value(0), 1);
        assert_eq!(region_groups.value(1), 0);

        let states = downcast::<StringArray>(batch.column(5));
        assert_eq!(states.value(0), "Leader(Writable)");
        assert_eq!(states.value(1), "Follower");

        let writable = downcast::<BooleanArray>(batch.column(7));
        assert!(writable.value(0));
        assert!(!writable.value(1));

        let committed_sequences = downcast::<UInt64Array>(batch.column(8));
        assert_eq!(committed_sequences.value(0), 42);
        assert_eq!(committed_sequences.value(1), 9);

        let flushed_sequences = downcast::<UInt64Array>(batch.column(9));
        assert_eq!(flushed_sequences.value(0), 41);
        assert!(flushed_sequences.is_null(1));

        let compaction_time_windows = downcast::<StringArray>(batch.column(11));
        assert_eq!(compaction_time_windows.value(0), "1h");
        assert!(compaction_time_windows.is_null(1));

        let node_ids = downcast::<UInt64Array>(batch.column(14));
        assert_eq!(node_ids.value(0), 3);
        assert!(node_ids.is_null(1));
    }

    /// The built plan carries the projection, the filter and the limit.
    #[test]
    fn test_region_info_build_plan() {
        let projection_input = Some(vec![0, 5, 7, 11].into());
        let writable_only = binary_expr(col("writable"), Operator::Eq, lit(true));
        let request = ScanRequest {
            projection_input,
            filters: vec![writable_only],
            limit: Some(10),
            ..Default::default()
        };

        let plan = RegionInfoEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);
        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(RegionInfoEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 5, 7, 11]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 4);
        let expected_fields = ["region_id", "state", "writable", "compaction_time_window"];
        for (position, expected) in expected_fields.iter().enumerate() {
            assert_eq!(fields[position].name(), expected);
        }
    }

    /// Walks down a chain of `Filter` / `Limit` nodes to the underlying table
    /// scan, reporting whether a filter and a limit were seen on the way.
    ///
    /// Panics on any other plan node.
    fn extract_scan(plan: &LogicalPlan) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) {
        use datafusion_expr::logical_plan::Limit;

        let mut has_filter = false;
        let mut has_limit = false;
        let mut current = plan;
        loop {
            match current {
                LogicalPlan::Filter(filter) => {
                    has_filter = true;
                    current = &*filter.input;
                }
                LogicalPlan::Limit(Limit { input, .. }) => {
                    has_limit = true;
                    current = &**input;
                }
                LogicalPlan::TableScan(scan) => return (scan, has_filter, has_limit),
                other => panic!("unexpected plan: {other:?}"),
            }
        }
    }
}