metric_engine/engine/
state.rs1use std::collections::{HashMap, HashSet};
18
19use common_time::timestamp::TimeUnit;
20use snafu::OptionExt;
21use store_api::codec::PrimaryKeyEncoding;
22use store_api::metadata::ColumnMetadata;
23use store_api::storage::{ColumnId, RegionId};
24
25use crate::engine::options::PhysicalRegionOptions;
26use crate::error::{PhysicalRegionNotFoundSnafu, Result};
27use crate::metrics::LOGICAL_REGION_COUNT;
28use crate::utils::to_data_region_id;
29
30pub struct PhysicalRegionState {
31 logical_regions: HashSet<RegionId>,
32 physical_columns: HashMap<String, ColumnId>,
33 primary_key_encoding: PrimaryKeyEncoding,
34 options: PhysicalRegionOptions,
35 time_index_unit: TimeUnit,
36}
37
38impl PhysicalRegionState {
39 pub fn new(
40 physical_columns: HashMap<String, ColumnId>,
41 primary_key_encoding: PrimaryKeyEncoding,
42 options: PhysicalRegionOptions,
43 time_index_unit: TimeUnit,
44 ) -> Self {
45 Self {
46 logical_regions: HashSet::new(),
47 physical_columns,
48 primary_key_encoding,
49 options,
50 time_index_unit,
51 }
52 }
53
54 pub fn logical_regions(&self) -> &HashSet<RegionId> {
56 &self.logical_regions
57 }
58
59 pub fn physical_columns(&self) -> &HashMap<String, ColumnId> {
61 &self.physical_columns
62 }
63
64 pub fn options(&self) -> &PhysicalRegionOptions {
66 &self.options
67 }
68
69 pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> bool {
72 self.logical_regions.remove(&logical_region_id)
73 }
74}
75
76#[derive(Default)]
78pub(crate) struct MetricEngineState {
79 physical_regions: HashMap<RegionId, PhysicalRegionState>,
81 logical_regions: HashMap<RegionId, RegionId>,
83 logical_columns: HashMap<RegionId, Vec<ColumnMetadata>>,
87}
88
89impl MetricEngineState {
90 pub fn add_physical_region(
91 &mut self,
92 physical_region_id: RegionId,
93 physical_columns: HashMap<String, ColumnId>,
94 primary_key_encoding: PrimaryKeyEncoding,
95 options: PhysicalRegionOptions,
96 time_index_unit: TimeUnit,
97 ) {
98 let physical_region_id = to_data_region_id(physical_region_id);
99 self.physical_regions.insert(
100 physical_region_id,
101 PhysicalRegionState::new(
102 physical_columns,
103 primary_key_encoding,
104 options,
105 time_index_unit,
106 ),
107 );
108 }
109
110 pub fn add_physical_columns(
113 &mut self,
114 physical_region_id: RegionId,
115 physical_columns: impl IntoIterator<Item = (String, ColumnId)>,
116 ) {
117 let physical_region_id = to_data_region_id(physical_region_id);
118 let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
119 for (col, id) in physical_columns {
120 state.physical_columns.insert(col, id);
121 }
122 }
123
124 pub fn add_logical_regions(
127 &mut self,
128 physical_region_id: RegionId,
129 logical_region_ids: impl IntoIterator<Item = RegionId>,
130 ) {
131 let physical_region_id = to_data_region_id(physical_region_id);
132 let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
133 for logical_region_id in logical_region_ids {
134 state.logical_regions.insert(logical_region_id);
135 self.logical_regions
136 .insert(logical_region_id, physical_region_id);
137 }
138 }
139
140 pub fn invalid_logical_regions_cache(
141 &mut self,
142 logical_region_ids: impl IntoIterator<Item = RegionId>,
143 ) {
144 for logical_region_id in logical_region_ids {
145 self.logical_columns.remove(&logical_region_id);
146 }
147 }
148
149 pub fn add_logical_region(
152 &mut self,
153 physical_region_id: RegionId,
154 logical_region_id: RegionId,
155 ) {
156 let physical_region_id = to_data_region_id(physical_region_id);
157 self.physical_regions
158 .get_mut(&physical_region_id)
159 .unwrap()
160 .logical_regions
161 .insert(logical_region_id);
162 self.logical_regions
163 .insert(logical_region_id, physical_region_id);
164 }
165
166 pub fn set_logical_columns(
168 &mut self,
169 logical_region_id: RegionId,
170 columns: Vec<ColumnMetadata>,
171 ) {
172 self.logical_columns.insert(logical_region_id, columns);
173 }
174
175 pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
176 self.logical_regions.get(&logical_region_id).copied()
177 }
178
179 pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
180 &self.logical_columns
181 }
182
183 pub fn physical_region_states(&self) -> &HashMap<RegionId, PhysicalRegionState> {
184 &self.physical_regions
185 }
186
187 pub fn exist_physical_region(&self, physical_region_id: RegionId) -> bool {
188 self.physical_regions.contains_key(&physical_region_id)
189 }
190
191 pub fn physical_region_time_index_unit(
192 &self,
193 physical_region_id: RegionId,
194 ) -> Option<TimeUnit> {
195 self.physical_regions
196 .get(&physical_region_id)
197 .map(|state| state.time_index_unit)
198 }
199
200 pub fn get_primary_key_encoding(
201 &self,
202 physical_region_id: RegionId,
203 ) -> Option<PrimaryKeyEncoding> {
204 self.physical_regions
205 .get(&physical_region_id)
206 .map(|state| state.primary_key_encoding)
207 }
208
209 pub fn logical_regions(&self) -> &HashMap<RegionId, RegionId> {
210 &self.logical_regions
211 }
212
213 pub fn remove_physical_region(&mut self, physical_region_id: RegionId) -> Result<()> {
215 let physical_region_id = to_data_region_id(physical_region_id);
216
217 let logical_regions = &self
218 .physical_regions
219 .get(&physical_region_id)
220 .context(PhysicalRegionNotFoundSnafu {
221 region_id: physical_region_id,
222 })?
223 .logical_regions;
224
225 LOGICAL_REGION_COUNT.sub(logical_regions.len() as i64);
226
227 for logical_region in logical_regions {
228 self.logical_regions.remove(logical_region);
229 }
230 self.physical_regions.remove(&physical_region_id);
231 Ok(())
232 }
233
234 pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> Result<()> {
236 let physical_region_id = self.logical_regions.remove(&logical_region_id).context(
237 PhysicalRegionNotFoundSnafu {
238 region_id: logical_region_id,
239 },
240 )?;
241
242 self.physical_regions
243 .get_mut(&physical_region_id)
244 .unwrap() .remove_logical_region(logical_region_id);
246
247 self.logical_columns.remove(&logical_region_id);
248
249 Ok(())
250 }
251
252 pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
253 self.logical_regions().contains_key(&logical_region_id)
254 }
255}