mito2/worker/
handle_enter_staging.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Instant;
17
18use common_telemetry::{error, info, warn};
19use store_api::logstore::LogStore;
20use store_api::region_request::EnterStagingRequest;
21use store_api::storage::RegionId;
22
23use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
24use crate::flush::FlushReason;
25use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
26use crate::region::{MitoRegionRef, RegionLeaderState};
27use crate::request::{
28    BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
29    WorkerRequest, WorkerRequestWithTime,
30};
31use crate::worker::RegionWorkerLoop;
32
33impl<S: LogStore> RegionWorkerLoop<S> {
34    pub(crate) async fn handle_enter_staging_request(
35        &mut self,
36        region_id: RegionId,
37        partition_expr: String,
38        mut sender: OptionOutputTx,
39    ) {
40        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
41            return;
42        };
43
44        // If the region is already in staging mode, verify the partition expr matches.
45        if region.is_staging() {
46            let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
47            // If the partition expr mismatch, return error.
48            if staging_partition_expr.as_ref() != Some(&partition_expr) {
49                sender.send(Err(StagingPartitionExprMismatchSnafu {
50                    manifest_expr: staging_partition_expr,
51                    request_expr: partition_expr,
52                }
53                .build()));
54                return;
55            }
56
57            // If the partition expr matches, return success.
58            sender.send(Ok(0));
59            return;
60        }
61
62        let version = region.version();
63        if !version.memtables.is_empty() {
64            // If memtable is not empty, we can't enter staging directly and need to flush
65            // all memtables first.
66            info!("Flush region: {} before entering staging", region_id);
67            debug_assert!(!region.is_staging());
68            let task = self.new_flush_task(
69                &region,
70                FlushReason::EnterStaging,
71                None,
72                self.config.clone(),
73            );
74            if let Err(e) =
75                self.flush_scheduler
76                    .schedule_flush(region.region_id, &region.version_control, task)
77            {
78                // Unable to flush the region, send error to waiter.
79                sender.send(Err(e));
80                return;
81            }
82
83            // Safety: We have requested flush.
84            self.flush_scheduler
85                .add_ddl_request_to_pending(SenderDdlRequest {
86                    region_id,
87                    sender,
88                    request: DdlRequest::EnterStaging(EnterStagingRequest { partition_expr }),
89                });
90
91            return;
92        }
93
94        self.handle_enter_staging(region, partition_expr, sender);
95    }
96
97    async fn enter_staging(region: &MitoRegionRef, partition_expr: String) -> Result<()> {
98        let now = Instant::now();
99        // First step: clear all staging manifest files.
100        {
101            let mut manager = region.manifest_ctx.manifest_manager.write().await;
102            manager
103                .clear_staging_manifest_and_dir()
104                .await
105                .inspect_err(|e| {
106                    error!(
107                        e;
108                        "Failed to clear staging manifest files for region {}",
109                        region.region_id
110                    );
111                })?;
112
113            info!(
114                "Cleared all staging manifest files for region {}, elapsed: {:?}",
115                region.region_id,
116                now.elapsed(),
117            );
118        }
119
120        // Second step: write new staging manifest.
121        let mut new_meta = (*region.metadata()).clone();
122        new_meta.partition_expr = Some(partition_expr.clone());
123        let sst_format = region.version().options.sst_format.unwrap_or_default();
124        let change = RegionChange {
125            metadata: Arc::new(new_meta),
126            sst_format,
127        };
128        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
129        region
130            .manifest_ctx
131            .update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
132            .await?;
133
134        Ok(())
135    }
136
137    fn handle_enter_staging(
138        &self,
139        region: MitoRegionRef,
140        partition_expr: String,
141        sender: OptionOutputTx,
142    ) {
143        if let Err(e) = region.set_entering_staging() {
144            sender.send(Err(e));
145            return;
146        }
147
148        let listener = self.listener.clone();
149        let request_sender = self.sender.clone();
150        common_runtime::spawn_global(async move {
151            let now = Instant::now();
152            let result = Self::enter_staging(&region, partition_expr.clone()).await;
153            match result {
154                Ok(_) => {
155                    info!(
156                        "Created staging manifest for region {}, elapsed: {:?}",
157                        region.region_id,
158                        now.elapsed(),
159                    );
160                }
161                Err(ref e) => {
162                    // Unset the staging manifest
163                    region
164                        .manifest_ctx
165                        .manifest_manager
166                        .write()
167                        .await
168                        .unset_staging_manifest();
169                    error!(
170                        "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
171                        region.region_id,
172                        e,
173                        now.elapsed(),
174                    );
175                }
176            }
177
178            let notify = WorkerRequest::Background {
179                region_id: region.region_id,
180                notify: BackgroundNotify::EnterStaging(EnterStagingResult {
181                    region_id: region.region_id,
182                    sender,
183                    result,
184                    partition_expr,
185                }),
186            };
187            listener
188                .on_enter_staging_result_begin(region.region_id)
189                .await;
190
191            if let Err(res) = request_sender
192                .send(WorkerRequestWithTime::new(notify))
193                .await
194            {
195                warn!(
196                    "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
197                    region.region_id, res
198                );
199            }
200        });
201    }
202
203    /// Handles enter staging result.
204    pub(crate) async fn handle_enter_staging_result(
205        &mut self,
206        enter_staging_result: EnterStagingResult,
207    ) {
208        let region = match self.regions.get_region(enter_staging_result.region_id) {
209            Some(region) => region,
210            None => {
211                self.reject_region_stalled_requests(&enter_staging_result.region_id);
212                enter_staging_result.sender.send(
213                    RegionNotFoundSnafu {
214                        region_id: enter_staging_result.region_id,
215                    }
216                    .fail(),
217                );
218                return;
219            }
220        };
221
222        if enter_staging_result.result.is_ok() {
223            info!(
224                "Updating region {} staging partition expr to {}",
225                region.region_id, enter_staging_result.partition_expr
226            );
227            Self::update_region_staging_partition_expr(
228                &region,
229                enter_staging_result.partition_expr,
230            );
231            region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
232        } else {
233            region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
234        }
235        enter_staging_result
236            .sender
237            .send(enter_staging_result.result.map(|_| 0));
238        // Handles the stalled requests.
239        self.handle_region_stalled_requests(&enter_staging_result.region_id)
240            .await;
241    }
242
243    fn update_region_staging_partition_expr(region: &MitoRegionRef, partition_expr: String) {
244        let mut staging_partition_expr = region.staging_partition_expr.lock().unwrap();
245        debug_assert!(staging_partition_expr.is_none());
246        *staging_partition_expr = Some(partition_expr);
247    }
248}