1use std::str::FromStr;
18use std::sync::Arc;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_telemetry::tracing::warn;
23use humantime_serde::re::humantime;
24use snafu::{ResultExt, ensure};
25use store_api::metadata::{
26 InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
27 RegionMetadataRef,
28};
29use store_api::mito_engine_options;
30use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
31use store_api::storage::RegionId;
32
33use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
34use crate::flush::FlushReason;
35use crate::manifest::action::RegionChange;
36use crate::region::MitoRegionRef;
37use crate::region::options::CompactionOptions::Twcs;
38use crate::region::options::{RegionOptions, TwcsOptions};
39use crate::region::version::VersionRef;
40use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
41use crate::sst::FormatType;
42use crate::worker::RegionWorkerLoop;
43
44impl<S> RegionWorkerLoop<S> {
45 pub(crate) async fn handle_alter_request(
46 &mut self,
47 region_id: RegionId,
48 request: RegionAlterRequest,
49 sender: OptionOutputTx,
50 ) {
51 let region = match self.regions.writable_non_staging_region(region_id) {
52 Ok(region) => region,
53 Err(e) => {
54 sender.send(Err(e));
55 return;
56 }
57 };
58
59 info!("Try to alter region: {}, request: {:?}", region_id, request);
60
61 let mut version = region.version();
63
64 let set_options = match &request.kind {
66 AlterKind::SetRegionOptions { options } => options.clone(),
67 AlterKind::UnsetRegionOptions { keys } => {
68 keys.iter().map(Into::into).collect()
72 }
73 _ => Vec::new(),
74 };
75 if !set_options.is_empty() {
76 match self.handle_alter_region_options_fast(®ion, version, set_options) {
77 Ok(new_version) => {
78 let Some(new_version) = new_version else {
79 sender.send(Ok(0));
81 return;
82 };
83 version = new_version;
84 }
85 Err(e) => {
86 sender.send(Err(e).context(InvalidMetadataSnafu));
87 return;
88 }
89 }
90 }
91
92 if let Err(e) = request.validate(&version.metadata) {
94 sender.send(Err(e).context(InvalidRegionRequestSnafu));
96 return;
97 }
98
99 if !request.need_alter(&version.metadata) {
101 warn!(
102 "Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
103 region_id, request
104 );
105 sender.send(Ok(0));
106 return;
107 }
108
109 if !version.memtables.is_empty() {
111 info!("Flush region: {} before alteration", region_id);
114
115 let task = self.new_flush_task(
117 ®ion,
118 FlushReason::Alter,
119 None,
120 self.config.clone(),
121 region.is_staging(),
122 );
123 if let Err(e) =
124 self.flush_scheduler
125 .schedule_flush(region.region_id, ®ion.version_control, task)
126 {
127 sender.send(Err(e));
129 return;
130 }
131
132 self.flush_scheduler
134 .add_ddl_request_to_pending(SenderDdlRequest {
135 region_id,
136 sender,
137 request: DdlRequest::Alter(request),
138 });
139
140 return;
141 }
142
143 info!(
144 "Try to alter region {}, version.metadata: {:?}, version.options: {:?}, request: {:?}",
145 region_id, version.metadata, version.options, request,
146 );
147 self.handle_alter_region_with_empty_memtable(region, version, request, sender);
148 }
149
150 fn handle_alter_region_with_empty_memtable(
153 &mut self,
154 region: MitoRegionRef,
155 version: VersionRef,
156 request: RegionAlterRequest,
157 sender: OptionOutputTx,
158 ) {
159 let need_index = need_change_index(&request.kind);
160 let new_options = new_region_options_on_empty_memtable(&version.options, &request.kind);
161 let new_meta = match metadata_after_alteration(&version.metadata, request) {
162 Ok(new_meta) => new_meta,
163 Err(e) => {
164 sender.send(Err(e));
165 return;
166 }
167 };
168 let change = RegionChange {
170 metadata: new_meta,
171 sst_format: new_options
172 .as_ref()
173 .unwrap_or(&version.options)
174 .sst_format
175 .unwrap_or_default(),
176 };
177 self.handle_manifest_region_change(region, change, need_index, new_options, sender);
178 }
179
180 fn handle_alter_region_options_fast(
187 &mut self,
188 region: &MitoRegionRef,
189 version: VersionRef,
190 options: Vec<SetRegionOption>,
191 ) -> std::result::Result<Option<VersionRef>, MetadataError> {
192 assert!(!options.is_empty());
193
194 let mut all_options_altered = true;
195 let mut current_options = version.options.clone();
196 for option in options {
197 match option {
198 SetRegionOption::Ttl(new_ttl) => {
199 info!(
200 "Update region ttl: {}, previous: {:?} new: {:?}",
201 region.region_id, current_options.ttl, new_ttl
202 );
203 current_options.ttl = new_ttl;
204 }
205 SetRegionOption::Twsc(key, value) => {
206 let Twcs(options) = &mut current_options.compaction;
207 set_twcs_options(
208 options,
209 &TwcsOptions::default(),
210 &key,
211 &value,
212 region.region_id,
213 )?;
214 }
215 SetRegionOption::Format(format_str) => {
216 let new_format = format_str.parse::<FormatType>().map_err(|_| {
217 store_api::metadata::InvalidRegionRequestSnafu {
218 region_id: region.region_id,
219 err: format!("Invalid format type: {}", format_str),
220 }
221 .build()
222 })?;
223 if new_format != current_options.sst_format.unwrap_or_default() {
225 all_options_altered = false;
226
227 ensure!(
229 new_format == FormatType::Flat,
230 store_api::metadata::InvalidRegionRequestSnafu {
231 region_id: region.region_id,
232 err: "Only allow changing format type to flat",
233 }
234 );
235 }
236 }
237 }
238 }
239 region.version_control.alter_options(current_options);
240 if all_options_altered {
241 Ok(None)
242 } else {
243 Ok(Some(region.version()))
244 }
245 }
246}
247
248fn new_region_options_on_empty_memtable(
250 current_options: &RegionOptions,
251 kind: &AlterKind,
252) -> Option<RegionOptions> {
253 let AlterKind::SetRegionOptions { options } = kind else {
254 return None;
255 };
256
257 if options.is_empty() {
258 return None;
259 }
260
261 let mut current_options = current_options.clone();
262 for option in options {
263 match option {
264 SetRegionOption::Ttl(_) | SetRegionOption::Twsc(_, _) => (),
265 SetRegionOption::Format(format_str) => {
266 let new_format = format_str.parse::<FormatType>().unwrap();
268 assert_eq!(FormatType::Flat, new_format);
269
270 current_options.sst_format = Some(new_format);
271 }
272 }
273 }
274 Some(current_options)
275}
276
277fn metadata_after_alteration(
281 metadata: &RegionMetadata,
282 request: RegionAlterRequest,
283) -> Result<RegionMetadataRef> {
284 let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
285 builder
286 .alter(request.kind)
287 .context(InvalidRegionRequestSnafu)?
288 .bump_version();
289 let new_meta = builder.build().context(InvalidMetadataSnafu)?;
290
291 Ok(Arc::new(new_meta))
292}
293
294fn set_twcs_options(
295 options: &mut TwcsOptions,
296 default_option: &TwcsOptions,
297 key: &str,
298 value: &str,
299 region_id: RegionId,
300) -> std::result::Result<(), MetadataError> {
301 match key {
302 mito_engine_options::TWCS_TRIGGER_FILE_NUM => {
303 let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?;
304 log_option_update(region_id, key, options.trigger_file_num, files);
305 options.trigger_file_num = files;
306 }
307 mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
308 let size = if value.is_empty() {
309 default_option.max_output_file_size
310 } else {
311 Some(
312 ReadableSize::from_str(value)
313 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
314 )
315 };
316 log_option_update(region_id, key, options.max_output_file_size, size);
317 options.max_output_file_size = size;
318 }
319 mito_engine_options::TWCS_TIME_WINDOW => {
320 let window = if value.is_empty() {
321 default_option.time_window
322 } else {
323 Some(
324 humantime::parse_duration(value)
325 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
326 )
327 };
328 log_option_update(region_id, key, options.time_window, window);
329 options.time_window = window;
330 }
331 _ => return InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
332 }
333 Ok(())
334}
335
336fn parse_usize_with_default(
337 key: &str,
338 value: &str,
339 default: usize,
340) -> std::result::Result<usize, MetadataError> {
341 if value.is_empty() {
342 Ok(default)
343 } else {
344 value
345 .parse::<usize>()
346 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())
347 }
348}
349
350fn log_option_update<T: std::fmt::Debug>(
351 region_id: RegionId,
352 option_name: &str,
353 prev_value: T,
354 cur_value: T,
355) {
356 info!(
357 "Update region {}: {}, previous: {:?}, new: {:?}",
358 option_name, region_id, prev_value, cur_value
359 );
360}
361
362fn need_change_index(kind: &AlterKind) -> bool {
364 match kind {
365 AlterKind::SetIndexes { options: _ } => true,
368 _ => false,
372 }
373}