1mod extract_new_columns;
16mod validate;
17
18use std::collections::{BTreeSet, HashMap, HashSet};
19
20use extract_new_columns::extract_new_columns;
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::metadata::ColumnMetadata;
23use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
24use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
25use store_api::storage::RegionId;
26use validate::validate_alter_region_requests;
27
28use crate::engine::MetricEngineInner;
29use crate::error::{
30 LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
31 UnexpectedRequestSnafu,
32};
33use crate::utils::{append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id};
34
35impl MetricEngineInner {
36 pub async fn alter_regions(
37 &self,
38 mut requests: Vec<(RegionId, RegionAlterRequest)>,
39 extension_return_value: &mut HashMap<String, Vec<u8>>,
40 ) -> Result<AffectedRows> {
41 if requests.is_empty() {
42 return Ok(0);
43 }
44
45 let first_region_id = &requests.first().unwrap().0;
46 if self.is_physical_region(*first_region_id) {
47 ensure!(
48 requests.len() == 1,
49 UnexpectedRequestSnafu {
50 reason: "Physical table must be altered with single request".to_string(),
51 }
52 );
53 let (region_id, request) = requests.pop().unwrap();
54 self.alter_physical_region(region_id, request).await?;
55 } else {
56 if requests.len() == 1 {
58 let region_id = requests.first().unwrap().0;
60 let physical_region_id = self
61 .state
62 .read()
63 .unwrap()
64 .get_physical_region_id(region_id)
65 .with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
66 let mut manifest_infos = Vec::with_capacity(1);
67 self.alter_logical_regions(physical_region_id, requests, extension_return_value)
68 .await?;
69 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
70 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
71 } else {
72 let grouped_requests =
73 self.group_logical_region_requests_by_physical_region_id(requests)?;
74 let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
75 for (physical_region_id, requests) in grouped_requests {
76 self.alter_logical_regions(
77 physical_region_id,
78 requests,
79 extension_return_value,
80 )
81 .await?;
82 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
83 }
84 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
85 }
86 }
87 Ok(0)
88 }
89
90 fn group_logical_region_requests_by_physical_region_id(
92 &self,
93 requests: Vec<(RegionId, RegionAlterRequest)>,
94 ) -> Result<HashMap<RegionId, Vec<(RegionId, RegionAlterRequest)>>> {
95 let mut result = HashMap::with_capacity(requests.len());
96 let state = self.state.read().unwrap();
97
98 for (region_id, request) in requests {
99 let physical_region_id = state
100 .get_physical_region_id(region_id)
101 .with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
102 result
103 .entry(physical_region_id)
104 .or_insert_with(Vec::new)
105 .push((region_id, request));
106 }
107
108 Ok(result)
109 }
110
111 pub async fn alter_logical_regions(
113 &self,
114 physical_region_id: RegionId,
115 requests: Vec<(RegionId, RegionAlterRequest)>,
116 extension_return_value: &mut HashMap<String, Vec<u8>>,
117 ) -> Result<AffectedRows> {
118 validate_alter_region_requests(&requests)?;
120
121 let mut new_column_names = HashSet::new();
123 let mut new_columns_to_add = vec![];
124
125 let index_options = {
126 let state = &self.state.read().unwrap();
127 let region_state = state
128 .physical_region_states()
129 .get(&physical_region_id)
130 .with_context(|| PhysicalRegionNotFoundSnafu {
131 region_id: physical_region_id,
132 })?;
133 let physical_columns = region_state.physical_columns();
134
135 extract_new_columns(
136 &requests,
137 physical_columns,
138 &mut new_column_names,
139 &mut new_columns_to_add,
140 )?;
141
142 region_state.options().index
143 };
144 let data_region_id = to_data_region_id(physical_region_id);
145
146 let region_ids = requests
149 .iter()
150 .map(|(region_id, _)| *region_id)
151 .collect::<BTreeSet<_>>();
152
153 let mut write_guards = Vec::with_capacity(region_ids.len());
154 for region_id in region_ids {
155 write_guards.push(
156 self.metadata_region
157 .write_lock_logical_region(region_id)
158 .await?,
159 );
160 }
161
162 self.data_region
163 .add_columns(data_region_id, new_columns_to_add, index_options)
164 .await?;
165
166 let physical_columns = self.data_region.physical_columns(data_region_id).await?;
167 let physical_schema_map = physical_columns
168 .iter()
169 .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
170 .collect::<HashMap<_, _>>();
171
172 let logical_region_columns = requests.iter().map(|(region_id, request)| {
173 let AlterKind::AddColumns { columns } = &request.kind else {
174 unreachable!()
175 };
176 (
177 *region_id,
178 columns
179 .iter()
180 .map(|col| {
181 let column_name = col.column_metadata.column_schema.name.as_str();
182 let column_metadata = *physical_schema_map.get(column_name).unwrap();
183 (column_name, column_metadata)
184 })
185 .collect::<HashMap<_, _>>(),
186 )
187 });
188
189 let new_add_columns = new_column_names.iter().map(|name| {
190 let column_metadata = *physical_schema_map.get(name).unwrap();
192 (name.to_string(), column_metadata.column_id)
193 });
194
195 self.metadata_region
197 .add_logical_regions(physical_region_id, false, logical_region_columns)
198 .await?;
199
200 extension_return_value.insert(
201 ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
202 ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
203 );
204
205 let mut state = self.state.write().unwrap();
206 state.add_physical_columns(data_region_id, new_add_columns);
207 state.invalid_logical_regions_cache(requests.iter().map(|(region_id, _)| *region_id));
208
209 Ok(0)
210 }
211
212 async fn alter_physical_region(
213 &self,
214 region_id: RegionId,
215 request: RegionAlterRequest,
216 ) -> Result<()> {
217 self.data_region
218 .alter_region_options(region_id, request)
219 .await?;
220 Ok(())
221 }
222}
223
224#[cfg(test)]
225mod test {
226 use std::time::Duration;
227
228 use api::v1::SemanticType;
229 use common_meta::ddl::test_util::assert_column_name_and_id;
230 use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
231 use common_query::prelude::{greptime_timestamp, greptime_value};
232 use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
233 use store_api::region_engine::RegionEngine;
234 use store_api::region_request::{
235 AlterKind, BatchRegionDdlRequest, RegionAlterRequest, SetRegionOption,
236 };
237 use store_api::storage::RegionId;
238 use store_api::storage::consts::ReservedColumnId;
239
240 use crate::test_util::{TestEnv, alter_logical_region_request, create_logical_region_request};
241
242 #[tokio::test]
243 async fn test_alter_region() {
244 let env = TestEnv::new().await;
245 env.init_metric_region().await;
246 let engine = env.metric();
247 let engine_inner = engine.inner;
248
249 let physical_region_id = env.default_physical_region_id();
251 let request = alter_logical_region_request(&["tag1"]);
252
253 let result = engine_inner
254 .alter_physical_region(physical_region_id, request.clone())
255 .await;
256 assert!(result.is_err());
257 assert_eq!(
258 result.unwrap_err().to_string(),
259 "Alter request to physical region is forbidden".to_string()
260 );
261
262 let alter_region_option_request = RegionAlterRequest {
264 kind: AlterKind::SetRegionOptions {
265 options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
266 },
267 };
268 let result = engine_inner
269 .alter_physical_region(physical_region_id, alter_region_option_request.clone())
270 .await;
271 assert!(result.is_ok());
272
273 let metadata_region = env.metadata_region();
275 let logical_region_id = env.default_logical_region_id();
276 let is_column_exist = metadata_region
277 .column_semantic_type(physical_region_id, logical_region_id, "tag1")
278 .await
279 .unwrap()
280 .is_some();
281 assert!(!is_column_exist);
282
283 let region_id = env.default_logical_region_id();
284 let response = env
285 .metric()
286 .handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![(
287 region_id,
288 request.clone(),
289 )]))
290 .await
291 .unwrap();
292 let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
293 assert_eq!(manifest_infos[0].0, physical_region_id);
294 assert!(manifest_infos[0].1.is_metric());
295
296 let semantic_type = metadata_region
297 .column_semantic_type(physical_region_id, logical_region_id, "tag1")
298 .await
299 .unwrap()
300 .unwrap();
301 assert_eq!(semantic_type, SemanticType::Tag);
302 let timestamp_index = metadata_region
303 .column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
304 .await
305 .unwrap()
306 .unwrap();
307 assert_eq!(timestamp_index, SemanticType::Timestamp);
308 let column_metadatas =
309 parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
310 assert_column_name_and_id(
311 &column_metadatas,
312 &[
313 (greptime_timestamp(), 0),
314 (greptime_value(), 1),
315 ("__table_id", ReservedColumnId::table_id()),
316 ("__tsid", ReservedColumnId::tsid()),
317 ("job", 2),
318 ("tag1", 3),
319 ],
320 );
321 }
322
323 #[tokio::test]
324 async fn test_alter_logical_regions() {
325 let env = TestEnv::new().await;
326 let engine = env.metric();
327 let physical_region_id1 = RegionId::new(1024, 0);
328 let physical_region_id2 = RegionId::new(1024, 1);
329 let logical_region_id1 = RegionId::new(1025, 0);
330 let logical_region_id2 = RegionId::new(1025, 1);
331 env.create_physical_region(physical_region_id1, "/test_dir1", vec![])
332 .await;
333 env.create_physical_region(physical_region_id2, "/test_dir2", vec![])
334 .await;
335
336 let region_create_request1 = crate::test_util::create_logical_region_request(
337 &["job"],
338 physical_region_id1,
339 "logical1",
340 );
341 let region_create_request2 =
342 create_logical_region_request(&["job"], physical_region_id2, "logical2");
343 engine
344 .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
345 (logical_region_id1, region_create_request1),
346 (logical_region_id2, region_create_request2),
347 ]))
348 .await
349 .unwrap();
350
351 let region_alter_request1 = alter_logical_region_request(&["tag1"]);
352 let region_alter_request2 = alter_logical_region_request(&["tag1"]);
353 let response = engine
354 .handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![
355 (logical_region_id1, region_alter_request1),
356 (logical_region_id2, region_alter_request2),
357 ]))
358 .await
359 .unwrap();
360
361 let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
362 assert_eq!(manifest_infos.len(), 2);
363 let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
364 assert!(region_ids.contains(&physical_region_id1));
365 assert!(region_ids.contains(&physical_region_id2));
366
367 let column_metadatas =
368 parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
369 assert_column_name_and_id(
370 &column_metadatas,
371 &[
372 (greptime_timestamp(), 0),
373 (greptime_value(), 1),
374 ("__table_id", ReservedColumnId::table_id()),
375 ("__tsid", ReservedColumnId::tsid()),
376 ("job", 2),
377 ("tag1", 3),
378 ],
379 );
380 }
381}