metric_engine/engine/
alter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod extract_new_columns;
16mod validate;
17
18use std::collections::{BTreeSet, HashMap, HashSet};
19
20use extract_new_columns::extract_new_columns;
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::metadata::ColumnMetadata;
23use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
24use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
25use store_api::storage::RegionId;
26use validate::validate_alter_region_requests;
27
28use crate::engine::MetricEngineInner;
29use crate::error::{
30    LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
31    UnexpectedRequestSnafu,
32};
33use crate::utils::{append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id};
34
35impl MetricEngineInner {
36    pub async fn alter_regions(
37        &self,
38        mut requests: Vec<(RegionId, RegionAlterRequest)>,
39        extension_return_value: &mut HashMap<String, Vec<u8>>,
40    ) -> Result<AffectedRows> {
41        if requests.is_empty() {
42            return Ok(0);
43        }
44
45        let first_region_id = &requests.first().unwrap().0;
46        if self.is_physical_region(*first_region_id) {
47            ensure!(
48                requests.len() == 1,
49                UnexpectedRequestSnafu {
50                    reason: "Physical table must be altered with single request".to_string(),
51                }
52            );
53            let (region_id, request) = requests.pop().unwrap();
54            self.alter_physical_region(region_id, request).await?;
55        } else {
56            // Fast path for single logical region alter request
57            if requests.len() == 1 {
58                // Safety: requests is not empty
59                let region_id = requests.first().unwrap().0;
60                let physical_region_id = self
61                    .state
62                    .read()
63                    .unwrap()
64                    .get_physical_region_id(region_id)
65                    .with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
66                let mut manifest_infos = Vec::with_capacity(1);
67                self.alter_logical_regions(physical_region_id, requests, extension_return_value)
68                    .await?;
69                append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
70                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
71            } else {
72                let grouped_requests =
73                    self.group_logical_region_requests_by_physical_region_id(requests)?;
74                let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
75                for (physical_region_id, requests) in grouped_requests {
76                    self.alter_logical_regions(
77                        physical_region_id,
78                        requests,
79                        extension_return_value,
80                    )
81                    .await?;
82                    append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
83                }
84                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
85            }
86        }
87        Ok(0)
88    }
89
90    /// Groups the alter logical region requests by physical region id.
91    fn group_logical_region_requests_by_physical_region_id(
92        &self,
93        requests: Vec<(RegionId, RegionAlterRequest)>,
94    ) -> Result<HashMap<RegionId, Vec<(RegionId, RegionAlterRequest)>>> {
95        let mut result = HashMap::with_capacity(requests.len());
96        let state = self.state.read().unwrap();
97
98        for (region_id, request) in requests {
99            let physical_region_id = state
100                .get_physical_region_id(region_id)
101                .with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
102            result
103                .entry(physical_region_id)
104                .or_insert_with(Vec::new)
105                .push((region_id, request));
106        }
107
108        Ok(result)
109    }
110
    /// Alter multiple logical regions on the same physical region.
    ///
    /// Steps, in order:
    /// 1. Validate the requests (only add-column requests are expected past this point).
    /// 2. Under the engine state read lock, run `extract_new_columns` against the physical
    ///    region's known columns and read the region's index option.
    /// 3. Take the write lock of every affected logical region, in ascending region id
    ///    order.
    /// 4. Add the collected columns to the data region and re-read its physical columns.
    /// 5. Record the columns of each logical region in the metadata region.
    /// 6. Store the encoded physical column list in `extension_return_value` under
    ///    `ALTER_PHYSICAL_EXTENSION_KEY` (a previous value under that key is replaced).
    /// 7. Update the in-memory state: register the new physical columns and invalidate the
    ///    cache of the altered logical regions.
    ///
    /// Returns `Ok(0)` on success.
    ///
    /// # Errors
    ///
    /// Propagates validation errors, an error built from `PhysicalRegionNotFoundSnafu` when
    /// `physical_region_id` has no state entry, and any error from locking, the data
    /// region, the metadata region, or column metadata serialization.
    ///
    /// # Panics
    ///
    /// Panics if the engine state lock is poisoned, or if a requested column is missing
    /// from the physical columns read back after `add_columns`.
    pub async fn alter_logical_regions(
        &self,
        physical_region_id: RegionId,
        requests: Vec<(RegionId, RegionAlterRequest)>,
        extension_return_value: &mut HashMap<String, Vec<u8>>,
    ) -> Result<AffectedRows> {
        // Checks all alter requests are add columns.
        validate_alter_region_requests(&requests)?;

        // Finds new columns to add. Both collections are filled by `extract_new_columns`.
        let mut new_column_names = HashSet::new();
        let mut new_columns_to_add = vec![];

        // The state read guard lives only inside this block, so it is released before any
        // `.await` below.
        let index_options = {
            let state = &self.state.read().unwrap();
            let region_state = state
                .physical_region_states()
                .get(&physical_region_id)
                .with_context(|| PhysicalRegionNotFoundSnafu {
                    region_id: physical_region_id,
                })?;
            let physical_columns = region_state.physical_columns();

            // NOTE(review): presumably keeps only the requested columns that are not yet in
            // `physical_columns` -- confirm against `extract_new_columns`.
            extract_new_columns(
                &requests,
                physical_columns,
                &mut new_column_names,
                &mut new_columns_to_add,
            )?;

            region_state.options().index
        };
        let data_region_id = to_data_region_id(physical_region_id);

        // Acquire logical region locks in a deterministic order to avoid deadlocks when multiple
        // alter operations target overlapping regions concurrently.
        // The `BTreeSet` also removes duplicate region ids, so each region is locked once.
        let region_ids = requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<BTreeSet<_>>();

        // The guards stay in this vector until the function returns.
        let mut write_guards = Vec::with_capacity(region_ids.len());
        for region_id in region_ids {
            write_guards.push(
                self.metadata_region
                    .write_lock_logical_region(region_id)
                    .await?,
            );
        }

        self.data_region
            .add_columns(data_region_id, new_columns_to_add, index_options)
            .await?;

        // Re-read the physical columns after the alter and index them by column name.
        let physical_columns = self.data_region.physical_columns(data_region_id).await?;
        let physical_schema_map = physical_columns
            .iter()
            .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
            .collect::<HashMap<_, _>>();

        // Lazy iterator: for every request, maps each requested column name to the column
        // metadata of the physical region. It is consumed by `add_logical_regions` below.
        let logical_region_columns = requests.iter().map(|(region_id, request)| {
            // Relies on the validation at the top of this function.
            let AlterKind::AddColumns { columns } = &request.kind else {
                unreachable!()
            };
            (
                *region_id,
                columns
                    .iter()
                    .map(|col| {
                        let column_name = col.column_metadata.column_schema.name.as_str();
                        // Panics if the requested column is absent from the physical schema.
                        let column_metadata = *physical_schema_map.get(column_name).unwrap();
                        (column_name, column_metadata)
                    })
                    .collect::<HashMap<_, _>>(),
            )
        });

        // Lazy iterator of (column name, column id) pairs for the newly added columns; it is
        // consumed by `add_physical_columns` below.
        let new_add_columns = new_column_names.iter().map(|name| {
            // Safety: the new columns were added to the data region above, so they are
            // expected to be present in the physical columns read back afterwards.
            let column_metadata = *physical_schema_map.get(name).unwrap();
            (name.to_string(), column_metadata.column_id)
        });

        // Writes logical regions metadata to metadata region
        // NOTE(review): the meaning of the `false` flag is not visible from here -- confirm
        // against `add_logical_regions`.
        self.metadata_region
            .add_logical_regions(physical_region_id, false, logical_region_columns)
            .await?;

        // Hands the up-to-date physical column list back to the caller.
        extension_return_value.insert(
            ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
            ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
        );

        // Update the in-memory state last; no `.await` happens while this guard is held.
        let mut state = self.state.write().unwrap();
        state.add_physical_columns(data_region_id, new_add_columns);
        state.invalid_logical_regions_cache(requests.iter().map(|(region_id, _)| *region_id));

        Ok(0)
    }
211
212    async fn alter_physical_region(
213        &self,
214        region_id: RegionId,
215        request: RegionAlterRequest,
216    ) -> Result<()> {
217        self.data_region
218            .alter_region_options(region_id, request)
219            .await?;
220        Ok(())
221    }
222}
223
224#[cfg(test)]
225mod test {
226    use std::time::Duration;
227
228    use api::v1::SemanticType;
229    use common_meta::ddl::test_util::assert_column_name_and_id;
230    use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
231    use common_query::prelude::{greptime_timestamp, greptime_value};
232    use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
233    use store_api::region_engine::RegionEngine;
234    use store_api::region_request::{
235        AlterKind, BatchRegionDdlRequest, RegionAlterRequest, SetRegionOption,
236    };
237    use store_api::storage::RegionId;
238    use store_api::storage::consts::ReservedColumnId;
239
240    use crate::test_util::{TestEnv, alter_logical_region_request, create_logical_region_request};
241
242    #[tokio::test]
243    async fn test_alter_region() {
244        let env = TestEnv::new().await;
245        env.init_metric_region().await;
246        let engine = env.metric();
247        let engine_inner = engine.inner;
248
249        // alter physical region
250        let physical_region_id = env.default_physical_region_id();
251        let request = alter_logical_region_request(&["tag1"]);
252
253        let result = engine_inner
254            .alter_physical_region(physical_region_id, request.clone())
255            .await;
256        assert!(result.is_err());
257        assert_eq!(
258            result.unwrap_err().to_string(),
259            "Alter request to physical region is forbidden".to_string()
260        );
261
262        // alter physical region's option should work
263        let alter_region_option_request = RegionAlterRequest {
264            kind: AlterKind::SetRegionOptions {
265                options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
266            },
267        };
268        let result = engine_inner
269            .alter_physical_region(physical_region_id, alter_region_option_request.clone())
270            .await;
271        assert!(result.is_ok());
272
273        // alter logical region
274        let metadata_region = env.metadata_region();
275        let logical_region_id = env.default_logical_region_id();
276        let is_column_exist = metadata_region
277            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
278            .await
279            .unwrap()
280            .is_some();
281        assert!(!is_column_exist);
282
283        let region_id = env.default_logical_region_id();
284        let response = env
285            .metric()
286            .handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![(
287                region_id,
288                request.clone(),
289            )]))
290            .await
291            .unwrap();
292        let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
293        assert_eq!(manifest_infos[0].0, physical_region_id);
294        assert!(manifest_infos[0].1.is_metric());
295
296        let semantic_type = metadata_region
297            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
298            .await
299            .unwrap()
300            .unwrap();
301        assert_eq!(semantic_type, SemanticType::Tag);
302        let timestamp_index = metadata_region
303            .column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
304            .await
305            .unwrap()
306            .unwrap();
307        assert_eq!(timestamp_index, SemanticType::Timestamp);
308        let column_metadatas =
309            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
310        assert_column_name_and_id(
311            &column_metadatas,
312            &[
313                (greptime_timestamp(), 0),
314                (greptime_value(), 1),
315                ("__table_id", ReservedColumnId::table_id()),
316                ("__tsid", ReservedColumnId::tsid()),
317                ("job", 2),
318                ("tag1", 3),
319            ],
320        );
321    }
322
323    #[tokio::test]
324    async fn test_alter_logical_regions() {
325        let env = TestEnv::new().await;
326        let engine = env.metric();
327        let physical_region_id1 = RegionId::new(1024, 0);
328        let physical_region_id2 = RegionId::new(1024, 1);
329        let logical_region_id1 = RegionId::new(1025, 0);
330        let logical_region_id2 = RegionId::new(1025, 1);
331        env.create_physical_region(physical_region_id1, "/test_dir1", vec![])
332            .await;
333        env.create_physical_region(physical_region_id2, "/test_dir2", vec![])
334            .await;
335
336        let region_create_request1 = crate::test_util::create_logical_region_request(
337            &["job"],
338            physical_region_id1,
339            "logical1",
340        );
341        let region_create_request2 =
342            create_logical_region_request(&["job"], physical_region_id2, "logical2");
343        engine
344            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
345                (logical_region_id1, region_create_request1),
346                (logical_region_id2, region_create_request2),
347            ]))
348            .await
349            .unwrap();
350
351        let region_alter_request1 = alter_logical_region_request(&["tag1"]);
352        let region_alter_request2 = alter_logical_region_request(&["tag1"]);
353        let response = engine
354            .handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![
355                (logical_region_id1, region_alter_request1),
356                (logical_region_id2, region_alter_request2),
357            ]))
358            .await
359            .unwrap();
360
361        let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
362        assert_eq!(manifest_infos.len(), 2);
363        let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
364        assert!(region_ids.contains(&physical_region_id1));
365        assert!(region_ids.contains(&physical_region_id2));
366
367        let column_metadatas =
368            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
369        assert_column_name_and_id(
370            &column_metadatas,
371            &[
372                (greptime_timestamp(), 0),
373                (greptime_value(), 1),
374                ("__table_id", ReservedColumnId::table_id()),
375                ("__tsid", ReservedColumnId::tsid()),
376                ("job", 2),
377                ("tag1", 3),
378            ],
379        );
380    }
381}