1use std::str::FromStr;
18use std::sync::Arc;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_telemetry::tracing::warn;
23use humantime_serde::re::humantime;
24use snafu::ResultExt;
25use store_api::metadata::{
26 InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
27 RegionMetadataRef,
28};
29use store_api::mito_engine_options;
30use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
31use store_api::storage::RegionId;
32
33use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
34use crate::flush::FlushReason;
35use crate::manifest::action::RegionChange;
36use crate::region::options::CompactionOptions::Twcs;
37use crate::region::options::TwcsOptions;
38use crate::region::version::VersionRef;
39use crate::region::MitoRegionRef;
40use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
41use crate::worker::RegionWorkerLoop;
42
43impl<S> RegionWorkerLoop<S> {
44 pub(crate) async fn handle_alter_request(
45 &mut self,
46 region_id: RegionId,
47 request: RegionAlterRequest,
48 mut sender: OptionOutputTx,
49 ) {
50 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
51 return;
52 };
53
54 info!("Try to alter region: {}, request: {:?}", region_id, request);
55
56 let version = region.version();
58
59 match request.kind {
61 AlterKind::SetRegionOptions { options } => {
62 match self.handle_alter_region_options(region, version, options) {
63 Ok(_) => sender.send(Ok(0)),
64 Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
65 }
66 return;
67 }
68 AlterKind::UnsetRegionOptions { keys } => {
69 match self.handle_alter_region_options(
73 region,
74 version,
75 keys.iter().map(Into::into).collect(),
76 ) {
77 Ok(_) => sender.send(Ok(0)),
78 Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
79 }
80 return;
81 }
82 _ => {}
83 }
84
85 if let Err(e) = request.validate(&version.metadata) {
87 sender.send(Err(e).context(InvalidRegionRequestSnafu));
89 return;
90 }
91
92 if !request.need_alter(&version.metadata) {
94 warn!(
95 "Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
96 region_id, request
97 );
98 sender.send(Ok(0));
99 return;
100 }
101
102 if !version.memtables.is_empty() {
104 info!("Flush region: {} before alteration", region_id);
107
108 let task = self.new_flush_task(®ion, FlushReason::Alter, None, self.config.clone());
110 if let Err(e) =
111 self.flush_scheduler
112 .schedule_flush(region.region_id, ®ion.version_control, task)
113 {
114 sender.send(Err(e));
116 return;
117 }
118
119 self.flush_scheduler
121 .add_ddl_request_to_pending(SenderDdlRequest {
122 region_id,
123 sender,
124 request: DdlRequest::Alter(request),
125 });
126
127 return;
128 }
129
130 info!(
131 "Try to alter region {}, version.metadata: {:?}, request: {:?}",
132 region_id, version.metadata, request,
133 );
134 self.handle_alter_region_metadata(region, version, request, sender);
135 }
136
137 fn handle_alter_region_metadata(
139 &mut self,
140 region: MitoRegionRef,
141 version: VersionRef,
142 request: RegionAlterRequest,
143 sender: OptionOutputTx,
144 ) {
145 let new_meta = match metadata_after_alteration(&version.metadata, request) {
146 Ok(new_meta) => new_meta,
147 Err(e) => {
148 sender.send(Err(e));
149 return;
150 }
151 };
152 let change = RegionChange { metadata: new_meta };
154 self.handle_manifest_region_change(region, change, sender)
155 }
156
157 fn handle_alter_region_options(
160 &mut self,
161 region: MitoRegionRef,
162 version: VersionRef,
163 options: Vec<SetRegionOption>,
164 ) -> std::result::Result<(), MetadataError> {
165 let mut current_options = version.options.clone();
166 for option in options {
167 match option {
168 SetRegionOption::Ttl(new_ttl) => {
169 info!(
170 "Update region ttl: {}, previous: {:?} new: {:?}",
171 region.region_id, current_options.ttl, new_ttl
172 );
173 current_options.ttl = new_ttl;
174 }
175 SetRegionOption::Twsc(key, value) => {
176 let Twcs(options) = &mut current_options.compaction;
177 set_twcs_options(
178 options,
179 &TwcsOptions::default(),
180 &key,
181 &value,
182 region.region_id,
183 )?;
184 }
185 }
186 }
187 region.version_control.alter_options(current_options);
188 Ok(())
189 }
190}
191
192fn metadata_after_alteration(
196 metadata: &RegionMetadata,
197 request: RegionAlterRequest,
198) -> Result<RegionMetadataRef> {
199 let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
200 builder
201 .alter(request.kind)
202 .context(InvalidRegionRequestSnafu)?
203 .bump_version();
204 let new_meta = builder.build().context(InvalidMetadataSnafu)?;
205
206 Ok(Arc::new(new_meta))
207}
208
209fn set_twcs_options(
210 options: &mut TwcsOptions,
211 default_option: &TwcsOptions,
212 key: &str,
213 value: &str,
214 region_id: RegionId,
215) -> std::result::Result<(), MetadataError> {
216 match key {
217 mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_RUNS => {
218 let runs = parse_usize_with_default(key, value, default_option.max_active_window_runs)?;
219 log_option_update(region_id, key, options.max_active_window_runs, runs);
220 options.max_active_window_runs = runs;
221 }
222 mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_FILES => {
223 let files =
224 parse_usize_with_default(key, value, default_option.max_active_window_files)?;
225 log_option_update(region_id, key, options.max_active_window_files, files);
226 options.max_active_window_files = files;
227 }
228 mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_RUNS => {
229 let runs =
230 parse_usize_with_default(key, value, default_option.max_inactive_window_runs)?;
231 log_option_update(region_id, key, options.max_inactive_window_runs, runs);
232 options.max_inactive_window_runs = runs;
233 }
234 mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_FILES => {
235 let files =
236 parse_usize_with_default(key, value, default_option.max_inactive_window_files)?;
237 log_option_update(region_id, key, options.max_inactive_window_files, files);
238 options.max_inactive_window_files = files;
239 }
240 mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
241 let size = if value.is_empty() {
242 default_option.max_output_file_size
243 } else {
244 Some(
245 ReadableSize::from_str(value)
246 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
247 )
248 };
249 log_option_update(region_id, key, options.max_output_file_size, size);
250 options.max_output_file_size = size;
251 }
252 mito_engine_options::TWCS_TIME_WINDOW => {
253 let window = if value.is_empty() {
254 default_option.time_window
255 } else {
256 Some(
257 humantime::parse_duration(value)
258 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
259 )
260 };
261 log_option_update(region_id, key, options.time_window, window);
262 options.time_window = window;
263 }
264 _ => return InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
265 }
266 Ok(())
267}
268
269fn parse_usize_with_default(
270 key: &str,
271 value: &str,
272 default: usize,
273) -> std::result::Result<usize, MetadataError> {
274 if value.is_empty() {
275 Ok(default)
276 } else {
277 value
278 .parse::<usize>()
279 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())
280 }
281}
282
283fn log_option_update<T: std::fmt::Debug>(
284 region_id: RegionId,
285 option_name: &str,
286 prev_value: T,
287 cur_value: T,
288) {
289 info!(
290 "Update region {}: {}, previous: {:?}, new: {:?}",
291 option_name, region_id, prev_value, cur_value
292 );
293}