metric_engine/engine/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Internal states of metric engine
16
17use std::collections::{HashMap, HashSet};
18
19use snafu::OptionExt;
20use store_api::codec::PrimaryKeyEncoding;
21use store_api::metadata::ColumnMetadata;
22use store_api::storage::{ColumnId, RegionId};
23
24use crate::engine::options::PhysicalRegionOptions;
25use crate::error::{PhysicalRegionNotFoundSnafu, Result};
26use crate::metrics::LOGICAL_REGION_COUNT;
27use crate::utils::to_data_region_id;
28
29pub struct PhysicalRegionState {
30    logical_regions: HashSet<RegionId>,
31    physical_columns: HashMap<String, ColumnId>,
32    primary_key_encoding: PrimaryKeyEncoding,
33    options: PhysicalRegionOptions,
34}
35
36impl PhysicalRegionState {
37    pub fn new(
38        physical_columns: HashMap<String, ColumnId>,
39        primary_key_encoding: PrimaryKeyEncoding,
40        options: PhysicalRegionOptions,
41    ) -> Self {
42        Self {
43            logical_regions: HashSet::new(),
44            physical_columns,
45            primary_key_encoding,
46            options,
47        }
48    }
49
50    /// Returns a reference to the logical region ids.
51    pub fn logical_regions(&self) -> &HashSet<RegionId> {
52        &self.logical_regions
53    }
54
55    /// Returns a reference to the physical columns.
56    pub fn physical_columns(&self) -> &HashMap<String, ColumnId> {
57        &self.physical_columns
58    }
59
60    /// Returns a reference to the physical region options.
61    pub fn options(&self) -> &PhysicalRegionOptions {
62        &self.options
63    }
64
65    /// Removes a logical region id from the physical region state.
66    /// Returns true if the logical region id was present.
67    pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> bool {
68        self.logical_regions.remove(&logical_region_id)
69    }
70}
71
72/// Internal states of metric engine
73#[derive(Default)]
74pub(crate) struct MetricEngineState {
75    /// Physical regions states.
76    physical_regions: HashMap<RegionId, PhysicalRegionState>,
77    /// Mapping from logical region id to physical region id.
78    logical_regions: HashMap<RegionId, RegionId>,
79    /// Cache for the column metadata of logical regions.
80    /// The column order is the same with the order in the metadata, which is
81    /// alphabetically ordered on column name.
82    logical_columns: HashMap<RegionId, Vec<ColumnMetadata>>,
83}
84
85impl MetricEngineState {
86    pub fn add_physical_region(
87        &mut self,
88        physical_region_id: RegionId,
89        physical_columns: HashMap<String, ColumnId>,
90        primary_key_encoding: PrimaryKeyEncoding,
91        options: PhysicalRegionOptions,
92    ) {
93        let physical_region_id = to_data_region_id(physical_region_id);
94        self.physical_regions.insert(
95            physical_region_id,
96            PhysicalRegionState::new(physical_columns, primary_key_encoding, options),
97        );
98    }
99
100    /// # Panic
101    /// if the physical region does not exist
102    pub fn add_physical_columns(
103        &mut self,
104        physical_region_id: RegionId,
105        physical_columns: impl IntoIterator<Item = (String, ColumnId)>,
106    ) {
107        let physical_region_id = to_data_region_id(physical_region_id);
108        let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
109        for (col, id) in physical_columns {
110            state.physical_columns.insert(col, id);
111        }
112    }
113
114    /// # Panic
115    /// if the physical region does not exist
116    pub fn add_logical_regions(
117        &mut self,
118        physical_region_id: RegionId,
119        logical_region_ids: impl IntoIterator<Item = RegionId>,
120    ) {
121        let physical_region_id = to_data_region_id(physical_region_id);
122        let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
123        for logical_region_id in logical_region_ids {
124            state.logical_regions.insert(logical_region_id);
125            self.logical_regions
126                .insert(logical_region_id, physical_region_id);
127        }
128    }
129
130    pub fn invalid_logical_regions_cache(
131        &mut self,
132        logical_region_ids: impl IntoIterator<Item = RegionId>,
133    ) {
134        for logical_region_id in logical_region_ids {
135            self.logical_columns.remove(&logical_region_id);
136        }
137    }
138
139    /// # Panic
140    /// if the physical region does not exist
141    pub fn add_logical_region(
142        &mut self,
143        physical_region_id: RegionId,
144        logical_region_id: RegionId,
145    ) {
146        let physical_region_id = to_data_region_id(physical_region_id);
147        self.physical_regions
148            .get_mut(&physical_region_id)
149            .unwrap()
150            .logical_regions
151            .insert(logical_region_id);
152        self.logical_regions
153            .insert(logical_region_id, physical_region_id);
154    }
155
156    /// Replace the logical columns of the logical region with given columns.
157    pub fn set_logical_columns(
158        &mut self,
159        logical_region_id: RegionId,
160        columns: Vec<ColumnMetadata>,
161    ) {
162        self.logical_columns.insert(logical_region_id, columns);
163    }
164
165    pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
166        self.logical_regions.get(&logical_region_id).copied()
167    }
168
169    pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
170        &self.logical_columns
171    }
172
173    pub fn physical_region_states(&self) -> &HashMap<RegionId, PhysicalRegionState> {
174        &self.physical_regions
175    }
176
177    pub fn exist_physical_region(&self, physical_region_id: RegionId) -> bool {
178        self.physical_regions.contains_key(&physical_region_id)
179    }
180
181    pub fn get_primary_key_encoding(
182        &self,
183        physical_region_id: RegionId,
184    ) -> Option<PrimaryKeyEncoding> {
185        self.physical_regions
186            .get(&physical_region_id)
187            .map(|state| state.primary_key_encoding)
188    }
189
190    pub fn logical_regions(&self) -> &HashMap<RegionId, RegionId> {
191        &self.logical_regions
192    }
193
194    /// Remove all data that are related to the physical region id.
195    pub fn remove_physical_region(&mut self, physical_region_id: RegionId) -> Result<()> {
196        let physical_region_id = to_data_region_id(physical_region_id);
197
198        let logical_regions = &self
199            .physical_regions
200            .get(&physical_region_id)
201            .context(PhysicalRegionNotFoundSnafu {
202                region_id: physical_region_id,
203            })?
204            .logical_regions;
205
206        LOGICAL_REGION_COUNT.sub(logical_regions.len() as i64);
207
208        for logical_region in logical_regions {
209            self.logical_regions.remove(logical_region);
210        }
211        self.physical_regions.remove(&physical_region_id);
212        Ok(())
213    }
214
215    /// Remove all data that are related to the logical region id.
216    pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> Result<()> {
217        let physical_region_id = self.logical_regions.remove(&logical_region_id).context(
218            PhysicalRegionNotFoundSnafu {
219                region_id: logical_region_id,
220            },
221        )?;
222
223        self.physical_regions
224            .get_mut(&physical_region_id)
225            .unwrap() // Safety: physical_region_id is got from physical_regions
226            .remove_logical_region(logical_region_id);
227
228        self.logical_columns.remove(&logical_region_id);
229
230        Ok(())
231    }
232
233    pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
234        self.logical_regions().contains_key(&logical_region_id)
235    }
236}