mito2/worker/
handle_alter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling alter related requests.
16
17use std::str::FromStr;
18use std::sync::Arc;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_telemetry::tracing::warn;
23use humantime_serde::re::humantime;
24use snafu::ResultExt;
25use store_api::metadata::{
26    InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
27    RegionMetadataRef,
28};
29use store_api::mito_engine_options;
30use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
31use store_api::storage::RegionId;
32
33use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
34use crate::flush::FlushReason;
35use crate::manifest::action::RegionChange;
36use crate::region::MitoRegionRef;
37use crate::region::options::CompactionOptions::Twcs;
38use crate::region::options::TwcsOptions;
39use crate::region::version::VersionRef;
40use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
41use crate::worker::RegionWorkerLoop;
42
43impl<S> RegionWorkerLoop<S> {
44    pub(crate) async fn handle_alter_request(
45        &mut self,
46        region_id: RegionId,
47        request: RegionAlterRequest,
48        sender: OptionOutputTx,
49    ) {
50        let region = match self.regions.writable_non_staging_region(region_id) {
51            Ok(region) => region,
52            Err(e) => {
53                sender.send(Err(e));
54                return;
55            }
56        };
57
58        info!("Try to alter region: {}, request: {:?}", region_id, request);
59
60        // Get the version before alter.
61        let version = region.version();
62
63        // fast path for memory state changes like options.
64        match request.kind {
65            AlterKind::SetRegionOptions { options } => {
66                match self.handle_alter_region_options(region, version, options) {
67                    Ok(_) => sender.send(Ok(0)),
68                    Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
69                }
70                return;
71            }
72            AlterKind::UnsetRegionOptions { keys } => {
73                // Converts the keys to SetRegionOption.
74                //
75                // It passes an empty string to achieve the purpose of unset
76                match self.handle_alter_region_options(
77                    region,
78                    version,
79                    keys.iter().map(Into::into).collect(),
80                ) {
81                    Ok(_) => sender.send(Ok(0)),
82                    Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
83                }
84                return;
85            }
86            _ => {}
87        }
88
89        // Validate request.
90        if let Err(e) = request.validate(&version.metadata) {
91            // Invalid request.
92            sender.send(Err(e).context(InvalidRegionRequestSnafu));
93            return;
94        }
95
96        // Checks whether we need to alter the region.
97        if !request.need_alter(&version.metadata) {
98            warn!(
99                "Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
100                region_id, request
101            );
102            sender.send(Ok(0));
103            return;
104        }
105
106        // Checks whether we can alter the region directly.
107        if !version.memtables.is_empty() {
108            // If memtable is not empty, we can't alter it directly and need to flush
109            // all memtables first.
110            info!("Flush region: {} before alteration", region_id);
111
112            // Try to submit a flush task.
113            let task = self.new_flush_task(&region, FlushReason::Alter, None, self.config.clone());
114            if let Err(e) =
115                self.flush_scheduler
116                    .schedule_flush(region.region_id, &region.version_control, task)
117            {
118                // Unable to flush the region, send error to waiter.
119                sender.send(Err(e));
120                return;
121            }
122
123            // Safety: We have requested flush.
124            self.flush_scheduler
125                .add_ddl_request_to_pending(SenderDdlRequest {
126                    region_id,
127                    sender,
128                    request: DdlRequest::Alter(request),
129                });
130
131            return;
132        }
133
134        info!(
135            "Try to alter region {}, version.metadata: {:?}, request: {:?}",
136            region_id, version.metadata, request,
137        );
138        self.handle_alter_region_metadata(region, version, request, sender);
139    }
140
141    /// Handles region metadata changes.
142    fn handle_alter_region_metadata(
143        &mut self,
144        region: MitoRegionRef,
145        version: VersionRef,
146        request: RegionAlterRequest,
147        sender: OptionOutputTx,
148    ) {
149        let new_meta = match metadata_after_alteration(&version.metadata, request) {
150            Ok(new_meta) => new_meta,
151            Err(e) => {
152                sender.send(Err(e));
153                return;
154            }
155        };
156        // Persist the metadata to region's manifest.
157        let change = RegionChange {
158            metadata: new_meta,
159            sst_format: region.sst_format(),
160        };
161        self.handle_manifest_region_change(region, change, sender)
162    }
163
164    /// Handles requests that changes region options, like TTL. It only affects memory state
165    /// since changes are persisted in the `DatanodeTableValue` in metasrv.
166    fn handle_alter_region_options(
167        &mut self,
168        region: MitoRegionRef,
169        version: VersionRef,
170        options: Vec<SetRegionOption>,
171    ) -> std::result::Result<(), MetadataError> {
172        let mut current_options = version.options.clone();
173        for option in options {
174            match option {
175                SetRegionOption::Ttl(new_ttl) => {
176                    info!(
177                        "Update region ttl: {}, previous: {:?} new: {:?}",
178                        region.region_id, current_options.ttl, new_ttl
179                    );
180                    current_options.ttl = new_ttl;
181                }
182                SetRegionOption::Twsc(key, value) => {
183                    let Twcs(options) = &mut current_options.compaction;
184                    set_twcs_options(
185                        options,
186                        &TwcsOptions::default(),
187                        &key,
188                        &value,
189                        region.region_id,
190                    )?;
191                }
192            }
193        }
194        region.version_control.alter_options(current_options);
195        Ok(())
196    }
197}
198
199/// Creates a metadata after applying the alter `request` to the old `metadata`.
200///
201/// Returns an error if the `request` is invalid.
202fn metadata_after_alteration(
203    metadata: &RegionMetadata,
204    request: RegionAlterRequest,
205) -> Result<RegionMetadataRef> {
206    let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
207    builder
208        .alter(request.kind)
209        .context(InvalidRegionRequestSnafu)?
210        .bump_version();
211    let new_meta = builder.build().context(InvalidMetadataSnafu)?;
212
213    Ok(Arc::new(new_meta))
214}
215
216fn set_twcs_options(
217    options: &mut TwcsOptions,
218    default_option: &TwcsOptions,
219    key: &str,
220    value: &str,
221    region_id: RegionId,
222) -> std::result::Result<(), MetadataError> {
223    match key {
224        mito_engine_options::TWCS_TRIGGER_FILE_NUM => {
225            let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?;
226            log_option_update(region_id, key, options.trigger_file_num, files);
227            options.trigger_file_num = files;
228        }
229        mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
230            let size = if value.is_empty() {
231                default_option.max_output_file_size
232            } else {
233                Some(
234                    ReadableSize::from_str(value)
235                        .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
236                )
237            };
238            log_option_update(region_id, key, options.max_output_file_size, size);
239            options.max_output_file_size = size;
240        }
241        mito_engine_options::TWCS_TIME_WINDOW => {
242            let window = if value.is_empty() {
243                default_option.time_window
244            } else {
245                Some(
246                    humantime::parse_duration(value)
247                        .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
248                )
249            };
250            log_option_update(region_id, key, options.time_window, window);
251            options.time_window = window;
252        }
253        _ => return InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
254    }
255    Ok(())
256}
257
258fn parse_usize_with_default(
259    key: &str,
260    value: &str,
261    default: usize,
262) -> std::result::Result<usize, MetadataError> {
263    if value.is_empty() {
264        Ok(default)
265    } else {
266        value
267            .parse::<usize>()
268            .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())
269    }
270}
271
272fn log_option_update<T: std::fmt::Debug>(
273    region_id: RegionId,
274    option_name: &str,
275    prev_value: T,
276    cur_value: T,
277) {
278    info!(
279        "Update region {}: {}, previous: {:?}, new: {:?}",
280        option_name, region_id, prev_value, cur_value
281    );
282}