1use std::str::FromStr;
18use std::sync::Arc;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_telemetry::tracing::warn;
23use humantime_serde::re::humantime;
24use snafu::ResultExt;
25use store_api::metadata::{
26 InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
27 RegionMetadataRef,
28};
29use store_api::mito_engine_options;
30use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
31use store_api::storage::RegionId;
32
33use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
34use crate::flush::FlushReason;
35use crate::manifest::action::RegionChange;
36use crate::region::options::CompactionOptions::Twcs;
37use crate::region::options::TwcsOptions;
38use crate::region::version::VersionRef;
39use crate::region::MitoRegionRef;
40use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
41use crate::worker::RegionWorkerLoop;
42
43impl<S> RegionWorkerLoop<S> {
44 pub(crate) async fn handle_alter_request(
45 &mut self,
46 region_id: RegionId,
47 request: RegionAlterRequest,
48 sender: OptionOutputTx,
49 ) {
50 let region = match self.regions.writable_non_staging_region(region_id) {
51 Ok(region) => region,
52 Err(e) => {
53 sender.send(Err(e));
54 return;
55 }
56 };
57
58 info!("Try to alter region: {}, request: {:?}", region_id, request);
59
60 let version = region.version();
62
63 match request.kind {
65 AlterKind::SetRegionOptions { options } => {
66 match self.handle_alter_region_options(region, version, options) {
67 Ok(_) => sender.send(Ok(0)),
68 Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
69 }
70 return;
71 }
72 AlterKind::UnsetRegionOptions { keys } => {
73 match self.handle_alter_region_options(
77 region,
78 version,
79 keys.iter().map(Into::into).collect(),
80 ) {
81 Ok(_) => sender.send(Ok(0)),
82 Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
83 }
84 return;
85 }
86 _ => {}
87 }
88
89 if let Err(e) = request.validate(&version.metadata) {
91 sender.send(Err(e).context(InvalidRegionRequestSnafu));
93 return;
94 }
95
96 if !request.need_alter(&version.metadata) {
98 warn!(
99 "Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
100 region_id, request
101 );
102 sender.send(Ok(0));
103 return;
104 }
105
106 if !version.memtables.is_empty() {
108 info!("Flush region: {} before alteration", region_id);
111
112 let task = self.new_flush_task(®ion, FlushReason::Alter, None, self.config.clone());
114 if let Err(e) =
115 self.flush_scheduler
116 .schedule_flush(region.region_id, ®ion.version_control, task)
117 {
118 sender.send(Err(e));
120 return;
121 }
122
123 self.flush_scheduler
125 .add_ddl_request_to_pending(SenderDdlRequest {
126 region_id,
127 sender,
128 request: DdlRequest::Alter(request),
129 });
130
131 return;
132 }
133
134 info!(
135 "Try to alter region {}, version.metadata: {:?}, request: {:?}",
136 region_id, version.metadata, request,
137 );
138 self.handle_alter_region_metadata(region, version, request, sender);
139 }
140
141 fn handle_alter_region_metadata(
143 &mut self,
144 region: MitoRegionRef,
145 version: VersionRef,
146 request: RegionAlterRequest,
147 sender: OptionOutputTx,
148 ) {
149 let new_meta = match metadata_after_alteration(&version.metadata, request) {
150 Ok(new_meta) => new_meta,
151 Err(e) => {
152 sender.send(Err(e));
153 return;
154 }
155 };
156 let change = RegionChange { metadata: new_meta };
158 self.handle_manifest_region_change(region, change, sender)
159 }
160
161 fn handle_alter_region_options(
164 &mut self,
165 region: MitoRegionRef,
166 version: VersionRef,
167 options: Vec<SetRegionOption>,
168 ) -> std::result::Result<(), MetadataError> {
169 let mut current_options = version.options.clone();
170 for option in options {
171 match option {
172 SetRegionOption::Ttl(new_ttl) => {
173 info!(
174 "Update region ttl: {}, previous: {:?} new: {:?}",
175 region.region_id, current_options.ttl, new_ttl
176 );
177 current_options.ttl = new_ttl;
178 }
179 SetRegionOption::Twsc(key, value) => {
180 let Twcs(options) = &mut current_options.compaction;
181 set_twcs_options(
182 options,
183 &TwcsOptions::default(),
184 &key,
185 &value,
186 region.region_id,
187 )?;
188 }
189 }
190 }
191 region.version_control.alter_options(current_options);
192 Ok(())
193 }
194}
195
196fn metadata_after_alteration(
200 metadata: &RegionMetadata,
201 request: RegionAlterRequest,
202) -> Result<RegionMetadataRef> {
203 let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
204 builder
205 .alter(request.kind)
206 .context(InvalidRegionRequestSnafu)?
207 .bump_version();
208 let new_meta = builder.build().context(InvalidMetadataSnafu)?;
209
210 Ok(Arc::new(new_meta))
211}
212
213fn set_twcs_options(
214 options: &mut TwcsOptions,
215 default_option: &TwcsOptions,
216 key: &str,
217 value: &str,
218 region_id: RegionId,
219) -> std::result::Result<(), MetadataError> {
220 match key {
221 mito_engine_options::TWCS_TRIGGER_FILE_NUM => {
222 let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?;
223 log_option_update(region_id, key, options.trigger_file_num, files);
224 options.trigger_file_num = files;
225 }
226 mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
227 let size = if value.is_empty() {
228 default_option.max_output_file_size
229 } else {
230 Some(
231 ReadableSize::from_str(value)
232 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
233 )
234 };
235 log_option_update(region_id, key, options.max_output_file_size, size);
236 options.max_output_file_size = size;
237 }
238 mito_engine_options::TWCS_TIME_WINDOW => {
239 let window = if value.is_empty() {
240 default_option.time_window
241 } else {
242 Some(
243 humantime::parse_duration(value)
244 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
245 )
246 };
247 log_option_update(region_id, key, options.time_window, window);
248 options.time_window = window;
249 }
250 _ => return InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
251 }
252 Ok(())
253}
254
255fn parse_usize_with_default(
256 key: &str,
257 value: &str,
258 default: usize,
259) -> std::result::Result<usize, MetadataError> {
260 if value.is_empty() {
261 Ok(default)
262 } else {
263 value
264 .parse::<usize>()
265 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())
266 }
267}
268
269fn log_option_update<T: std::fmt::Debug>(
270 region_id: RegionId,
271 option_name: &str,
272 prev_value: T,
273 cur_value: T,
274) {
275 info!(
276 "Update region {}: {}, previous: {:?}, new: {:?}",
277 option_name, region_id, prev_value, cur_value
278 );
279}