Skip to main content

store_api/
region_info.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use common_recordbatch::DfRecordBatch;
18use datafusion_common::DataFusionError;
19use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
20use datatypes::arrow::array::{
21    ArrayRef, BooleanArray, StringArray, UInt8Array, UInt32Array, UInt64Array,
22};
23use datatypes::arrow::error::ArrowError;
24use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
25use serde::{Deserialize, Serialize};
26
27use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};
28
29/// Runtime and manifest information of a region for inspection.
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
31pub struct RegionInfoEntry {
32    /// The region id.
33    pub region_id: RegionId,
34    /// The table id this region belongs to.
35    pub table_id: TableId,
36    /// The region number inside the table.
37    pub region_number: RegionNumber,
38    /// The region group.
39    pub region_group: RegionGroup,
40    /// The region sequence inside the group.
41    pub region_sequence: RegionSeq,
42    /// The full runtime role/state label.
43    pub state: String,
44    /// The coarse region role.
45    pub role: String,
46    /// Whether the region accepts writes.
47    pub writable: bool,
48    /// The committed sequence of the region.
49    pub committed_sequence: u64,
50    /// The latest sequence that has been persisted into SSTs.
51    pub flushed_sequence: Option<u64>,
52    /// The manifest version of the region.
53    pub manifest_version: u64,
54    /// Human-readable compaction time window.
55    pub compaction_time_window: Option<String>,
56    /// Region options encoded as JSON.
57    pub region_options: String,
58    /// SST format used by the region.
59    pub sst_format: String,
60    /// Datanode id that reports the row.
61    pub node_id: Option<u64>,
62}
63
64impl RegionInfoEntry {
65    /// Returns the schema of the region info entry.
66    pub fn schema() -> SchemaRef {
67        use datatypes::prelude::ConcreteDataType as Ty;
68        Arc::new(Schema::new(vec![
69            ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
70            ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
71            ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
72            ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
73            ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
74            ColumnSchema::new("state", Ty::string_datatype(), false),
75            ColumnSchema::new("role", Ty::string_datatype(), false),
76            ColumnSchema::new("writable", Ty::boolean_datatype(), false),
77            ColumnSchema::new("committed_sequence", Ty::uint64_datatype(), false),
78            ColumnSchema::new("flushed_sequence", Ty::uint64_datatype(), true),
79            ColumnSchema::new("manifest_version", Ty::uint64_datatype(), false),
80            ColumnSchema::new("compaction_time_window", Ty::string_datatype(), true),
81            ColumnSchema::new("region_options", Ty::string_datatype(), false),
82            ColumnSchema::new("sst_format", Ty::string_datatype(), false),
83            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
84        ]))
85    }
86
87    /// Converts a list of region info entries to a record batch.
88    pub fn to_record_batch(entries: &[Self]) -> Result<DfRecordBatch, ArrowError> {
89        let schema = Self::schema();
90        let region_ids = entries.iter().map(|e| e.region_id.as_u64());
91        let table_ids = entries.iter().map(|e| e.table_id);
92        let region_numbers = entries.iter().map(|e| e.region_number);
93        let region_groups = entries.iter().map(|e| e.region_group);
94        let region_sequences = entries.iter().map(|e| e.region_sequence);
95        let states = entries.iter().map(|e| e.state.as_str());
96        let roles = entries.iter().map(|e| e.role.as_str());
97        let writable = entries.iter().map(|e| e.writable);
98        let committed_sequences = entries.iter().map(|e| e.committed_sequence);
99        let flushed_sequences = entries.iter().map(|e| e.flushed_sequence);
100        let manifest_versions = entries.iter().map(|e| e.manifest_version);
101        let compaction_time_windows = entries.iter().map(|e| e.compaction_time_window.as_ref());
102        let region_options = entries.iter().map(|e| e.region_options.as_str());
103        let sst_formats = entries.iter().map(|e| e.sst_format.as_str());
104        let node_ids = entries.iter().map(|e| e.node_id);
105
106        let columns: Vec<ArrayRef> = vec![
107            Arc::new(UInt64Array::from_iter_values(region_ids)),
108            Arc::new(UInt32Array::from_iter_values(table_ids)),
109            Arc::new(UInt32Array::from_iter_values(region_numbers)),
110            Arc::new(UInt8Array::from_iter_values(region_groups)),
111            Arc::new(UInt32Array::from_iter_values(region_sequences)),
112            Arc::new(StringArray::from_iter_values(states)),
113            Arc::new(StringArray::from_iter_values(roles)),
114            Arc::new(BooleanArray::from_iter(writable)),
115            Arc::new(UInt64Array::from_iter_values(committed_sequences)),
116            Arc::new(UInt64Array::from_iter(flushed_sequences)),
117            Arc::new(UInt64Array::from_iter_values(manifest_versions)),
118            Arc::new(StringArray::from_iter(compaction_time_windows)),
119            Arc::new(StringArray::from_iter_values(region_options)),
120            Arc::new(StringArray::from_iter_values(sst_formats)),
121            Arc::new(UInt64Array::from_iter(node_ids)),
122        ];
123
124        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
125    }
126
127    /// Reserved internal inspect table name for region info.
128    pub fn reserved_table_name_for_inspection() -> &'static str {
129        "__inspect/__mito/__region_info"
130    }
131
132    /// Builds a logical plan for scanning region info entries.
133    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
134        let table_source = LogicalTableSource::new(Self::schema().arrow_schema().clone());
135
136        let projection = scan_request.projection_input.map(|input| input.projection);
137        let mut builder = LogicalPlanBuilder::scan(
138            Self::reserved_table_name_for_inspection(),
139            Arc::new(table_source),
140            projection,
141        )?;
142
143        for filter in scan_request.filters {
144            builder = builder.filter(filter)?;
145        }
146
147        if let Some(limit) = scan_request.limit {
148            builder = builder.limit(0, Some(limit))?;
149        }
150
151        builder.build()
152    }
153}
154
#[cfg(test)]
mod tests {
    use datafusion_common::TableReference;
    use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
    use datatypes::arrow::array::{
        Array, BooleanArray, StringArray, UInt8Array, UInt32Array, UInt64Array,
    };

