mito2/worker/
handle_alter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling alter related requests.
16
17use std::str::FromStr;
18use std::sync::Arc;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_telemetry::tracing::warn;
23use humantime_serde::re::humantime;
24use snafu::{ResultExt, ensure};
25use store_api::metadata::{
26    InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
27    RegionMetadataRef,
28};
29use store_api::mito_engine_options;
30use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
31use store_api::storage::RegionId;
32
33use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
34use crate::flush::FlushReason;
35use crate::manifest::action::RegionChange;
36use crate::region::MitoRegionRef;
37use crate::region::options::CompactionOptions::Twcs;
38use crate::region::options::{RegionOptions, TwcsOptions};
39use crate::region::version::VersionRef;
40use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
41use crate::sst::FormatType;
42use crate::worker::RegionWorkerLoop;
43
44impl<S> RegionWorkerLoop<S> {
45    pub(crate) async fn handle_alter_request(
46        &mut self,
47        region_id: RegionId,
48        request: RegionAlterRequest,
49        sender: OptionOutputTx,
50    ) {
51        let region = match self.regions.writable_non_staging_region(region_id) {
52            Ok(region) => region,
53            Err(e) => {
54                sender.send(Err(e));
55                return;
56            }
57        };
58
59        info!("Try to alter region: {}, request: {:?}", region_id, request);
60
61        // Gets the version before alter.
62        let mut version = region.version();
63
64        // fast path for memory state changes like options.
65        let set_options = match &request.kind {
66            AlterKind::SetRegionOptions { options } => options.clone(),
67            AlterKind::UnsetRegionOptions { keys } => {
68                // Converts the keys to SetRegionOption.
69                //
70                // It passes an empty string to achieve the purpose of unset
71                keys.iter().map(Into::into).collect()
72            }
73            _ => Vec::new(),
74        };
75        if !set_options.is_empty() {
76            match self.handle_alter_region_options_fast(&region, version, set_options) {
77                Ok(new_version) => {
78                    let Some(new_version) = new_version else {
79                        // We don't have options to alter after flush.
80                        sender.send(Ok(0));
81                        return;
82                    };
83                    version = new_version;
84                }
85                Err(e) => {
86                    sender.send(Err(e).context(InvalidMetadataSnafu));
87                    return;
88                }
89            }
90        }
91
92        // Validates request.
93        if let Err(e) = request.validate(&version.metadata) {
94            // Invalid request.
95            sender.send(Err(e).context(InvalidRegionRequestSnafu));
96            return;
97        }
98
99        // Checks whether we need to alter the region.
100        if !request.need_alter(&version.metadata) {
101            warn!(
102                "Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
103                region_id, request
104            );
105            sender.send(Ok(0));
106            return;
107        }
108
109        // Checks whether we can alter the region directly.
110        if !version.memtables.is_empty() {
111            // If memtable is not empty, we can't alter it directly and need to flush
112            // all memtables first.
113            info!("Flush region: {} before alteration", region_id);
114
115            // Try to submit a flush task.
116            let task = self.new_flush_task(
117                &region,
118                FlushReason::Alter,
119                None,
120                self.config.clone(),
121                region.is_staging(),
122            );
123            if let Err(e) =
124                self.flush_scheduler
125                    .schedule_flush(region.region_id, &region.version_control, task)
126            {
127                // Unable to flush the region, send error to waiter.
128                sender.send(Err(e));
129                return;
130            }
131
132            // Safety: We have requested flush.
133            self.flush_scheduler
134                .add_ddl_request_to_pending(SenderDdlRequest {
135                    region_id,
136                    sender,
137                    request: DdlRequest::Alter(request),
138                });
139
140            return;
141        }
142
143        info!(
144            "Try to alter region {}, version.metadata: {:?}, version.options: {:?}, request: {:?}",
145            region_id, version.metadata, version.options, request,
146        );
147        self.handle_alter_region_with_empty_memtable(region, version, request, sender);
148    }
149
150    // TODO(yingwen): Optional new options and sst format.
151    /// Handles region metadata and format changes when the region memtable is empty.
152    fn handle_alter_region_with_empty_memtable(
153        &mut self,
154        region: MitoRegionRef,
155        version: VersionRef,
156        request: RegionAlterRequest,
157        sender: OptionOutputTx,
158    ) {
159        let need_index = need_change_index(&request.kind);
160        let new_options = new_region_options_on_empty_memtable(&version.options, &request.kind);
161        let new_meta = match metadata_after_alteration(&version.metadata, request) {
162            Ok(new_meta) => new_meta,
163            Err(e) => {
164                sender.send(Err(e));
165                return;
166            }
167        };
168        // Persist the metadata to region's manifest.
169        let change = RegionChange {
170            metadata: new_meta,
171            sst_format: new_options
172                .as_ref()
173                .unwrap_or(&version.options)
174                .sst_format
175                .unwrap_or_default(),
176        };
177        self.handle_manifest_region_change(region, change, need_index, new_options, sender);
178    }
179
180    /// Handles requests that changes region options, like TTL. It only affects memory state
181    /// since changes are persisted in the `DatanodeTableValue` in metasrv.
182    ///
183    /// If the options require empty memtable, it only does validation.
184    ///
185    /// Returns a new version with the updated options if it needs further alteration.
186    fn handle_alter_region_options_fast(
187        &mut self,
188        region: &MitoRegionRef,
189        version: VersionRef,
190        options: Vec<SetRegionOption>,
191    ) -> std::result::Result<Option<VersionRef>, MetadataError> {
192        assert!(!options.is_empty());
193
194        let mut all_options_altered = true;
195        let mut current_options = version.options.clone();
196        for option in options {
197            match option {
198                SetRegionOption::Ttl(new_ttl) => {
199                    info!(
200                        "Update region ttl: {}, previous: {:?} new: {:?}",
201                        region.region_id, current_options.ttl, new_ttl
202                    );
203                    current_options.ttl = new_ttl;
204                }
205                SetRegionOption::Twsc(key, value) => {
206                    let Twcs(options) = &mut current_options.compaction;
207                    set_twcs_options(
208                        options,
209                        &TwcsOptions::default(),
210                        &key,
211                        &value,
212                        region.region_id,
213                    )?;
214                }
215                SetRegionOption::Format(format_str) => {
216                    let new_format = format_str.parse::<FormatType>().map_err(|_| {
217                        store_api::metadata::InvalidRegionRequestSnafu {
218                            region_id: region.region_id,
219                            err: format!("Invalid format type: {}", format_str),
220                        }
221                        .build()
222                    })?;
223                    // If the format is unchanged, we also consider the option is altered.
224                    if new_format != current_options.sst_format.unwrap_or_default() {
225                        all_options_altered = false;
226
227                        // Validates the format type.
228                        ensure!(
229                            new_format == FormatType::Flat,
230                            store_api::metadata::InvalidRegionRequestSnafu {
231                                region_id: region.region_id,
232                                err: "Only allow changing format type to flat",
233                            }
234                        );
235                    }
236                }
237            }
238        }
239        region.version_control.alter_options(current_options);
240        if all_options_altered {
241            Ok(None)
242        } else {
243            Ok(Some(region.version()))
244        }
245    }
246}
247
248/// Returns the new region options if there are updates to the options.
249fn new_region_options_on_empty_memtable(
250    current_options: &RegionOptions,
251    kind: &AlterKind,
252) -> Option<RegionOptions> {
253    let AlterKind::SetRegionOptions { options } = kind else {
254        return None;
255    };
256
257    if options.is_empty() {
258        return None;
259    }
260
261    let mut current_options = current_options.clone();
262    for option in options {
263        match option {
264            SetRegionOption::Ttl(_) | SetRegionOption::Twsc(_, _) => (),
265            SetRegionOption::Format(format_str) => {
266                // Safety: handle_alter_region_options_fast() has validated this.
267                let new_format = format_str.parse::<FormatType>().unwrap();
268                assert_eq!(FormatType::Flat, new_format);
269
270                current_options.sst_format = Some(new_format);
271            }
272        }
273    }
274    Some(current_options)
275}
276
277/// Creates a metadata after applying the alter `request` to the old `metadata`.
278///
279/// Returns an error if the `request` is invalid.
280fn metadata_after_alteration(
281    metadata: &RegionMetadata,
282    request: RegionAlterRequest,
283) -> Result<RegionMetadataRef> {
284    let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
285    builder
286        .alter(request.kind)
287        .context(InvalidRegionRequestSnafu)?
288        .bump_version();
289    let new_meta = builder.build().context(InvalidMetadataSnafu)?;
290
291    Ok(Arc::new(new_meta))
292}
293
294fn set_twcs_options(
295    options: &mut TwcsOptions,
296    default_option: &TwcsOptions,
297    key: &str,
298    value: &str,
299    region_id: RegionId,
300) -> std::result::Result<(), MetadataError> {
301    match key {
302        mito_engine_options::TWCS_TRIGGER_FILE_NUM => {
303            let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?;
304            log_option_update(region_id, key, options.trigger_file_num, files);
305            options.trigger_file_num = files;
306        }
307        mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
308            let size = if value.is_empty() {
309                default_option.max_output_file_size
310            } else {
311                Some(
312                    ReadableSize::from_str(value)
313                        .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
314                )
315            };
316            log_option_update(region_id, key, options.max_output_file_size, size);
317            options.max_output_file_size = size;
318        }
319        mito_engine_options::TWCS_TIME_WINDOW => {
320            let window = if value.is_empty() {
321                default_option.time_window
322            } else {
323                Some(
324                    humantime::parse_duration(value)
325                        .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
326                )
327            };
328            log_option_update(region_id, key, options.time_window, window);
329            options.time_window = window;
330        }
331        _ => return InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
332    }
333    Ok(())
334}
335
336fn parse_usize_with_default(
337    key: &str,
338    value: &str,
339    default: usize,
340) -> std::result::Result<usize, MetadataError> {
341    if value.is_empty() {
342        Ok(default)
343    } else {
344        value
345            .parse::<usize>()
346            .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())
347    }
348}
349
350fn log_option_update<T: std::fmt::Debug>(
351    region_id: RegionId,
352    option_name: &str,
353    prev_value: T,
354    cur_value: T,
355) {
356    info!(
357        "Update region {}: {}, previous: {:?}, new: {:?}",
358        option_name, region_id, prev_value, cur_value
359    );
360}
361
362/// Used to determine whether we can build index directly after schema change.
363fn need_change_index(kind: &AlterKind) -> bool {
364    match kind {
365        // `SetIndexes` is a fast-path operation because it can build indexes for existing SSTs
366        // in the background, without needing to wait for a flush or compaction cycle.
367        AlterKind::SetIndexes { options: _ } => true,
368        // For AddColumns, DropColumns, UnsetIndexes and ModifyColumnTypes, we don't treat them as index changes.
369        // Index files still need to be rebuilt after schema changes,
370        // but this will happen automatically during flush or compaction.
371        _ => false,
372    }
373}