// mito2/worker/handle_enter_staging.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_telemetry::{error, info, warn};
18use store_api::logstore::LogStore;
19use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective};
20use store_api::storage::RegionId;
21
22use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
23use crate::flush::FlushReason;
24use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange};
25use crate::region::{MitoRegionRef, RegionLeaderState, StagingPartitionInfo};
26use crate::request::{
27    BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
28    WorkerRequest, WorkerRequestWithTime,
29};
30use crate::worker::RegionWorkerLoop;
31
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Handles a request asking the region to enter staging mode.
    ///
    /// Steps, in order:
    /// 1. Bail out (error sent through `sender`) if the region is not writable.
    /// 2. If the region is already staging, the request is idempotent: succeed
    ///    only when the recorded partition directive equals the requested one,
    ///    otherwise reply with a mismatch error.
    /// 3. If there are unflushed memtables, schedule a flush and park this
    ///    request in the flush scheduler so it is replayed after the flush.
    /// 4. Otherwise start the staging transition immediately.
    pub(crate) async fn handle_enter_staging_request(
        &mut self,
        region_id: RegionId,
        partition_directive: StagingPartitionDirective,
        mut sender: OptionOutputTx,
    ) {
        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
            return;
        };

        // If the region is already in staging mode, verify the partition directive matches.
        if region.is_staging() {
            // Clone under the mutex so the lock is not held while replying below.
            let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
            // If the partition directive mismatches, return error.
            if staging_partition_info
                .as_ref()
                .map(|info| &info.partition_directive)
                != Some(&partition_directive)
            {
                sender.send(Err(StagingPartitionExprMismatchSnafu {
                    manifest_expr: staging_partition_info
                        .as_ref()
                        .and_then(|info| info.partition_expr().map(ToString::to_string)),
                    request_expr: format!("{:?}", partition_directive),
                }
                .build()));
                return;
            }

            // If the partition expr matches, return success (0 affected rows).
            sender.send(Ok(0));
            return;
        }

        let version = region.version();
        if !version.memtables.is_empty() {
            // If memtable is not empty, we can't enter staging directly and need to flush
            // all memtables first.
            info!("Flush region: {} before entering staging", region_id);
            debug_assert!(!region.is_staging());
            let task = self.new_flush_task(
                &region,
                FlushReason::EnterStaging,
                None,
                self.config.clone(),
            );
            if let Err(e) =
                self.flush_scheduler
                    .schedule_flush(region.region_id, &region.version_control, task)
            {
                // Unable to flush the region, send error to waiter.
                sender.send(Err(e));
                return;
            }

            // Safety: We have requested flush.
            // Park the request as a pending DDL; the worker replays it once the
            // flush completes.
            self.flush_scheduler
                .add_ddl_request_to_pending(SenderDdlRequest {
                    region_id,
                    sender,
                    request: DdlRequest::EnterStaging(EnterStagingRequest {
                        partition_directive: partition_directive.clone(),
                    }),
                });

            return;
        }

        self.handle_enter_staging(region, partition_directive, sender);
    }

    /// Performs the manifest side of entering staging: clears any leftover
    /// staging manifest files, then — unless the directive is
    /// [`StagingPartitionDirective::RejectAllWrites`] — writes a new staging
    /// manifest recording the partition expression change.
    ///
    /// # Errors
    /// Fails if clearing the staging manifest directory or writing the new
    /// staging manifest fails.
    async fn enter_staging(
        region: &MitoRegionRef,
        partition_directive: &StagingPartitionDirective,
    ) -> Result<()> {
        let now = Instant::now();
        // First step: clear all staging manifest files.
        {
            let mut manager = region.manifest_ctx.manifest_manager.write().await;
            manager
                .clear_staging_manifest_and_dir()
                .await
                .inspect_err(|e| {
                    error!(
                        e;
                        "Failed to clear staging manifest files for region {}",
                        region.region_id
                    );
                })?;

            info!(
                "Cleared all staging manifest files for region {}, elapsed: {:?}",
                region.region_id,
                now.elapsed(),
            );
        }

        let partition_expr = match partition_directive {
            StagingPartitionDirective::UpdatePartitionExpr(partition_expr) => {
                partition_expr.clone()
            }
            StagingPartitionDirective::RejectAllWrites => {
                info!(
                    "Enter staging with reject all writes, region_id: {}",
                    region.region_id
                );
                // Rejecting all writes is just an in-memory flag, so there is no
                // need to write a new staging manifest.
                return Ok(());
            }
        };

        // Second step: write new staging manifest.
        let change = RegionPartitionExprChange {
            partition_expr: Some(partition_expr.clone()),
        };
        let action_list =
            RegionMetaActionList::with_action(RegionMetaAction::PartitionExprChange(change));
        region
            .manifest_ctx
            .update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
            .await?;

        Ok(())
    }

    /// Starts the staging transition for `region`.
    ///
    /// Moves the region into the `EnteringStaging` state, then spawns a
    /// background task that does the manifest work ([`Self::enter_staging`])
    /// and hands the outcome back to the worker loop as a
    /// [`BackgroundNotify::EnterStaging`] message, processed by
    /// [`Self::handle_enter_staging_result`].
    fn handle_enter_staging(
        &self,
        region: MitoRegionRef,
        partition_directive: StagingPartitionDirective,
        sender: OptionOutputTx,
    ) {
        // Fails if the region is not in a state that may enter staging; the
        // error goes straight back to the caller.
        if let Err(e) = region.set_entering_staging() {
            sender.send(Err(e));
            return;
        }

        let listener = self.listener.clone();
        let request_sender = self.sender.clone();
        common_runtime::spawn_global(async move {
            let now = Instant::now();
            let result = Self::enter_staging(&region, &partition_directive).await;
            match result {
                Ok(_) => {
                    info!(
                        "Created staging manifest for region {}, elapsed: {:?}",
                        region.region_id,
                        now.elapsed(),
                    );
                }
                Err(ref e) => {
                    // Unset the staging manifest so the manifest manager is not
                    // left pointing at a half-created staging manifest.
                    region
                        .manifest_ctx
                        .manifest_manager
                        .write()
                        .await
                        .unset_staging_manifest();
                    error!(
                        "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
                        region.region_id,
                        e,
                        now.elapsed(),
                    );
                }
            }

            // Send the result back to the worker loop; the state transition and
            // the reply to the caller happen in `handle_enter_staging_result`.
            let notify = WorkerRequest::Background {
                region_id: region.region_id,
                notify: BackgroundNotify::EnterStaging(EnterStagingResult {
                    region_id: region.region_id,
                    sender,
                    result,
                    partition_directive,
                }),
            };
            // NOTE(review): presumably a test/observability hook invoked just
            // before the result is delivered — confirm against the listener's
            // definition.
            listener
                .on_enter_staging_result_begin(region.region_id)
                .await;

            if let Err(res) = request_sender
                .send(WorkerRequestWithTime::new(notify))
                .await
            {
                // The worker channel is closed (worker shutting down); the
                // caller's sender is dropped inside `res`, which aborts the wait.
                warn!(
                    "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
                    region.region_id, res
                );
            }
        });
    }

    /// Handles enter staging result.
    ///
    /// On success, records the staging partition directive and switches the
    /// region from `EnteringStaging` to staging; on failure, switches it back
    /// to writable. Either way, the result (mapped to 0 affected rows) is
    /// forwarded to the original caller and the region's stalled requests are
    /// replayed.
    pub(crate) async fn handle_enter_staging_result(
        &mut self,
        enter_staging_result: EnterStagingResult,
    ) {
        let region = match self.regions.get_region(enter_staging_result.region_id) {
            Some(region) => region,
            None => {
                // Region disappeared while staging was in flight: fail its
                // stalled requests and report the missing region to the caller.
                self.reject_region_stalled_requests(&enter_staging_result.region_id);
                enter_staging_result.sender.send(
                    RegionNotFoundSnafu {
                        region_id: enter_staging_result.region_id,
                    }
                    .fail(),
                );
                return;
            }
        };

        if enter_staging_result.result.is_ok() {
            info!(
                "Updating region {} staging partition directive to {:?}",
                region.region_id, enter_staging_result.partition_directive
            );
            Self::update_region_staging_partition_info(
                &region,
                enter_staging_result.partition_directive,
            );
            region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
        } else {
            region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
        }
        enter_staging_result
            .sender
            .send(enter_staging_result.result.map(|_| 0));
        // Handles the stalled requests.
        self.handle_region_stalled_requests(&enter_staging_result.region_id)
            .await;
    }

    /// Records `partition_directive` as the region's staging partition info.
    ///
    /// Expects no staging info to be present yet — the request path returns
    /// early when the region is already staging — hence the `debug_assert`.
    fn update_region_staging_partition_info(
        region: &MitoRegionRef,
        partition_directive: StagingPartitionDirective,
    ) {
        let mut staging_partition_info = region.staging_partition_info.lock().unwrap();
        debug_assert!(staging_partition_info.is_none());
        *staging_partition_info = Some(StagingPartitionInfo::from_partition_directive(
            partition_directive,
        ));
    }
}