metric_engine/engine/
alter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod extract_new_columns;
16mod validate;
17
18use std::collections::{HashMap, HashSet};
19
20use extract_new_columns::extract_new_columns;
21use snafu::{ensure, OptionExt, ResultExt};
22use store_api::metadata::ColumnMetadata;
23use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
24use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
25use store_api::storage::RegionId;
26use validate::validate_alter_region_requests;
27
28use crate::engine::MetricEngineInner;
29use crate::error::{
30    LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
31    UnexpectedRequestSnafu,
32};
33use crate::utils::{append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id};
34
35impl MetricEngineInner {
36    pub async fn alter_regions(
37        &self,
38        mut requests: Vec<(RegionId, RegionAlterRequest)>,
39        extension_return_value: &mut HashMap<String, Vec<u8>>,
40    ) -> Result<AffectedRows> {
41        if requests.is_empty() {
42            return Ok(0);
43        }
44
45        let first_region_id = &requests.first().unwrap().0;
46        if self.is_physical_region(*first_region_id) {
47            ensure!(
48                requests.len() == 1,
49                UnexpectedRequestSnafu {
50                    reason: "Physical table must be altered with single request".to_string(),
51                }
52            );
53            let (region_id, request) = requests.pop().unwrap();
54            self.alter_physical_region(region_id, request).await?;
55        } else {
56            // Fast path for single logical region alter request
57            if requests.len() == 1 {
58                // Safety: requests is not empty
59                let region_id = requests.first().unwrap().0;
60                let physical_region_id = self
61                    .state
62                    .read()
63                    .unwrap()
64                    .get_physical_region_id(region_id)
65                    .with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
66                let mut manifest_infos = Vec::with_capacity(1);
67                self.alter_logical_regions(physical_region_id, requests, extension_return_value)
68                    .await?;
69                append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
70                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
71            } else {
72                let grouped_requests =
73                    self.group_logical_region_requests_by_physical_region_id(requests)?;
74                let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
75                for (physical_region_id, requests) in grouped_requests {
76                    self.alter_logical_regions(
77                        physical_region_id,
78                        requests,
79                        extension_return_value,
80                    )
81                    .await?;
82                    append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
83                }
84                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
85            }
86        }
87        Ok(0)
88    }
89
90    /// Groups the alter logical region requests by physical region id.
91    fn group_logical_region_requests_by_physical_region_id(
92        &self,
93        requests: Vec<(RegionId, RegionAlterRequest)>,
94    ) -> Result<HashMap<RegionId, Vec<(RegionId, RegionAlterRequest)>>> {
95        let mut result = HashMap::with_capacity(requests.len());
96        let state = self.state.read().unwrap();
97
98        for (region_id, request) in requests {
99            let physical_region_id = state
100                .get_physical_region_id(region_id)
101                .with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
102            result
103                .entry(physical_region_id)
104                .or_insert_with(Vec::new)
105                .push((region_id, request));
106        }
107
108        Ok(result)
109    }
110
111    /// Alter multiple logical regions on the same physical region.
112    pub async fn alter_logical_regions(
113        &self,
114        physical_region_id: RegionId,
115        requests: Vec<(RegionId, RegionAlterRequest)>,
116        extension_return_value: &mut HashMap<String, Vec<u8>>,
117    ) -> Result<AffectedRows> {
118        // Checks all alter requests are add columns.
119        validate_alter_region_requests(&requests)?;
120
121        // Finds new columns to add
122        let mut new_column_names = HashSet::new();
123        let mut new_columns_to_add = vec![];
124
125        let index_options = {
126            let state = &self.state.read().unwrap();
127            let region_state = state
128                .physical_region_states()
129                .get(&physical_region_id)
130                .with_context(|| PhysicalRegionNotFoundSnafu {
131                    region_id: physical_region_id,
132                })?;
133            let physical_columns = region_state.physical_columns();
134
135            extract_new_columns(
136                &requests,
137                physical_columns,
138                &mut new_column_names,
139                &mut new_columns_to_add,
140            )?;
141
142            region_state.options().index
143        };
144        let data_region_id = to_data_region_id(physical_region_id);
145
146        let mut write_guards = HashMap::with_capacity(requests.len());
147        for (region_id, _) in requests.iter() {
148            if write_guards.contains_key(region_id) {
149                continue;
150            }
151            let _write_guard = self
152                .metadata_region
153                .write_lock_logical_region(*region_id)
154                .await?;
155            write_guards.insert(*region_id, _write_guard);
156        }
157
158        self.data_region
159            .add_columns(data_region_id, new_columns_to_add, index_options)
160            .await?;
161
162        let physical_columns = self.data_region.physical_columns(data_region_id).await?;
163        let physical_schema_map = physical_columns
164            .iter()
165            .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
166            .collect::<HashMap<_, _>>();
167
168        let logical_region_columns = requests.iter().map(|(region_id, request)| {
169            let AlterKind::AddColumns { columns } = &request.kind else {
170                unreachable!()
171            };
172            (
173                *region_id,
174                columns
175                    .iter()
176                    .map(|col| {
177                        let column_name = col.column_metadata.column_schema.name.as_str();
178                        let column_metadata = *physical_schema_map.get(column_name).unwrap();
179                        (column_name, column_metadata)
180                    })
181                    .collect::<HashMap<_, _>>(),
182            )
183        });
184
185        let new_add_columns = new_column_names.iter().map(|name| {
186            // Safety: previous steps ensure the physical region exist
187            let column_metadata = *physical_schema_map.get(name).unwrap();
188            (name.to_string(), column_metadata.column_id)
189        });
190
191        // Writes logical regions metadata to metadata region
192        self.metadata_region
193            .add_logical_regions(physical_region_id, false, logical_region_columns)
194            .await?;
195
196        extension_return_value.insert(
197            ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
198            ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
199        );
200
201        let mut state = self.state.write().unwrap();
202        state.add_physical_columns(data_region_id, new_add_columns);
203        state.invalid_logical_regions_cache(requests.iter().map(|(region_id, _)| *region_id));
204
205        Ok(0)
206    }
207
208    async fn alter_physical_region(
209        &self,
210        region_id: RegionId,
211        request: RegionAlterRequest,
212    ) -> Result<()> {
213        self.data_region
214            .alter_region_options(region_id, request)
215            .await?;
216        Ok(())
217    }
218}
219
#[cfg(test)]
mod test {
    use std::time::Duration;

