1mod extract_new_columns;
16mod validate;
17
18use std::collections::{HashMap, HashSet};
19
20use extract_new_columns::extract_new_columns;
21use snafu::{ensure, OptionExt, ResultExt};
22use store_api::metadata::ColumnMetadata;
23use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
24use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
25use store_api::storage::RegionId;
26use validate::validate_alter_region_requests;
27
28use crate::engine::MetricEngineInner;
29use crate::error::{
30 LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
31 UnexpectedRequestSnafu,
32};
33use crate::utils::{append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id};
34
35impl MetricEngineInner {
36 pub async fn alter_regions(
37 &self,
38 mut requests: Vec<(RegionId, RegionAlterRequest)>,
39 extension_return_value: &mut HashMap<String, Vec<u8>>,
40 ) -> Result<AffectedRows> {
41 if requests.is_empty() {
42 return Ok(0);
43 }
44
45 let first_region_id = &requests.first().unwrap().0;
46 if self.is_physical_region(*first_region_id) {
47 ensure!(
48 requests.len() == 1,
49 UnexpectedRequestSnafu {
50 reason: "Physical table must be altered with single request".to_string(),
51 }
52 );
53 let (region_id, request) = requests.pop().unwrap();
54 self.alter_physical_region(region_id, request).await?;
55 } else {
56 if requests.len() == 1 {
58 let region_id = requests.first().unwrap().0;
60 let physical_region_id = self
61 .state
62 .read()
63 .unwrap()
64 .get_physical_region_id(region_id)
65 .with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
66 let mut manifest_infos = Vec::with_capacity(1);
67 self.alter_logical_regions(physical_region_id, requests, extension_return_value)
68 .await?;
69 append_manifest_info(&self.mito, region_id, &mut manifest_infos);
70 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
71 } else {
72 let grouped_requests =
73 self.group_logical_region_requests_by_physical_region_id(requests)?;
74 let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
75 for (physical_region_id, requests) in grouped_requests {
76 self.alter_logical_regions(
77 physical_region_id,
78 requests,
79 extension_return_value,
80 )
81 .await?;
82 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
83 }
84 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
85 }
86 }
87 Ok(0)
88 }
89
90 fn group_logical_region_requests_by_physical_region_id(
92 &self,
93 requests: Vec<(RegionId, RegionAlterRequest)>,
94 ) -> Result<HashMap<RegionId, Vec<(RegionId, RegionAlterRequest)>>> {
95 let mut result = HashMap::with_capacity(requests.len());
96 let state = self.state.read().unwrap();
97
98 for (region_id, request) in requests {
99 let physical_region_id = state
100 .get_physical_region_id(region_id)
101 .with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
102 result
103 .entry(physical_region_id)
104 .or_insert_with(Vec::new)
105 .push((region_id, request));
106 }
107
108 Ok(result)
109 }
110
111 pub async fn alter_logical_regions(
113 &self,
114 physical_region_id: RegionId,
115 requests: Vec<(RegionId, RegionAlterRequest)>,
116 extension_return_value: &mut HashMap<String, Vec<u8>>,
117 ) -> Result<AffectedRows> {
118 validate_alter_region_requests(&requests)?;
120
121 let mut new_column_names = HashSet::new();
123 let mut new_columns_to_add = vec![];
124
125 let index_options = {
126 let state = &self.state.read().unwrap();
127 let region_state = state
128 .physical_region_states()
129 .get(&physical_region_id)
130 .with_context(|| PhysicalRegionNotFoundSnafu {
131 region_id: physical_region_id,
132 })?;
133 let physical_columns = region_state.physical_columns();
134
135 extract_new_columns(
136 &requests,
137 physical_columns,
138 &mut new_column_names,
139 &mut new_columns_to_add,
140 )?;
141
142 region_state.options().index
143 };
144 let data_region_id = to_data_region_id(physical_region_id);
145
146 let mut write_guards = HashMap::with_capacity(requests.len());
147 for (region_id, _) in requests.iter() {
148 if write_guards.contains_key(region_id) {
149 continue;
150 }
151 let _write_guard = self
152 .metadata_region
153 .write_lock_logical_region(*region_id)
154 .await?;
155 write_guards.insert(*region_id, _write_guard);
156 }
157
158 self.data_region
159 .add_columns(data_region_id, new_columns_to_add, index_options)
160 .await?;
161
162 let physical_columns = self.data_region.physical_columns(data_region_id).await?;
163 let physical_schema_map = physical_columns
164 .iter()
165 .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
166 .collect::<HashMap<_, _>>();
167
168 let logical_region_columns = requests.iter().map(|(region_id, request)| {
169 let AlterKind::AddColumns { columns } = &request.kind else {
170 unreachable!()
171 };
172 (
173 *region_id,
174 columns
175 .iter()
176 .map(|col| {
177 let column_name = col.column_metadata.column_schema.name.as_str();
178 let column_metadata = *physical_schema_map.get(column_name).unwrap();
179 (column_name, column_metadata)
180 })
181 .collect::<HashMap<_, _>>(),
182 )
183 });
184
185 let new_add_columns = new_column_names.iter().map(|name| {
186 let column_metadata = *physical_schema_map.get(name).unwrap();
188 (name.to_string(), column_metadata.column_id)
189 });
190
191 self.metadata_region
193 .add_logical_regions(physical_region_id, false, logical_region_columns)
194 .await?;
195
196 extension_return_value.insert(
197 ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
198 ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
199 );
200
201 let mut state = self.state.write().unwrap();
202 state.add_physical_columns(data_region_id, new_add_columns);
203 state.invalid_logical_regions_cache(requests.iter().map(|(region_id, _)| *region_id));
204
205 Ok(0)
206 }
207
208 async fn alter_physical_region(
209 &self,
210 region_id: RegionId,
211 request: RegionAlterRequest,
212 ) -> Result<()> {
213 self.data_region
214 .alter_region_options(region_id, request)
215 .await?;
216 Ok(())
217 }
218}
219
220#[cfg(test)]
221mod test {
222 use std::time::Duration;
223
224 use api::v1::SemanticType;
225 use datatypes::data_type::ConcreteDataType;
226 use datatypes::schema::ColumnSchema;
227 use store_api::metadata::ColumnMetadata;
228 use store_api::region_request::{AddColumn, SetRegionOption};
229
230 use super::*;
231 use crate::test_util::TestEnv;
232
233 #[tokio::test]
234 async fn test_alter_region() {
235 let env = TestEnv::new().await;
236 env.init_metric_region().await;
237 let engine = env.metric();
238 let engine_inner = engine.inner;
239
240 let physical_region_id = env.default_physical_region_id();
242 let request = RegionAlterRequest {
243 kind: AlterKind::AddColumns {
244 columns: vec![AddColumn {
245 column_metadata: ColumnMetadata {
246 column_id: 0,
247 semantic_type: SemanticType::Tag,
248 column_schema: ColumnSchema::new(
249 "tag1",
250 ConcreteDataType::string_datatype(),
251 false,
252 ),
253 },
254 location: None,
255 }],
256 },
257 };
258
259 let result = engine_inner
260 .alter_physical_region(physical_region_id, request.clone())
261 .await;
262 assert!(result.is_err());
263 assert_eq!(
264 result.unwrap_err().to_string(),
265 "Alter request to physical region is forbidden".to_string()
266 );
267
268 let alter_region_option_request = RegionAlterRequest {
270 kind: AlterKind::SetRegionOptions {
271 options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
272 },
273 };
274 let result = engine_inner
275 .alter_physical_region(physical_region_id, alter_region_option_request.clone())
276 .await;
277 assert!(result.is_ok());
278
279 let metadata_region = env.metadata_region();
281 let logical_region_id = env.default_logical_region_id();
282 let is_column_exist = metadata_region
283 .column_semantic_type(physical_region_id, logical_region_id, "tag1")
284 .await
285 .unwrap()
286 .is_some();
287 assert!(!is_column_exist);
288
289 let region_id = env.default_logical_region_id();
290 engine_inner
291 .alter_logical_regions(
292 physical_region_id,
293 vec![(region_id, request)],
294 &mut HashMap::new(),
295 )
296 .await
297 .unwrap();
298 let semantic_type = metadata_region
299 .column_semantic_type(physical_region_id, logical_region_id, "tag1")
300 .await
301 .unwrap()
302 .unwrap();
303 assert_eq!(semantic_type, SemanticType::Tag);
304 let timestamp_index = metadata_region
305 .column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
306 .await
307 .unwrap()
308 .unwrap();
309 assert_eq!(timestamp_index, SemanticType::Timestamp);
310 }
311}