1use std::str::FromStr;
18use std::sync::Arc;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_telemetry::tracing::warn;
23use humantime_serde::re::humantime;
24use snafu::ResultExt;
25use store_api::metadata::{
26 InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
27 RegionMetadataRef,
28};
29use store_api::mito_engine_options;
30use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
31use store_api::storage::RegionId;
32
33use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
34use crate::flush::FlushReason;
35use crate::manifest::action::RegionChange;
36use crate::region::MitoRegionRef;
37use crate::region::options::CompactionOptions::Twcs;
38use crate::region::options::TwcsOptions;
39use crate::region::version::VersionRef;
40use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
41use crate::worker::RegionWorkerLoop;
42
43impl<S> RegionWorkerLoop<S> {
44 pub(crate) async fn handle_alter_request(
45 &mut self,
46 region_id: RegionId,
47 request: RegionAlterRequest,
48 sender: OptionOutputTx,
49 ) {
50 let region = match self.regions.writable_non_staging_region(region_id) {
51 Ok(region) => region,
52 Err(e) => {
53 sender.send(Err(e));
54 return;
55 }
56 };
57
58 info!("Try to alter region: {}, request: {:?}", region_id, request);
59
60 let version = region.version();
62
63 match request.kind {
65 AlterKind::SetRegionOptions { options } => {
66 match self.handle_alter_region_options(region, version, options) {
67 Ok(_) => sender.send(Ok(0)),
68 Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
69 }
70 return;
71 }
72 AlterKind::UnsetRegionOptions { keys } => {
73 match self.handle_alter_region_options(
77 region,
78 version,
79 keys.iter().map(Into::into).collect(),
80 ) {
81 Ok(_) => sender.send(Ok(0)),
82 Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
83 }
84 return;
85 }
86 _ => {}
87 }
88
89 if let Err(e) = request.validate(&version.metadata) {
91 sender.send(Err(e).context(InvalidRegionRequestSnafu));
93 return;
94 }
95
96 if !request.need_alter(&version.metadata) {
98 warn!(
99 "Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
100 region_id, request
101 );
102 sender.send(Ok(0));
103 return;
104 }
105
106 if !version.memtables.is_empty() {
108 info!("Flush region: {} before alteration", region_id);
111
112 let task = self.new_flush_task(®ion, FlushReason::Alter, None, self.config.clone());
114 if let Err(e) =
115 self.flush_scheduler
116 .schedule_flush(region.region_id, ®ion.version_control, task)
117 {
118 sender.send(Err(e));
120 return;
121 }
122
123 self.flush_scheduler
125 .add_ddl_request_to_pending(SenderDdlRequest {
126 region_id,
127 sender,
128 request: DdlRequest::Alter(request),
129 });
130
131 return;
132 }
133
134 info!(
135 "Try to alter region {}, version.metadata: {:?}, request: {:?}",
136 region_id, version.metadata, request,
137 );
138 self.handle_alter_region_metadata(region, version, request, sender);
139 }
140
141 fn handle_alter_region_metadata(
143 &mut self,
144 region: MitoRegionRef,
145 version: VersionRef,
146 request: RegionAlterRequest,
147 sender: OptionOutputTx,
148 ) {
149 let need_index = need_change_index(&request.kind);
150 let new_meta = match metadata_after_alteration(&version.metadata, request) {
151 Ok(new_meta) => new_meta,
152 Err(e) => {
153 sender.send(Err(e));
154 return;
155 }
156 };
157 let change = RegionChange {
159 metadata: new_meta,
160 sst_format: region.sst_format(),
161 };
162 self.handle_manifest_region_change(region, change, need_index, sender);
163 }
164
165 fn handle_alter_region_options(
168 &mut self,
169 region: MitoRegionRef,
170 version: VersionRef,
171 options: Vec<SetRegionOption>,
172 ) -> std::result::Result<(), MetadataError> {
173 let mut current_options = version.options.clone();
174 for option in options {
175 match option {
176 SetRegionOption::Ttl(new_ttl) => {
177 info!(
178 "Update region ttl: {}, previous: {:?} new: {:?}",
179 region.region_id, current_options.ttl, new_ttl
180 );
181 current_options.ttl = new_ttl;
182 }
183 SetRegionOption::Twsc(key, value) => {
184 let Twcs(options) = &mut current_options.compaction;
185 set_twcs_options(
186 options,
187 &TwcsOptions::default(),
188 &key,
189 &value,
190 region.region_id,
191 )?;
192 }
193 }
194 }
195 region.version_control.alter_options(current_options);
196 Ok(())
197 }
198}
199
200fn metadata_after_alteration(
204 metadata: &RegionMetadata,
205 request: RegionAlterRequest,
206) -> Result<RegionMetadataRef> {
207 let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
208 builder
209 .alter(request.kind)
210 .context(InvalidRegionRequestSnafu)?
211 .bump_version();
212 let new_meta = builder.build().context(InvalidMetadataSnafu)?;
213
214 Ok(Arc::new(new_meta))
215}
216
217fn set_twcs_options(
218 options: &mut TwcsOptions,
219 default_option: &TwcsOptions,
220 key: &str,
221 value: &str,
222 region_id: RegionId,
223) -> std::result::Result<(), MetadataError> {
224 match key {
225 mito_engine_options::TWCS_TRIGGER_FILE_NUM => {
226 let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?;
227 log_option_update(region_id, key, options.trigger_file_num, files);
228 options.trigger_file_num = files;
229 }
230 mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
231 let size = if value.is_empty() {
232 default_option.max_output_file_size
233 } else {
234 Some(
235 ReadableSize::from_str(value)
236 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
237 )
238 };
239 log_option_update(region_id, key, options.max_output_file_size, size);
240 options.max_output_file_size = size;
241 }
242 mito_engine_options::TWCS_TIME_WINDOW => {
243 let window = if value.is_empty() {
244 default_option.time_window
245 } else {
246 Some(
247 humantime::parse_duration(value)
248 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
249 )
250 };
251 log_option_update(region_id, key, options.time_window, window);
252 options.time_window = window;
253 }
254 _ => return InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
255 }
256 Ok(())
257}
258
259fn parse_usize_with_default(
260 key: &str,
261 value: &str,
262 default: usize,
263) -> std::result::Result<usize, MetadataError> {
264 if value.is_empty() {
265 Ok(default)
266 } else {
267 value
268 .parse::<usize>()
269 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())
270 }
271}
272
273fn log_option_update<T: std::fmt::Debug>(
274 region_id: RegionId,
275 option_name: &str,
276 prev_value: T,
277 cur_value: T,
278) {
279 info!(
280 "Update region {}: {}, previous: {:?}, new: {:?}",
281 option_name, region_id, prev_value, cur_value
282 );
283}
284
285fn need_change_index(kind: &AlterKind) -> bool {
287 match kind {
288 AlterKind::SetIndexes { options: _ } => true,
291 _ => false,
295 }
296}