    use api::v1::SemanticType;
    use common_meta::ddl::test_util::assert_column_name_and_id;
    use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
    use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::{
        AlterKind, BatchRegionDdlRequest, RegionAlterRequest, SetRegionOption,
    };
    use store_api::storage::consts::ReservedColumnId;
    use store_api::storage::RegionId;

    use crate::test_util::{alter_logical_region_request, create_logical_region_request, TestEnv};

    /// Covers three cases: an add-columns alter on a physical region is rejected,
    /// an option alter on a physical region succeeds, and an add-columns alter on a
    /// logical region adds the column and reports the physical schema.
    #[tokio::test]
    async fn test_alter_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();
        let engine_inner = engine.inner;

        // alter physical region
        let physical_region_id = env.default_physical_region_id();
        let request = alter_logical_region_request(&["tag1"]);

        let result = engine_inner
            .alter_physical_region(physical_region_id, request.clone())
            .await;
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err().to_string(),
            "Alter request to physical region is forbidden".to_string()
        );

        // alter physical region's option should work
        let alter_region_option_request = RegionAlterRequest {
            kind: AlterKind::SetRegionOptions {
                options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
            },
        };
        let result = engine_inner
            .alter_physical_region(physical_region_id, alter_region_option_request.clone())
            .await;
        assert!(result.is_ok());

        // alter logical region
        let metadata_region = env.metadata_region();
        let logical_region_id = env.default_logical_region_id();
        // The column must not exist before the alter.
        let is_column_exist = metadata_region
            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
            .await
            .unwrap()
            .is_some();
        assert!(!is_column_exist);

        let response = env
            .metric()
            .handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![(
                logical_region_id,
                request.clone(),
            )]))
            .await
            .unwrap();
        // The manifest info in the response belongs to the physical region.
        let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
        assert_eq!(manifest_infos[0].0, physical_region_id);
        assert!(manifest_infos[0].1.is_metric());

        // The new tag column is now visible, and the timestamp column is still there.
        let semantic_type = metadata_region
            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(semantic_type, SemanticType::Tag);
        let timestamp_index = metadata_region
            .column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(timestamp_index, SemanticType::Timestamp);
        let column_metadatas =
            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
        assert_column_name_and_id(
            &column_metadatas,
            &[
                ("greptime_timestamp", 0),
                ("greptime_value", 1),
                ("__table_id", ReservedColumnId::table_id()),
                ("__tsid", ReservedColumnId::tsid()),
                ("job", 2),
                ("tag1", 3),
            ],
        );
    }

    /// Alters two logical regions that live on two different physical regions in a
    /// single batch and checks that both physical regions are reported.
    #[tokio::test]
    async fn test_alter_logical_regions() {
        let env = TestEnv::new().await;
        let engine = env.metric();
        let physical_region_id1 = RegionId::new(1024, 0);
        let physical_region_id2 = RegionId::new(1024, 1);
        let logical_region_id1 = RegionId::new(1025, 0);
        let logical_region_id2 = RegionId::new(1025, 1);
        env.create_physical_region(physical_region_id1, "/test_dir1")
            .await;
        env.create_physical_region(physical_region_id2, "/test_dir2")
            .await;

        let region_create_request1 =
            create_logical_region_request(&["job"], physical_region_id1, "logical1");
        let region_create_request2 =
            create_logical_region_request(&["job"], physical_region_id2, "logical2");
        engine
            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
                (logical_region_id1, region_create_request1),
                (logical_region_id2, region_create_request2),
            ]))
            .await
            .unwrap();

        let region_alter_request1 = alter_logical_region_request(&["tag1"]);
        let region_alter_request2 = alter_logical_region_request(&["tag1"]);
        let response = engine
            .handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![
                (logical_region_id1, region_alter_request1),
                (logical_region_id2, region_alter_request2),
            ]))
            .await
            .unwrap();

        // One manifest info per physical region; their order is not asserted.
        let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
        assert_eq!(manifest_infos.len(), 2);
        let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
        assert!(region_ids.contains(&physical_region_id1));
        assert!(region_ids.contains(&physical_region_id2));

        let column_metadatas =
            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
        assert_column_name_and_id(
            &column_metadatas,
            &[
                ("greptime_timestamp", 0),
                ("greptime_value", 1),
                ("__table_id", ReservedColumnId::table_id()),
                ("__tsid", ReservedColumnId::tsid()),
                ("job", 2),
                ("tag1", 3),
            ],
        );
    }
}