metric_engine/engine/
state.rs1use std::collections::{HashMap, HashSet};
18
19use snafu::OptionExt;
20use store_api::codec::PrimaryKeyEncoding;
21use store_api::metadata::ColumnMetadata;
22use store_api::storage::{ColumnId, RegionId};
23
24use crate::engine::options::PhysicalRegionOptions;
25use crate::error::{PhysicalRegionNotFoundSnafu, Result};
26use crate::metrics::LOGICAL_REGION_COUNT;
27use crate::utils::to_data_region_id;
28
29pub struct PhysicalRegionState {
30 logical_regions: HashSet<RegionId>,
31 physical_columns: HashMap<String, ColumnId>,
32 primary_key_encoding: PrimaryKeyEncoding,
33 options: PhysicalRegionOptions,
34}
35
36impl PhysicalRegionState {
37 pub fn new(
38 physical_columns: HashMap<String, ColumnId>,
39 primary_key_encoding: PrimaryKeyEncoding,
40 options: PhysicalRegionOptions,
41 ) -> Self {
42 Self {
43 logical_regions: HashSet::new(),
44 physical_columns,
45 primary_key_encoding,
46 options,
47 }
48 }
49
50 pub fn logical_regions(&self) -> &HashSet<RegionId> {
52 &self.logical_regions
53 }
54
55 pub fn physical_columns(&self) -> &HashMap<String, ColumnId> {
57 &self.physical_columns
58 }
59
60 pub fn options(&self) -> &PhysicalRegionOptions {
62 &self.options
63 }
64
65 pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> bool {
68 self.logical_regions.remove(&logical_region_id)
69 }
70}
71
72#[derive(Default)]
74pub(crate) struct MetricEngineState {
75 physical_regions: HashMap<RegionId, PhysicalRegionState>,
77 logical_regions: HashMap<RegionId, RegionId>,
79 logical_columns: HashMap<RegionId, Vec<ColumnMetadata>>,
83}
84
85impl MetricEngineState {
86 pub fn add_physical_region(
87 &mut self,
88 physical_region_id: RegionId,
89 physical_columns: HashMap<String, ColumnId>,
90 primary_key_encoding: PrimaryKeyEncoding,
91 options: PhysicalRegionOptions,
92 ) {
93 let physical_region_id = to_data_region_id(physical_region_id);
94 self.physical_regions.insert(
95 physical_region_id,
96 PhysicalRegionState::new(physical_columns, primary_key_encoding, options),
97 );
98 }
99
100 pub fn add_physical_columns(
103 &mut self,
104 physical_region_id: RegionId,
105 physical_columns: impl IntoIterator<Item = (String, ColumnId)>,
106 ) {
107 let physical_region_id = to_data_region_id(physical_region_id);
108 let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
109 for (col, id) in physical_columns {
110 state.physical_columns.insert(col, id);
111 }
112 }
113
114 pub fn add_logical_regions(
117 &mut self,
118 physical_region_id: RegionId,
119 logical_region_ids: impl IntoIterator<Item = RegionId>,
120 ) {
121 let physical_region_id = to_data_region_id(physical_region_id);
122 let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
123 for logical_region_id in logical_region_ids {
124 state.logical_regions.insert(logical_region_id);
125 self.logical_regions
126 .insert(logical_region_id, physical_region_id);
127 }
128 }
129
130 pub fn invalid_logical_regions_cache(
131 &mut self,
132 logical_region_ids: impl IntoIterator<Item = RegionId>,
133 ) {
134 for logical_region_id in logical_region_ids {
135 self.logical_columns.remove(&logical_region_id);
136 }
137 }
138
139 pub fn add_logical_region(
142 &mut self,
143 physical_region_id: RegionId,
144 logical_region_id: RegionId,
145 ) {
146 let physical_region_id = to_data_region_id(physical_region_id);
147 self.physical_regions
148 .get_mut(&physical_region_id)
149 .unwrap()
150 .logical_regions
151 .insert(logical_region_id);
152 self.logical_regions
153 .insert(logical_region_id, physical_region_id);
154 }
155
156 pub fn set_logical_columns(
158 &mut self,
159 logical_region_id: RegionId,
160 columns: Vec<ColumnMetadata>,
161 ) {
162 self.logical_columns.insert(logical_region_id, columns);
163 }
164
165 pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
166 self.logical_regions.get(&logical_region_id).copied()
167 }
168
169 pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
170 &self.logical_columns
171 }
172
173 pub fn physical_region_states(&self) -> &HashMap<RegionId, PhysicalRegionState> {
174 &self.physical_regions
175 }
176
177 pub fn exist_physical_region(&self, physical_region_id: RegionId) -> bool {
178 self.physical_regions.contains_key(&physical_region_id)
179 }
180
181 pub fn get_primary_key_encoding(
182 &self,
183 physical_region_id: RegionId,
184 ) -> Option<PrimaryKeyEncoding> {
185 self.physical_regions
186 .get(&physical_region_id)
187 .map(|state| state.primary_key_encoding)
188 }
189
190 pub fn logical_regions(&self) -> &HashMap<RegionId, RegionId> {
191 &self.logical_regions
192 }
193
194 pub fn remove_physical_region(&mut self, physical_region_id: RegionId) -> Result<()> {
196 let physical_region_id = to_data_region_id(physical_region_id);
197
198 let logical_regions = &self
199 .physical_regions
200 .get(&physical_region_id)
201 .context(PhysicalRegionNotFoundSnafu {
202 region_id: physical_region_id,
203 })?
204 .logical_regions;
205
206 LOGICAL_REGION_COUNT.sub(logical_regions.len() as i64);
207
208 for logical_region in logical_regions {
209 self.logical_regions.remove(logical_region);
210 }
211 self.physical_regions.remove(&physical_region_id);
212 Ok(())
213 }
214
215 pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> Result<()> {
217 let physical_region_id = self.logical_regions.remove(&logical_region_id).context(
218 PhysicalRegionNotFoundSnafu {
219 region_id: logical_region_id,
220 },
221 )?;
222
223 self.physical_regions
224 .get_mut(&physical_region_id)
225 .unwrap() .remove_logical_region(logical_region_id);
227
228 self.logical_columns.remove(&logical_region_id);
229
230 Ok(())
231 }
232
233 pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
234 self.logical_regions().contains_key(&logical_region_id)
235 }
236}