1use std::str::FromStr;
18use std::sync::Arc;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_telemetry::tracing::warn;
23use humantime_serde::re::humantime;
24use snafu::{ResultExt, ensure};
25use store_api::logstore::LogStore;
26use store_api::metadata::{
27 InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
28 RegionMetadataRef,
29};
30use store_api::mito_engine_options;
31use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
32use store_api::storage::RegionId;
33
34use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
35use crate::flush::FlushReason;
36use crate::manifest::action::RegionChange;
37use crate::region::MitoRegionRef;
38use crate::region::options::CompactionOptions::Twcs;
39use crate::region::options::{RegionOptions, TwcsOptions};
40use crate::region::version::VersionRef;
41use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
42use crate::sst::FormatType;
43use crate::worker::RegionWorkerLoop;
44
45impl<S: LogStore> RegionWorkerLoop<S> {
46 pub(crate) async fn handle_alter_request(
47 &mut self,
48 region_id: RegionId,
49 request: RegionAlterRequest,
50 sender: OptionOutputTx,
51 ) {
52 let region = match self.regions.writable_non_staging_region(region_id) {
53 Ok(region) => region,
54 Err(e) => {
55 sender.send(Err(e));
56 return;
57 }
58 };
59
60 info!("Try to alter region: {}, request: {:?}", region_id, request);
61
62 let mut version = region.version();
64
65 let set_options = match &request.kind {
67 AlterKind::SetRegionOptions { options } => options.clone(),
68 AlterKind::UnsetRegionOptions { keys } => {
69 keys.iter().map(Into::into).collect()
73 }
74 _ => Vec::new(),
75 };
76 if !set_options.is_empty() {
77 match self.handle_alter_region_options_fast(®ion, version, set_options) {
78 Ok(new_version) => {
79 let Some(new_version) = new_version else {
80 sender.send(Ok(0));
82 return;
83 };
84 version = new_version;
85 }
86 Err(e) => {
87 sender.send(Err(e).context(InvalidMetadataSnafu));
88 return;
89 }
90 }
91 }
92
93 if let Err(e) = request.validate(&version.metadata) {
95 sender.send(Err(e).context(InvalidRegionRequestSnafu));
97 return;
98 }
99
100 if !request.need_alter(&version.metadata) {
102 warn!(
103 "Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
104 region_id, request
105 );
106 sender.send(Ok(0));
107 return;
108 }
109
110 if !version.memtables.is_empty() {
112 info!("Flush region: {} before alteration", region_id);
115
116 let task = self.new_flush_task(®ion, FlushReason::Alter, None, self.config.clone());
118 if let Err(e) =
119 self.flush_scheduler
120 .schedule_flush(region.region_id, ®ion.version_control, task)
121 {
122 sender.send(Err(e));
124 return;
125 }
126
127 self.flush_scheduler
129 .add_ddl_request_to_pending(SenderDdlRequest {
130 region_id,
131 sender,
132 request: DdlRequest::Alter(request),
133 });
134
135 return;
136 }
137
138 info!(
139 "Try to alter region {}, version.metadata: {:?}, version.options: {:?}, request: {:?}",
140 region_id, version.metadata, version.options, request,
141 );
142 self.handle_alter_region_with_empty_memtable(region, version, request, sender);
143 }
144
145 fn handle_alter_region_with_empty_memtable(
148 &mut self,
149 region: MitoRegionRef,
150 version: VersionRef,
151 request: RegionAlterRequest,
152 sender: OptionOutputTx,
153 ) {
154 let need_index = need_change_index(&request.kind);
155 let new_options = new_region_options_on_empty_memtable(&version.options, &request.kind);
156 let new_meta = match metadata_after_alteration(&version.metadata, request) {
157 Ok(new_meta) => new_meta,
158 Err(e) => {
159 sender.send(Err(e));
160 return;
161 }
162 };
163 let options = new_options.as_ref().unwrap_or(&version.options);
165 let change = RegionChange {
166 metadata: new_meta,
167 sst_format: options.sst_format.unwrap_or_default(),
168 append_mode: Some(options.append_mode),
169 };
170 self.handle_manifest_region_change(region, change, need_index, new_options, sender);
171 }
172
173 fn handle_alter_region_options_fast(
180 &mut self,
181 region: &MitoRegionRef,
182 version: VersionRef,
183 options: Vec<SetRegionOption>,
184 ) -> std::result::Result<Option<VersionRef>, MetadataError> {
185 assert!(!options.is_empty());
186
187 let mut all_options_altered = true;
188 let mut current_options = version.options.clone();
189 for option in options {
190 match option {
191 SetRegionOption::Ttl(new_ttl) => {
192 info!(
193 "Update region ttl: {}, previous: {:?} new: {:?}",
194 region.region_id, current_options.ttl, new_ttl
195 );
196 current_options.ttl = new_ttl;
197 }
198 SetRegionOption::Twsc(key, value) => {
199 let Twcs(options) = &mut current_options.compaction;
200 set_twcs_options(
201 options,
202 &TwcsOptions::default(),
203 &key,
204 &value,
205 region.region_id,
206 )?;
207 }
208 SetRegionOption::Format(format_str) => {
209 let new_format = format_str.parse::<FormatType>().map_err(|_| {
210 store_api::metadata::InvalidRegionRequestSnafu {
211 region_id: region.region_id,
212 err: format!("Invalid format type: {}", format_str),
213 }
214 .build()
215 })?;
216 if new_format != current_options.sst_format.unwrap_or_default() {
218 all_options_altered = false;
219
220 ensure!(
222 new_format == FormatType::Flat,
223 store_api::metadata::InvalidRegionRequestSnafu {
224 region_id: region.region_id,
225 err: "Only allow changing format type to flat",
226 }
227 );
228 }
229 }
230 SetRegionOption::AppendMode(new_append_mode) => {
231 if new_append_mode != current_options.append_mode {
233 ensure!(
235 !current_options.append_mode && new_append_mode,
236 store_api::metadata::InvalidRegionRequestSnafu {
237 region_id: region.region_id,
238 err: "Only allow changing append_mode from false to true",
239 }
240 );
241 current_options.merge_mode = None;
243 all_options_altered = false;
244 }
245 }
246 }
247 }
248 region.version_control.alter_options(current_options);
249 if all_options_altered {
250 Ok(None)
251 } else {
252 Ok(Some(region.version()))
253 }
254 }
255}
256
257fn new_region_options_on_empty_memtable(
259 current_options: &RegionOptions,
260 kind: &AlterKind,
261) -> Option<RegionOptions> {
262 let AlterKind::SetRegionOptions { options } = kind else {
263 return None;
264 };
265
266 if options.is_empty() {
267 return None;
268 }
269
270 let mut current_options = current_options.clone();
271 for option in options {
272 match option {
273 SetRegionOption::Ttl(_) | SetRegionOption::Twsc(_, _) => (),
274 SetRegionOption::Format(format_str) => {
275 let new_format = format_str.parse::<FormatType>().unwrap();
277 assert_eq!(FormatType::Flat, new_format);
278
279 current_options.sst_format = Some(new_format);
280 }
281 SetRegionOption::AppendMode(new_append_mode) => {
282 assert!(*new_append_mode && !current_options.append_mode);
284
285 current_options.append_mode = true;
286 current_options.merge_mode = None;
287 }
288 }
289 }
290 Some(current_options)
291}
292
293fn metadata_after_alteration(
297 metadata: &RegionMetadata,
298 request: RegionAlterRequest,
299) -> Result<RegionMetadataRef> {
300 let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
301 builder
302 .alter(request.kind)
303 .context(InvalidRegionRequestSnafu)?
304 .bump_version();
305 let new_meta = builder.build().context(InvalidMetadataSnafu)?;
306
307 Ok(Arc::new(new_meta))
308}
309
310fn set_twcs_options(
311 options: &mut TwcsOptions,
312 default_option: &TwcsOptions,
313 key: &str,
314 value: &str,
315 region_id: RegionId,
316) -> std::result::Result<(), MetadataError> {
317 match key {
318 mito_engine_options::TWCS_TRIGGER_FILE_NUM => {
319 let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?;
320 log_option_update(region_id, key, options.trigger_file_num, files);
321 options.trigger_file_num = files;
322 }
323 mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
324 let size = if value.is_empty() {
325 default_option.max_output_file_size
326 } else {
327 Some(
328 ReadableSize::from_str(value)
329 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
330 )
331 };
332 log_option_update(region_id, key, options.max_output_file_size, size);
333 options.max_output_file_size = size;
334 }
335 mito_engine_options::TWCS_TIME_WINDOW => {
336 let window = if value.is_empty() {
337 default_option.time_window
338 } else {
339 Some(
340 humantime::parse_duration(value)
341 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
342 )
343 };
344 log_option_update(region_id, key, options.time_window, window);
345 options.time_window = window;
346 }
347 _ => return InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
348 }
349 Ok(())
350}
351
352fn parse_usize_with_default(
353 key: &str,
354 value: &str,
355 default: usize,
356) -> std::result::Result<usize, MetadataError> {
357 if value.is_empty() {
358 Ok(default)
359 } else {
360 value
361 .parse::<usize>()
362 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())
363 }
364}
365
366fn log_option_update<T: std::fmt::Debug>(
367 region_id: RegionId,
368 option_name: &str,
369 prev_value: T,
370 cur_value: T,
371) {
372 info!(
373 "Update region {}: {}, previous: {:?}, new: {:?}",
374 option_name, region_id, prev_value, cur_value
375 );
376}
377
378fn need_change_index(kind: &AlterKind) -> bool {
380 match kind {
381 AlterKind::SetIndexes { options: _ } => true,
384 _ => false,
388 }
389}