mito2/worker/
handle_alter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling alter related requests.
16
17use std::str::FromStr;
18use std::sync::Arc;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_telemetry::tracing::warn;
23use humantime_serde::re::humantime;
24use snafu::{ResultExt, ensure};
25use store_api::logstore::LogStore;
26use store_api::metadata::{
27    InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
28    RegionMetadataRef,
29};
30use store_api::mito_engine_options;
31use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
32use store_api::storage::RegionId;
33
34use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
35use crate::flush::FlushReason;
36use crate::manifest::action::RegionChange;
37use crate::region::MitoRegionRef;
38use crate::region::options::CompactionOptions::Twcs;
39use crate::region::options::{RegionOptions, TwcsOptions};
40use crate::region::version::VersionRef;
41use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
42use crate::sst::FormatType;
43use crate::worker::RegionWorkerLoop;
44
45impl<S: LogStore> RegionWorkerLoop<S> {
46    pub(crate) async fn handle_alter_request(
47        &mut self,
48        region_id: RegionId,
49        request: RegionAlterRequest,
50        sender: OptionOutputTx,
51    ) {
52        let region = match self.regions.writable_non_staging_region(region_id) {
53            Ok(region) => region,
54            Err(e) => {
55                sender.send(Err(e));
56                return;
57            }
58        };
59
60        info!("Try to alter region: {}, request: {:?}", region_id, request);
61
62        // Gets the version before alter.
63        let mut version = region.version();
64
65        // fast path for memory state changes like options.
66        let set_options = match &request.kind {
67            AlterKind::SetRegionOptions { options } => options.clone(),
68            AlterKind::UnsetRegionOptions { keys } => {
69                // Converts the keys to SetRegionOption.
70                //
71                // It passes an empty string to achieve the purpose of unset
72                keys.iter().map(Into::into).collect()
73            }
74            _ => Vec::new(),
75        };
76        if !set_options.is_empty() {
77            match self.handle_alter_region_options_fast(&region, version, set_options) {
78                Ok(new_version) => {
79                    let Some(new_version) = new_version else {
80                        // We don't have options to alter after flush.
81                        sender.send(Ok(0));
82                        return;
83                    };
84                    version = new_version;
85                }
86                Err(e) => {
87                    sender.send(Err(e).context(InvalidMetadataSnafu));
88                    return;
89                }
90            }
91        }
92
93        // Validates request.
94        if let Err(e) = request.validate(&version.metadata) {
95            // Invalid request.
96            sender.send(Err(e).context(InvalidRegionRequestSnafu));
97            return;
98        }
99
100        // Checks whether we need to alter the region.
101        if !request.need_alter(&version.metadata) {
102            warn!(
103                "Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
104                region_id, request
105            );
106            sender.send(Ok(0));
107            return;
108        }
109
110        // Checks whether we can alter the region directly.
111        if !version.memtables.is_empty() {
112            // If memtable is not empty, we can't alter it directly and need to flush
113            // all memtables first.
114            info!("Flush region: {} before alteration", region_id);
115
116            // Try to submit a flush task.
117            let task = self.new_flush_task(&region, FlushReason::Alter, None, self.config.clone());
118            if let Err(e) =
119                self.flush_scheduler
120                    .schedule_flush(region.region_id, &region.version_control, task)
121            {
122                // Unable to flush the region, send error to waiter.
123                sender.send(Err(e));
124                return;
125            }
126
127            // Safety: We have requested flush.
128            self.flush_scheduler
129                .add_ddl_request_to_pending(SenderDdlRequest {
130                    region_id,
131                    sender,
132                    request: DdlRequest::Alter(request),
133                });
134
135            return;
136        }
137
138        info!(
139            "Try to alter region {}, version.metadata: {:?}, version.options: {:?}, request: {:?}",
140            region_id, version.metadata, version.options, request,
141        );
142        self.handle_alter_region_with_empty_memtable(region, version, request, sender);
143    }
144
145    // TODO(yingwen): Optional new options and sst format.
146    /// Handles region metadata and format changes when the region memtable is empty.
147    fn handle_alter_region_with_empty_memtable(
148        &mut self,
149        region: MitoRegionRef,
150        version: VersionRef,
151        request: RegionAlterRequest,
152        sender: OptionOutputTx,
153    ) {
154        let need_index = need_change_index(&request.kind);
155        let new_options = new_region_options_on_empty_memtable(&version.options, &request.kind);
156        let new_meta = match metadata_after_alteration(&version.metadata, request) {
157            Ok(new_meta) => new_meta,
158            Err(e) => {
159                sender.send(Err(e));
160                return;
161            }
162        };
163        // Persist the metadata to region's manifest.
164        let options = new_options.as_ref().unwrap_or(&version.options);
165        let change = RegionChange {
166            metadata: new_meta,
167            sst_format: options.sst_format.unwrap_or_default(),
168            append_mode: Some(options.append_mode),
169        };
170        self.handle_manifest_region_change(region, change, need_index, new_options, sender);
171    }
172
173    /// Handles requests that changes region options, like TTL. It only affects memory state
174    /// since changes are persisted in the `DatanodeTableValue` in metasrv.
175    ///
176    /// If the options require empty memtable, it only does validation.
177    ///
178    /// Returns a new version with the updated options if it needs further alteration.
179    fn handle_alter_region_options_fast(
180        &mut self,
181        region: &MitoRegionRef,
182        version: VersionRef,
183        options: Vec<SetRegionOption>,
184    ) -> std::result::Result<Option<VersionRef>, MetadataError> {
185        assert!(!options.is_empty());
186
187        let mut all_options_altered = true;
188        let mut current_options = version.options.clone();
189        for option in options {
190            match option {
191                SetRegionOption::Ttl(new_ttl) => {
192                    info!(
193                        "Update region ttl: {}, previous: {:?} new: {:?}",
194                        region.region_id, current_options.ttl, new_ttl
195                    );
196                    current_options.ttl = new_ttl;
197                }
198                SetRegionOption::Twsc(key, value) => {
199                    let Twcs(options) = &mut current_options.compaction;
200                    set_twcs_options(
201                        options,
202                        &TwcsOptions::default(),
203                        &key,
204                        &value,
205                        region.region_id,
206                    )?;
207                }
208                SetRegionOption::Format(format_str) => {
209                    let new_format = format_str.parse::<FormatType>().map_err(|_| {
210                        store_api::metadata::InvalidRegionRequestSnafu {
211                            region_id: region.region_id,
212                            err: format!("Invalid format type: {}", format_str),
213                        }
214                        .build()
215                    })?;
216                    // If the format is unchanged, we also consider the option is altered.
217                    if new_format != current_options.sst_format.unwrap_or_default() {
218                        all_options_altered = false;
219
220                        // Validates the format type.
221                        ensure!(
222                            new_format == FormatType::Flat,
223                            store_api::metadata::InvalidRegionRequestSnafu {
224                                region_id: region.region_id,
225                                err: "Only allow changing format type to flat",
226                            }
227                        );
228                    }
229                }
230                SetRegionOption::AppendMode(new_append_mode) => {
231                    // If the append mode is unchanged, we consider the option is altered.
232                    if new_append_mode != current_options.append_mode {
233                        // Validates: only allow changing from false to true.
234                        ensure!(
235                            !current_options.append_mode && new_append_mode,
236                            store_api::metadata::InvalidRegionRequestSnafu {
237                                region_id: region.region_id,
238                                err: "Only allow changing append_mode from false to true",
239                            }
240                        );
241                        // Clear merge_mode since it's incompatible with append_mode.
242                        current_options.merge_mode = None;
243                        all_options_altered = false;
244                    }
245                }
246            }
247        }
248        region.version_control.alter_options(current_options);
249        if all_options_altered {
250            Ok(None)
251        } else {
252            Ok(Some(region.version()))
253        }
254    }
255}
256
257/// Returns the new region options if there are updates to the options.
258fn new_region_options_on_empty_memtable(
259    current_options: &RegionOptions,
260    kind: &AlterKind,
261) -> Option<RegionOptions> {
262    let AlterKind::SetRegionOptions { options } = kind else {
263        return None;
264    };
265
266    if options.is_empty() {
267        return None;
268    }
269
270    let mut current_options = current_options.clone();
271    for option in options {
272        match option {
273            SetRegionOption::Ttl(_) | SetRegionOption::Twsc(_, _) => (),
274            SetRegionOption::Format(format_str) => {
275                // Safety: handle_alter_region_options_fast() has validated this.
276                let new_format = format_str.parse::<FormatType>().unwrap();
277                assert_eq!(FormatType::Flat, new_format);
278
279                current_options.sst_format = Some(new_format);
280            }
281            SetRegionOption::AppendMode(new_append_mode) => {
282                // Safety: handle_alter_region_options_fast() has validated this.
283                assert!(*new_append_mode && !current_options.append_mode);
284
285                current_options.append_mode = true;
286                current_options.merge_mode = None;
287            }
288        }
289    }
290    Some(current_options)
291}
292
293/// Creates a metadata after applying the alter `request` to the old `metadata`.
294///
295/// Returns an error if the `request` is invalid.
296fn metadata_after_alteration(
297    metadata: &RegionMetadata,
298    request: RegionAlterRequest,
299) -> Result<RegionMetadataRef> {
300    let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
301    builder
302        .alter(request.kind)
303        .context(InvalidRegionRequestSnafu)?
304        .bump_version();
305    let new_meta = builder.build().context(InvalidMetadataSnafu)?;
306
307    Ok(Arc::new(new_meta))
308}
309
310fn set_twcs_options(
311    options: &mut TwcsOptions,
312    default_option: &TwcsOptions,
313    key: &str,
314    value: &str,
315    region_id: RegionId,
316) -> std::result::Result<(), MetadataError> {
317    match key {
318        mito_engine_options::TWCS_TRIGGER_FILE_NUM => {
319            let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?;
320            log_option_update(region_id, key, options.trigger_file_num, files);
321            options.trigger_file_num = files;
322        }
323        mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
324            let size = if value.is_empty() {
325                default_option.max_output_file_size
326            } else {
327                Some(
328                    ReadableSize::from_str(value)
329                        .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
330                )
331            };
332            log_option_update(region_id, key, options.max_output_file_size, size);
333            options.max_output_file_size = size;
334        }
335        mito_engine_options::TWCS_TIME_WINDOW => {
336            let window = if value.is_empty() {
337                default_option.time_window
338            } else {
339                Some(
340                    humantime::parse_duration(value)
341                        .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
342                )
343            };
344            log_option_update(region_id, key, options.time_window, window);
345            options.time_window = window;
346        }
347        _ => return InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
348    }
349    Ok(())
350}
351
352fn parse_usize_with_default(
353    key: &str,
354    value: &str,
355    default: usize,
356) -> std::result::Result<usize, MetadataError> {
357    if value.is_empty() {
358        Ok(default)
359    } else {
360        value
361            .parse::<usize>()
362            .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())
363    }
364}
365
366fn log_option_update<T: std::fmt::Debug>(
367    region_id: RegionId,
368    option_name: &str,
369    prev_value: T,
370    cur_value: T,
371) {
372    info!(
373        "Update region {}: {}, previous: {:?}, new: {:?}",
374        option_name, region_id, prev_value, cur_value
375    );
376}
377
378/// Used to determine whether we can build index directly after schema change.
379fn need_change_index(kind: &AlterKind) -> bool {
380    match kind {
381        // `SetIndexes` is a fast-path operation because it can build indexes for existing SSTs
382        // in the background, without needing to wait for a flush or compaction cycle.
383        AlterKind::SetIndexes { options: _ } => true,
384        // For AddColumns, DropColumns, UnsetIndexes and ModifyColumnTypes, we don't treat them as index changes.
385        // Index files still need to be rebuilt after schema changes,
386        // but this will happen automatically during flush or compaction.
387        _ => false,
388    }
389}