// metric_engine/engine/alter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15mod extract_new_columns;
16mod validate;
17
18use std::collections::{HashMap, HashSet};
19
20use extract_new_columns::extract_new_columns;
21use snafu::{ensure, OptionExt, ResultExt};
22use store_api::metadata::ColumnMetadata;
23use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
24use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
25use store_api::storage::RegionId;
26use validate::validate_alter_region_requests;
27
28use crate::engine::MetricEngineInner;
29use crate::error::{
30    LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
31    UnexpectedRequestSnafu,
32};
33use crate::utils::{append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id};
34
35impl MetricEngineInner {
36    pub async fn alter_regions(
37        &self,
38        mut requests: Vec<(RegionId, RegionAlterRequest)>,
39        extension_return_value: &mut HashMap<String, Vec<u8>>,
40    ) -> Result<AffectedRows> {
41        if requests.is_empty() {
42            return Ok(0);
43        }
44
45        let first_region_id = &requests.first().unwrap().0;
46        if self.is_physical_region(*first_region_id) {
47            ensure!(
48                requests.len() == 1,
49                UnexpectedRequestSnafu {
50                    reason: "Physical table must be altered with single request".to_string(),
51                }
52            );
53            let (region_id, request) = requests.pop().unwrap();
54            self.alter_physical_region(region_id, request).await?;
55        } else {
56            // Fast path for single logical region alter request
57            if requests.len() == 1 {
58                // Safety: requests is not empty
59                let region_id = requests.first().unwrap().0;
60                let physical_region_id = self
61                    .state
62                    .read()
63                    .unwrap()
64                    .get_physical_region_id(region_id)
65                    .with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
66                let mut manifest_infos = Vec::with_capacity(1);
67                self.alter_logical_regions(physical_region_id, requests, extension_return_value)
68                    .await?;
69                append_manifest_info(&self.mito, region_id, &mut manifest_infos);
70                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
71            } else {
72                let grouped_requests =
73                    self.group_logical_region_requests_by_physical_region_id(requests)?;
74                let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
75                for (physical_region_id, requests) in grouped_requests {
76                    self.alter_logical_regions(
77                        physical_region_id,
78                        requests,
79                        extension_return_value,
80                    )
81                    .await?;
82                    append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
83                }
84                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
85            }
86        }
87        Ok(0)
88    }
89
90    /// Groups the alter logical region requests by physical region id.
91    fn group_logical_region_requests_by_physical_region_id(
92        &self,
93        requests: Vec<(RegionId, RegionAlterRequest)>,
94    ) -> Result<HashMap<RegionId, Vec<(RegionId, RegionAlterRequest)>>> {
95        let mut result = HashMap::with_capacity(requests.len());
96        let state = self.state.read().unwrap();
97
98        for (region_id, request) in requests {
99            let physical_region_id = state
100                .get_physical_region_id(region_id)
101                .with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
102            result
103                .entry(physical_region_id)
104                .or_insert_with(Vec::new)
105                .push((region_id, request));
106        }
107
108        Ok(result)
109    }
110
111    /// Alter multiple logical regions on the same physical region.
112    pub async fn alter_logical_regions(
113        &self,
114        physical_region_id: RegionId,
115        requests: Vec<(RegionId, RegionAlterRequest)>,
116        extension_return_value: &mut HashMap<String, Vec<u8>>,
117    ) -> Result<AffectedRows> {
118        // Checks all alter requests are add columns.
119        validate_alter_region_requests(&requests)?;
120
121        // Finds new columns to add
122        let mut new_column_names = HashSet::new();
123        let mut new_columns_to_add = vec![];
124
125        let index_options = {
126            let state = &self.state.read().unwrap();
127            let region_state = state
128                .physical_region_states()
129                .get(&physical_region_id)
130                .with_context(|| PhysicalRegionNotFoundSnafu {
131                    region_id: physical_region_id,
132                })?;
133            let physical_columns = region_state.physical_columns();
134
135            extract_new_columns(
136                &requests,
137                physical_columns,
138                &mut new_column_names,
139                &mut new_columns_to_add,
140            )?;
141
142            region_state.options().index
143        };
144        let data_region_id = to_data_region_id(physical_region_id);
145
146        let mut write_guards = HashMap::with_capacity(requests.len());
147        for (region_id, _) in requests.iter() {
148            if write_guards.contains_key(region_id) {
149                continue;
150            }
151            let _write_guard = self
152                .metadata_region
153                .write_lock_logical_region(*region_id)
154                .await?;
155            write_guards.insert(*region_id, _write_guard);
156        }
157
158        self.data_region
159            .add_columns(data_region_id, new_columns_to_add, index_options)
160            .await?;
161
162        let physical_columns = self.data_region.physical_columns(data_region_id).await?;
163        let physical_schema_map = physical_columns
164            .iter()
165            .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
166            .collect::<HashMap<_, _>>();
167
168        let logical_region_columns = requests.iter().map(|(region_id, request)| {
169            let AlterKind::AddColumns { columns } = &request.kind else {
170                unreachable!()
171            };
172            (
173                *region_id,
174                columns
175                    .iter()
176                    .map(|col| {
177                        let column_name = col.column_metadata.column_schema.name.as_str();
178                        let column_metadata = *physical_schema_map.get(column_name).unwrap();
179                        (column_name, column_metadata)
180                    })
181                    .collect::<HashMap<_, _>>(),
182            )
183        });
184
185        let new_add_columns = new_column_names.iter().map(|name| {
186            // Safety: previous steps ensure the physical region exist
187            let column_metadata = *physical_schema_map.get(name).unwrap();
188            (name.to_string(), column_metadata.column_id)
189        });
190
191        // Writes logical regions metadata to metadata region
192        self.metadata_region
193            .add_logical_regions(physical_region_id, false, logical_region_columns)
194            .await?;
195
196        extension_return_value.insert(
197            ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
198            ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
199        );
200
201        let mut state = self.state.write().unwrap();
202        state.add_physical_columns(data_region_id, new_add_columns);
203        state.invalid_logical_regions_cache(requests.iter().map(|(region_id, _)| *region_id));
204
205        Ok(0)
206    }
207
208    async fn alter_physical_region(
209        &self,
210        region_id: RegionId,
211        request: RegionAlterRequest,
212    ) -> Result<()> {
213        self.data_region
214            .alter_region_options(region_id, request)
215            .await?;
216        Ok(())
217    }
218}
219
#[cfg(test)]
mod test {
    use std::time::Duration;

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::ColumnMetadata;
    use store_api::region_request::{AddColumn, SetRegionOption};

