metric_engine/engine/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Internal states of metric engine
16
17use std::collections::{HashMap, HashSet};
18
19use common_time::timestamp::TimeUnit;
20use snafu::OptionExt;
21use store_api::codec::PrimaryKeyEncoding;
22use store_api::metadata::ColumnMetadata;
23use store_api::storage::{ColumnId, RegionId};
24
25use crate::engine::options::PhysicalRegionOptions;
26use crate::error::{PhysicalRegionNotFoundSnafu, Result};
27use crate::metrics::LOGICAL_REGION_COUNT;
28use crate::utils::to_data_region_id;
29
30pub struct PhysicalRegionState {
31    logical_regions: HashSet<RegionId>,
32    physical_columns: HashMap<String, ColumnId>,
33    primary_key_encoding: PrimaryKeyEncoding,
34    options: PhysicalRegionOptions,
35    time_index_unit: TimeUnit,
36}
37
38impl PhysicalRegionState {
39    pub fn new(
40        physical_columns: HashMap<String, ColumnId>,
41        primary_key_encoding: PrimaryKeyEncoding,
42        options: PhysicalRegionOptions,
43        time_index_unit: TimeUnit,
44    ) -> Self {
45        Self {
46            logical_regions: HashSet::new(),
47            physical_columns,
48            primary_key_encoding,
49            options,
50            time_index_unit,
51        }
52    }
53
54    /// Returns a reference to the logical region ids.
55    pub fn logical_regions(&self) -> &HashSet<RegionId> {
56        &self.logical_regions
57    }
58
59    /// Returns a reference to the physical columns.
60    pub fn physical_columns(&self) -> &HashMap<String, ColumnId> {
61        &self.physical_columns
62    }
63
64    /// Returns a reference to the physical region options.
65    pub fn options(&self) -> &PhysicalRegionOptions {
66        &self.options
67    }
68
69    /// Removes a logical region id from the physical region state.
70    /// Returns true if the logical region id was present.
71    pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> bool {
72        self.logical_regions.remove(&logical_region_id)
73    }
74}
75
76/// Internal states of metric engine
77#[derive(Default)]
78pub(crate) struct MetricEngineState {
79    /// Physical regions states.
80    physical_regions: HashMap<RegionId, PhysicalRegionState>,
81    /// Mapping from logical region id to physical region id.
82    logical_regions: HashMap<RegionId, RegionId>,
83    /// Cache for the column metadata of logical regions.
84    /// The column order is the same with the order in the metadata, which is
85    /// alphabetically ordered on column name.
86    logical_columns: HashMap<RegionId, Vec<ColumnMetadata>>,
87}
88
89impl MetricEngineState {
90    pub fn add_physical_region(
91        &mut self,
92        physical_region_id: RegionId,
93        physical_columns: HashMap<String, ColumnId>,
94        primary_key_encoding: PrimaryKeyEncoding,
95        options: PhysicalRegionOptions,
96        time_index_unit: TimeUnit,
97    ) {
98        let physical_region_id = to_data_region_id(physical_region_id);
99        self.physical_regions.insert(
100            physical_region_id,
101            PhysicalRegionState::new(
102                physical_columns,
103                primary_key_encoding,
104                options,
105                time_index_unit,
106            ),
107        );
108    }
109
110    /// # Panic
111    /// if the physical region does not exist
112    pub fn add_physical_columns(
113        &mut self,
114        physical_region_id: RegionId,
115        physical_columns: impl IntoIterator<Item = (String, ColumnId)>,
116    ) {
117        let physical_region_id = to_data_region_id(physical_region_id);
118        let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
119        for (col, id) in physical_columns {
120            state.physical_columns.insert(col, id);
121        }
122    }
123
124    /// # Panic
125    /// if the physical region does not exist
126    pub fn add_logical_regions(
127        &mut self,
128        physical_region_id: RegionId,
129        logical_region_ids: impl IntoIterator<Item = RegionId>,
130    ) {
131        let physical_region_id = to_data_region_id(physical_region_id);
132        let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
133        for logical_region_id in logical_region_ids {
134            state.logical_regions.insert(logical_region_id);
135            self.logical_regions
136                .insert(logical_region_id, physical_region_id);
137        }
138    }
139
140    pub fn invalid_logical_regions_cache(
141        &mut self,
142        logical_region_ids: impl IntoIterator<Item = RegionId>,
143    ) {
144        for logical_region_id in logical_region_ids {
145            self.logical_columns.remove(&logical_region_id);
146        }
147    }
148
149    /// # Panic
150    /// if the physical region does not exist
151    pub fn add_logical_region(
152        &mut self,
153        physical_region_id: RegionId,
154        logical_region_id: RegionId,
155    ) {
156        let physical_region_id = to_data_region_id(physical_region_id);
157        self.physical_regions
158            .get_mut(&physical_region_id)
159            .unwrap()
160            .logical_regions
161            .insert(logical_region_id);
162        self.logical_regions
163            .insert(logical_region_id, physical_region_id);
164    }
165
166    /// Replace the logical columns of the logical region with given columns.
167    pub fn set_logical_columns(
168        &mut self,
169        logical_region_id: RegionId,
170        columns: Vec<ColumnMetadata>,
171    ) {
172        self.logical_columns.insert(logical_region_id, columns);
173    }
174
175    pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
176        self.logical_regions.get(&logical_region_id).copied()
177    }
178
179    pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
180        &self.logical_columns
181    }
182
183    pub fn physical_region_states(&self) -> &HashMap<RegionId, PhysicalRegionState> {
184        &self.physical_regions
185    }
186
187    pub fn exist_physical_region(&self, physical_region_id: RegionId) -> bool {
188        self.physical_regions.contains_key(&physical_region_id)
189    }
190
191    pub fn physical_region_time_index_unit(
192        &self,
193        physical_region_id: RegionId,
194    ) -> Option<TimeUnit> {
195        self.physical_regions
196            .get(&physical_region_id)
197            .map(|state| state.time_index_unit)
198    }
199
200    pub fn get_primary_key_encoding(
201        &self,
202        physical_region_id: RegionId,
203    ) -> Option<PrimaryKeyEncoding> {
204        self.physical_regions
205            .get(&physical_region_id)
206            .map(|state| state.primary_key_encoding)
207    }
208
209    pub fn logical_regions(&self) -> &HashMap<RegionId, RegionId> {
210        &self.logical_regions
211    }
212
213    /// Remove all data that are related to the physical region id.
214    pub fn remove_physical_region(&mut self, physical_region_id: RegionId) -> Result<()> {
215        let physical_region_id = to_data_region_id(physical_region_id);
216
217        let logical_regions = &self
218            .physical_regions
219            .get(&physical_region_id)
220            .context(PhysicalRegionNotFoundSnafu {
221                region_id: physical_region_id,
222            })?
223            .logical_regions;
224
225        LOGICAL_REGION_COUNT.sub(logical_regions.len() as i64);
226
227        for logical_region in logical_regions {
228            self.logical_regions.remove(logical_region);
229        }
230        self.physical_regions.remove(&physical_region_id);
231        Ok(())
232    }
233
234    /// Remove all data that are related to the logical region id.
235    pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> Result<()> {
236        let physical_region_id = self.logical_regions.remove(&logical_region_id).context(
237            PhysicalRegionNotFoundSnafu {
238                region_id: logical_region_id,
239            },
240        )?;
241
242        self.physical_regions
243            .get_mut(&physical_region_id)
244            .unwrap() // Safety: physical_region_id is got from physical_regions
245            .remove_logical_region(logical_region_id);
246
247        self.logical_columns.remove(&logical_region_id);
248
249        Ok(())
250    }
251
252    pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
253        self.logical_regions().contains_key(&logical_region_id)
254    }
255}