mito2/worker/
handle_alter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling alter related requests.
16
17use std::str::FromStr;
18use std::sync::Arc;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_telemetry::tracing::warn;
23use humantime_serde::re::humantime;
24use snafu::ResultExt;
25use store_api::metadata::{
26    InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
27    RegionMetadataRef,
28};
29use store_api::mito_engine_options;
30use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
31use store_api::storage::RegionId;
32
33use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
34use crate::flush::FlushReason;
35use crate::manifest::action::RegionChange;
36use crate::region::options::CompactionOptions::Twcs;
37use crate::region::options::TwcsOptions;
38use crate::region::version::VersionRef;
39use crate::region::MitoRegionRef;
40use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
41use crate::worker::RegionWorkerLoop;
42
43impl<S> RegionWorkerLoop<S> {
44    pub(crate) async fn handle_alter_request(
45        &mut self,
46        region_id: RegionId,
47        request: RegionAlterRequest,
48        mut sender: OptionOutputTx,
49    ) {
50        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
51            return;
52        };
53
54        info!("Try to alter region: {}, request: {:?}", region_id, request);
55
56        // Get the version before alter.
57        let version = region.version();
58
59        // fast path for memory state changes like options.
60        match request.kind {
61            AlterKind::SetRegionOptions { options } => {
62                match self.handle_alter_region_options(region, version, options) {
63                    Ok(_) => sender.send(Ok(0)),
64                    Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
65                }
66                return;
67            }
68            AlterKind::UnsetRegionOptions { keys } => {
69                // Converts the keys to SetRegionOption.
70                //
71                // It passes an empty string to achieve the purpose of unset
72                match self.handle_alter_region_options(
73                    region,
74                    version,
75                    keys.iter().map(Into::into).collect(),
76                ) {
77                    Ok(_) => sender.send(Ok(0)),
78                    Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
79                }
80                return;
81            }
82            _ => {}
83        }
84
85        // Validate request.
86        if let Err(e) = request.validate(&version.metadata) {
87            // Invalid request.
88            sender.send(Err(e).context(InvalidRegionRequestSnafu));
89            return;
90        }
91
92        // Checks whether we need to alter the region.
93        if !request.need_alter(&version.metadata) {
94            warn!(
95                "Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
96                region_id, request
97            );
98            sender.send(Ok(0));
99            return;
100        }
101
102        // Checks whether we can alter the region directly.
103        if !version.memtables.is_empty() {
104            // If memtable is not empty, we can't alter it directly and need to flush
105            // all memtables first.
106            info!("Flush region: {} before alteration", region_id);
107
108            // Try to submit a flush task.
109            let task = self.new_flush_task(&region, FlushReason::Alter, None, self.config.clone());
110            if let Err(e) =
111                self.flush_scheduler
112                    .schedule_flush(region.region_id, &region.version_control, task)
113            {
114                // Unable to flush the region, send error to waiter.
115                sender.send(Err(e));
116                return;
117            }
118
119            // Safety: We have requested flush.
120            self.flush_scheduler
121                .add_ddl_request_to_pending(SenderDdlRequest {
122                    region_id,
123                    sender,
124                    request: DdlRequest::Alter(request),
125                });
126
127            return;
128        }
129
130        info!(
131            "Try to alter region {}, version.metadata: {:?}, request: {:?}",
132            region_id, version.metadata, request,
133        );
134        self.handle_alter_region_metadata(region, version, request, sender);
135    }
136
137    /// Handles region metadata changes.
138    fn handle_alter_region_metadata(
139        &mut self,
140        region: MitoRegionRef,
141        version: VersionRef,
142        request: RegionAlterRequest,
143        sender: OptionOutputTx,
144    ) {
145        let new_meta = match metadata_after_alteration(&version.metadata, request) {
146            Ok(new_meta) => new_meta,
147            Err(e) => {
148                sender.send(Err(e));
149                return;
150            }
151        };
152        // Persist the metadata to region's manifest.
153        let change = RegionChange { metadata: new_meta };
154        self.handle_manifest_region_change(region, change, sender)
155    }
156
157    /// Handles requests that changes region options, like TTL. It only affects memory state
158    /// since changes are persisted in the `DatanodeTableValue` in metasrv.
159    fn handle_alter_region_options(
160        &mut self,
161        region: MitoRegionRef,
162        version: VersionRef,
163        options: Vec<SetRegionOption>,
164    ) -> std::result::Result<(), MetadataError> {
165        let mut current_options = version.options.clone();
166        for option in options {
167            match option {
168                SetRegionOption::Ttl(new_ttl) => {
169                    info!(
170                        "Update region ttl: {}, previous: {:?} new: {:?}",
171                        region.region_id, current_options.ttl, new_ttl
172                    );
173                    current_options.ttl = new_ttl;
174                }
175                SetRegionOption::Twsc(key, value) => {
176                    let Twcs(options) = &mut current_options.compaction;
177                    set_twcs_options(
178                        options,
179                        &TwcsOptions::default(),
180                        &key,
181                        &value,
182                        region.region_id,
183                    )?;
184                }
185            }
186        }
187        region.version_control.alter_options(current_options);
188        Ok(())
189    }
190}
191
192/// Creates a metadata after applying the alter `request` to the old `metadata`.
193///
194/// Returns an error if the `request` is invalid.
195fn metadata_after_alteration(
196    metadata: &RegionMetadata,
197    request: RegionAlterRequest,
198) -> Result<RegionMetadataRef> {
199    let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
200    builder
201        .alter(request.kind)
202        .context(InvalidRegionRequestSnafu)?
203        .bump_version();
204    let new_meta = builder.build().context(InvalidMetadataSnafu)?;
205
206    Ok(Arc::new(new_meta))
207}
208
209fn set_twcs_options(
210    options: &mut TwcsOptions,
211    default_option: &TwcsOptions,
212    key: &str,
213    value: &str,
214    region_id: RegionId,
215) -> std::result::Result<(), MetadataError> {
216    match key {
217        mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_RUNS => {
218            let runs = parse_usize_with_default(key, value, default_option.max_active_window_runs)?;
219            log_option_update(region_id, key, options.max_active_window_runs, runs);
220            options.max_active_window_runs = runs;
221        }
222        mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_FILES => {
223            let files =
224                parse_usize_with_default(key, value, default_option.max_active_window_files)?;
225            log_option_update(region_id, key, options.max_active_window_files, files);
226            options.max_active_window_files = files;
227        }
228        mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_RUNS => {
229            let runs =
230                parse_usize_with_default(key, value, default_option.max_inactive_window_runs)?;
231            log_option_update(region_id, key, options.max_inactive_window_runs, runs);
232            options.max_inactive_window_runs = runs;
233        }
234        mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_FILES => {
235            let files =
236                parse_usize_with_default(key, value, default_option.max_inactive_window_files)?;
237            log_option_update(region_id, key, options.max_inactive_window_files, files);
238            options.max_inactive_window_files = files;
239        }
240        mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
241            let size = if value.is_empty() {
242                default_option.max_output_file_size
243            } else {
244                Some(
245                    ReadableSize::from_str(value)
246                        .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
247                )
248            };
249            log_option_update(region_id, key, options.max_output_file_size, size);
250            options.max_output_file_size = size;
251        }
252        mito_engine_options::TWCS_TIME_WINDOW => {
253            let window = if value.is_empty() {
254                default_option.time_window
255            } else {
256                Some(
257                    humantime::parse_duration(value)
258                        .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
259                )
260            };
261            log_option_update(region_id, key, options.time_window, window);
262            options.time_window = window;
263        }
264        _ => return InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
265    }
266    Ok(())
267}
268
269fn parse_usize_with_default(
270    key: &str,
271    value: &str,
272    default: usize,
273) -> std::result::Result<usize, MetadataError> {
274    if value.is_empty() {
275        Ok(default)
276    } else {
277        value
278            .parse::<usize>()
279            .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())
280    }
281}
282
283fn log_option_update<T: std::fmt::Debug>(
284    region_id: RegionId,
285    option_name: &str,
286    prev_value: T,
287    cur_value: T,
288) {
289    info!(
290        "Update region {}: {}, previous: {:?}, new: {:?}",
291        option_name, region_id, prev_value, cur_value
292    );
293}