    use super::*;
    use crate::storage::{RegionId, ScanRequest};

    /// Downcasts column `index` of `batch` to the concrete Arrow array type `T`,
    /// panicking if the column has another type.
    fn column_as<T: 'static>(batch: &DfRecordBatch, index: usize) -> &T {
        batch.column(index).as_any().downcast_ref::<T>().unwrap()
    }

    #[test]
    fn test_region_info_schema() {
        let schema = RegionInfoEntry::schema();
        let column_schemas = schema.column_schemas();

        let expected_names = [
            "region_id",
            "table_id",
            "region_number",
            "region_group",
            "region_sequence",
            "state",
            "role",
            "writable",
            "committed_sequence",
            "flushed_sequence",
            "manifest_version",
            "compaction_time_window",
            "region_options",
            "sst_format",
            "node_id",
        ];
        let actual_names: Vec<&str> = column_schemas.iter().map(|c| c.name.as_str()).collect();
        assert_eq!(actual_names, expected_names);

        for idx in [0, 8] {
            assert!(!column_schemas[idx].is_nullable());
        }
        for idx in [9, 11, 14] {
            assert!(column_schemas[idx].is_nullable());
        }
    }

    #[test]
    fn test_region_info_to_record_batch() {
        let leader_id = RegionId::with_group_and_seq(10, 1, 20);
        let follower_id = RegionId::with_group_and_seq(11, 0, 21);

        let leader = RegionInfoEntry {
            region_id: leader_id,
            table_id: leader_id.table_id(),
            region_number: leader_id.region_number(),
            region_group: leader_id.region_group(),
            region_sequence: leader_id.region_sequence(),
            state: "Leader(Writable)".to_string(),
            role: "Leader".to_string(),
            writable: true,
            committed_sequence: 42,
            flushed_sequence: Some(41),
            manifest_version: 7,
            compaction_time_window: Some("1h".to_string()),
            region_options: "{\"sst_format\":\"flat\"}".to_string(),
            sst_format: "flat".to_string(),
            node_id: Some(3),
        };
        let follower = RegionInfoEntry {
            region_id: follower_id,
            table_id: follower_id.table_id(),
            region_number: follower_id.region_number(),
            region_group: follower_id.region_group(),
            region_sequence: follower_id.region_sequence(),
            state: "Follower".to_string(),
            role: "Follower".to_string(),
            writable: false,
            committed_sequence: 9,
            flushed_sequence: None,
            manifest_version: 2,
            compaction_time_window: None,
            region_options: "{}".to_string(),
            sst_format: "primary_key".to_string(),
            node_id: None,
        };

        let batch = RegionInfoEntry::to_record_batch(&[leader, follower]).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let region_ids = column_as::<UInt64Array>(&batch, 0);
        assert_eq!(leader_id.as_u64(), region_ids.value(0));
        assert_eq!(follower_id.as_u64(), region_ids.value(1));

        let table_ids = column_as::<UInt32Array>(&batch, 1);
        assert_eq!(10, table_ids.value(0));
        assert_eq!(11, table_ids.value(1));

        let region_groups = column_as::<UInt8Array>(&batch, 3);
        assert_eq!(1, region_groups.value(0));
        assert_eq!(0, region_groups.value(1));

        let states = column_as::<StringArray>(&batch, 5);
        assert_eq!("Leader(Writable)", states.value(0));
        assert_eq!("Follower", states.value(1));

        let writable = column_as::<BooleanArray>(&batch, 7);
        assert!(writable.value(0));
        assert!(!writable.value(1));

        let committed = column_as::<UInt64Array>(&batch, 8);
        assert_eq!(42, committed.value(0));
        assert_eq!(9, committed.value(1));

        let flushed = column_as::<UInt64Array>(&batch, 9);
        assert_eq!(41, flushed.value(0));
        assert!(flushed.is_null(1));

        let windows = column_as::<StringArray>(&batch, 11);
        assert_eq!("1h", windows.value(0));
        assert!(windows.is_null(1));

        let node_ids = column_as::<UInt64Array>(&batch, 14);
        assert_eq!(3, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_region_info_build_plan() {
        let request = ScanRequest {
            projection_input: Some(vec![0, 5, 7, 11].into()),
            filters: vec![binary_expr(col("writable"), Operator::Eq, lit(true))],
            limit: Some(10),
            ..Default::default()
        };

        let plan = RegionInfoEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);
        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(RegionInfoEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 5, 7, 11]));

        let projected_names: Vec<&str> = scan
            .projected_schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(
            projected_names,
            ["region_id", "state", "writable", "compaction_time_window"]
        );
    }

    /// Walks down a plan made only of `Filter`/`Limit` nodes to its `TableScan`.
    ///
    /// Returns the scan plus whether a `Filter` and whether a `Limit` were seen
    /// on the way; panics on any other node type.
    fn extract_scan(plan: &LogicalPlan) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) {
        let mut saw_filter = false;
        let mut saw_limit = false;
        let mut node = plan;
        loop {
            match node {
                LogicalPlan::Filter(filter) => {
                    saw_filter = true;
                    node = filter.input.as_ref();
                }
                LogicalPlan::Limit(limit) => {
                    saw_limit = true;
                    node = limit.input.as_ref();
                }
                LogicalPlan::TableScan(scan) => return (scan, saw_filter, saw_limit),
                other => panic!("unexpected plan: {other:?}"),
            }
        }
    }
}