    use super::*;
    use crate::test_util::TestEnv;

    /// Builds an `AddColumns` alter request for one non-nullable string tag `tag1`.
    fn add_tag1_request() -> RegionAlterRequest {
        let tag1 = ColumnMetadata {
            column_id: 0,
            semantic_type: SemanticType::Tag,
            column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), false),
        };
        RegionAlterRequest {
            kind: AlterKind::AddColumns {
                columns: vec![AddColumn {
                    column_metadata: tag1,
                    location: None,
                }],
            },
        }
    }

    #[tokio::test]
    async fn test_alter_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metric_engine = env.metric();
        let inner = metric_engine.inner;

        let physical_region_id = env.default_physical_region_id();

        // Adding columns straight to the physical region is rejected.
        let err = inner
            .alter_physical_region(physical_region_id, add_tag1_request())
            .await
            .expect_err("adding columns to a physical region should fail");
        assert_eq!(
            err.to_string(),
            "Alter request to physical region is forbidden".to_string()
        );

        // Changing the physical region's options (TTL) is accepted.
        let set_ttl = RegionAlterRequest {
            kind: AlterKind::SetRegionOptions {
                options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
            },
        };
        inner
            .alter_physical_region(physical_region_id, set_ttl)
            .await
            .expect("altering physical region options should succeed");

        // Before the logical alter, the logical region has no `tag1` column.
        let metadata_region = env.metadata_region();
        let logical_region_id = env.default_logical_region_id();
        let tag1_before = metadata_region
            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
            .await
            .unwrap();
        assert!(tag1_before.is_none());

        // Add `tag1` through the logical region.
        let region_to_alter = env.default_logical_region_id();
        inner
            .alter_logical_regions(
                physical_region_id,
                vec![(region_to_alter, add_tag1_request())],
                &mut HashMap::new(),
            )
            .await
            .unwrap();

        // `tag1` is now recorded as a tag, and the timestamp column is unaffected.
        let tag1_type = metadata_region
            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(tag1_type, SemanticType::Tag);

        let timestamp_type = metadata_region
            .column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(timestamp_type, SemanticType::Timestamp);
    }
}