// mito2/worker/handle_enter_staging.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Instant;

use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective};
use store_api::storage::RegionId;

use crate::compaction::RequestCancelResult;
use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange};
use crate::region::{MitoRegionRef, RegionLeaderState, StagingPartitionInfo};
use crate::request::{
    BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
    WorkerRequest, WorkerRequestWithTime,
};
use crate::worker::RegionWorkerLoop;
33impl<S: LogStore> RegionWorkerLoop<S> {
34    pub(crate) async fn handle_enter_staging_request(
35        &mut self,
36        region_id: RegionId,
37        partition_directive: StagingPartitionDirective,
38        mut sender: OptionOutputTx,
39    ) {
40        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
41            return;
42        };
43
44        // If the region is already in staging mode, verify the partition directive matches.
45        if region.is_staging() {
46            let staging_partition_info = region.manifest_ctx.staging_partition_info();
47            // If the partition directive mismatches, return error.
48            if staging_partition_info
49                .as_ref()
50                .map(|info| &info.partition_directive)
51                != Some(&partition_directive)
52            {
53                sender.send(Err(StagingPartitionExprMismatchSnafu {
54                    manifest_expr: staging_partition_info
55                        .as_ref()
56                        .and_then(|info| info.partition_expr().map(ToString::to_string)),
57                    request_expr: format!("{:?}", partition_directive),
58                }
59                .build()));
60                return;
61            }
62
63            // If the partition expr matches, return success.
64            sender.send(Ok(0));
65            return;
66        }
67
68        let version = region.version();
69        if !version.memtables.is_empty() {
70            // If memtable is not empty, we can't enter staging directly and need to flush
71            // all memtables first.
72            info!("Flush region: {} before entering staging", region_id);
73            debug_assert!(!region.is_staging());
74            let task = self.new_flush_task(
75                &region,
76                FlushReason::EnterStaging,
77                None,
78                self.config.clone(),
79            );
80            if let Err(e) =
81                self.flush_scheduler
82                    .schedule_flush(region.region_id, &region.version_control, task)
83            {
84                // Unable to flush the region, send error to waiter.
85                sender.send(Err(e));
86                return;
87            }
88
89            // Safety: We have requested flush.
90            self.flush_scheduler
91                .add_ddl_request_to_pending(SenderDdlRequest {
92                    region_id,
93                    sender,
94                    request: DdlRequest::EnterStaging(EnterStagingRequest {
95                        partition_directive: partition_directive.clone(),
96                    }),
97                });
98
99            return;
100        }
101
102        match self.compaction_scheduler.request_cancel(region_id) {
103            RequestCancelResult::CancelIssued
104            | RequestCancelResult::AlreadyCancelling
105            | RequestCancelResult::TooLateToCancel => {
106                // Safety: region is compacting or has entered the non-cancellable publish stage,
107                // keep the DDL pending until the current task finishes or acknowledges cancellation.
108                self.compaction_scheduler
109                    .add_ddl_request_to_pending(SenderDdlRequest {
110                        region_id,
111                        sender,
112                        request: DdlRequest::EnterStaging(EnterStagingRequest {
113                            partition_directive,
114                        }),
115                    });
116
117                return;
118            }
119            RequestCancelResult::NotRunning => {}
120        }
121
122        self.handle_enter_staging(region, partition_directive, sender);
123    }
124
125    async fn enter_staging(
126        region: &MitoRegionRef,
127        partition_directive: &StagingPartitionDirective,
128    ) -> Result<()> {
129        let now = Instant::now();
130        // First step: clear all staging manifest files.
131        {
132            let mut manager = region.manifest_ctx.manifest_manager.write().await;
133            manager
134                .clear_staging_manifest_and_dir()
135                .await
136                .inspect_err(|e| {
137                    error!(
138                        e;
139                        "Failed to clear staging manifest files for region {}",
140                        region.region_id
141                    );
142                })?;
143
144            info!(
145                "Cleared all staging manifest files for region {}, elapsed: {:?}",
146                region.region_id,
147                now.elapsed(),
148            );
149        }
150
151        let partition_expr = match partition_directive {
152            StagingPartitionDirective::UpdatePartitionExpr(partition_expr) => {
153                partition_expr.clone()
154            }
155            StagingPartitionDirective::RejectAllWrites => {
156                info!(
157                    "Enter staging with reject all writes, region_id: {}",
158                    region.region_id
159                );
160                // Rejects all writes just a memory flag, no need to write new staging manifest.
161                return Ok(());
162            }
163        };
164
165        // Second step: write new staging manifest.
166        let change = RegionPartitionExprChange {
167            partition_expr: Some(partition_expr.clone()),
168        };
169        let action_list =
170            RegionMetaActionList::with_action(RegionMetaAction::PartitionExprChange(change));
171        region
172            .manifest_ctx
173            .update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
174            .await?;
175
176        Ok(())
177    }
178
179    fn handle_enter_staging(
180        &self,
181        region: MitoRegionRef,
182        partition_directive: StagingPartitionDirective,
183        sender: OptionOutputTx,
184    ) {
185        if let Err(e) = region.set_entering_staging() {
186            sender.send(Err(e));
187            return;
188        }
189
190        let listener = self.listener.clone();
191        let request_sender = self.sender.clone();
192        common_runtime::spawn_global(async move {
193            let now = Instant::now();
194            let result = Self::enter_staging(&region, &partition_directive).await;
195            match result {
196                Ok(_) => {
197                    info!(
198                        "Created staging manifest for region {}, elapsed: {:?}",
199                        region.region_id,
200                        now.elapsed(),
201                    );
202                }
203                Err(ref e) => {
204                    // Unset the staging manifest
205                    region
206                        .manifest_ctx
207                        .manifest_manager
208                        .write()
209                        .await
210                        .unset_staging_manifest();
211                    error!(
212                        "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
213                        region.region_id,
214                        e,
215                        now.elapsed(),
216                    );
217                }
218            }
219
220            let notify = WorkerRequest::Background {
221                region_id: region.region_id,
222                notify: BackgroundNotify::EnterStaging(EnterStagingResult {
223                    region_id: region.region_id,
224                    sender,
225                    result,
226                    partition_directive,
227                }),
228            };
229            listener
230                .on_enter_staging_result_begin(region.region_id)
231                .await;
232
233            if let Err(res) = request_sender
234                .send(WorkerRequestWithTime::new(notify))
235                .await
236            {
237                warn!(
238                    "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
239                    region.region_id, res
240                );
241            }
242        });
243    }
244
245    /// Handles enter staging result.
246    pub(crate) async fn handle_enter_staging_result(
247        &mut self,
248        enter_staging_result: EnterStagingResult,
249    ) {
250        let region = match self.regions.get_region(enter_staging_result.region_id) {
251            Some(region) => region,
252            None => {
253                self.reject_region_stalled_requests(&enter_staging_result.region_id);
254                enter_staging_result.sender.send(
255                    RegionNotFoundSnafu {
256                        region_id: enter_staging_result.region_id,
257                    }
258                    .fail(),
259                );
260                return;
261            }
262        };
263
264        if enter_staging_result.result.is_ok() {
265            info!(
266                "Updating region {} staging partition directive to {:?}",
267                region.region_id, enter_staging_result.partition_directive
268            );
269            Self::update_region_staging_partition_info(
270                &region,
271                enter_staging_result.partition_directive,
272            );
273            region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
274        } else {
275            region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
276        }
277        enter_staging_result
278            .sender
279            .send(enter_staging_result.result.map(|_| 0));
280        // Handles the stalled requests.
281        self.handle_region_stalled_requests(&enter_staging_result.region_id)
282            .await;
283    }
284
285    fn update_region_staging_partition_info(
286        region: &MitoRegionRef,
287        partition_directive: StagingPartitionDirective,
288    ) {
289        region.manifest_ctx.set_staging_partition_info(
290            StagingPartitionInfo::from_partition_directive(partition_directive),
291        );
292    }
293}