mito2/worker/
handle_enter_staging.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Instant;
17
18use common_telemetry::{error, info, warn};
19use store_api::logstore::LogStore;
20use store_api::region_request::EnterStagingRequest;
21use store_api::storage::RegionId;
22
23use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
24use crate::flush::FlushReason;
25use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
26use crate::region::{MitoRegionRef, RegionLeaderState};
27use crate::request::{
28    BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
29    WorkerRequest, WorkerRequestWithTime,
30};
31use crate::worker::RegionWorkerLoop;
32
33impl<S: LogStore> RegionWorkerLoop<S> {
34    pub(crate) async fn handle_enter_staging_request(
35        &mut self,
36        region_id: RegionId,
37        partition_expr: String,
38        mut sender: OptionOutputTx,
39    ) {
40        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
41            return;
42        };
43
44        // If the region is already in staging mode, verify the partition expr matches.
45        if region.is_staging() {
46            let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
47            // If the partition expr mismatch, return error.
48            if staging_partition_expr.as_ref() != Some(&partition_expr) {
49                sender.send(Err(StagingPartitionExprMismatchSnafu {
50                    manifest_expr: staging_partition_expr,
51                    request_expr: partition_expr,
52                }
53                .build()));
54                return;
55            }
56
57            // If the partition expr matches, return success.
58            sender.send(Ok(0));
59            return;
60        }
61
62        let version = region.version();
63        if !version.memtables.is_empty() {
64            // If memtable is not empty, we can't enter staging directly and need to flush
65            // all memtables first.
66            info!("Flush region: {} before entering staging", region_id);
67            debug_assert!(!region.is_staging());
68            let task = self.new_flush_task(
69                &region,
70                FlushReason::EnterStaging,
71                None,
72                self.config.clone(),
73                region.is_staging(),
74            );
75            if let Err(e) =
76                self.flush_scheduler
77                    .schedule_flush(region.region_id, &region.version_control, task)
78            {
79                // Unable to flush the region, send error to waiter.
80                sender.send(Err(e));
81                return;
82            }
83
84            // Safety: We have requested flush.
85            self.flush_scheduler
86                .add_ddl_request_to_pending(SenderDdlRequest {
87                    region_id,
88                    sender,
89                    request: DdlRequest::EnterStaging(EnterStagingRequest { partition_expr }),
90                });
91
92            return;
93        }
94
95        self.handle_enter_staging(region, partition_expr, sender);
96    }
97
98    async fn enter_staging(region: &MitoRegionRef, partition_expr: String) -> Result<()> {
99        let now = Instant::now();
100        // First step: clear all staging manifest files.
101        {
102            let mut manager = region.manifest_ctx.manifest_manager.write().await;
103            manager
104                .clear_staging_manifest_and_dir()
105                .await
106                .inspect_err(|e| {
107                    error!(
108                        e;
109                        "Failed to clear staging manifest files for region {}",
110                        region.region_id
111                    );
112                })?;
113
114            info!(
115                "Cleared all staging manifest files for region {}, elapsed: {:?}",
116                region.region_id,
117                now.elapsed(),
118            );
119        }
120
121        // Second step: write new staging manifest.
122        let mut new_meta = (*region.metadata()).clone();
123        new_meta.partition_expr = Some(partition_expr.clone());
124        let sst_format = region.version().options.sst_format.unwrap_or_default();
125        let change = RegionChange {
126            metadata: Arc::new(new_meta),
127            sst_format,
128        };
129        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
130        region
131            .manifest_ctx
132            .update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
133            .await?;
134
135        Ok(())
136    }
137
138    fn handle_enter_staging(
139        &self,
140        region: MitoRegionRef,
141        partition_expr: String,
142        sender: OptionOutputTx,
143    ) {
144        if let Err(e) = region.set_entering_staging() {
145            sender.send(Err(e));
146            return;
147        }
148
149        let listener = self.listener.clone();
150        let request_sender = self.sender.clone();
151        common_runtime::spawn_global(async move {
152            let now = Instant::now();
153            let result = Self::enter_staging(&region, partition_expr.clone()).await;
154            match result {
155                Ok(_) => {
156                    info!(
157                        "Created staging manifest for region {}, elapsed: {:?}",
158                        region.region_id,
159                        now.elapsed(),
160                    );
161                }
162                Err(ref e) => {
163                    // Unset the staging manifest
164                    region
165                        .manifest_ctx
166                        .manifest_manager
167                        .write()
168                        .await
169                        .unset_staging_manifest();
170                    error!(
171                        "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
172                        region.region_id,
173                        e,
174                        now.elapsed(),
175                    );
176                }
177            }
178
179            let notify = WorkerRequest::Background {
180                region_id: region.region_id,
181                notify: BackgroundNotify::EnterStaging(EnterStagingResult {
182                    region_id: region.region_id,
183                    sender,
184                    result,
185                    partition_expr,
186                }),
187            };
188            listener
189                .on_enter_staging_result_begin(region.region_id)
190                .await;
191
192            if let Err(res) = request_sender
193                .send(WorkerRequestWithTime::new(notify))
194                .await
195            {
196                warn!(
197                    "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
198                    region.region_id, res
199                );
200            }
201        });
202    }
203
204    /// Handles enter staging result.
205    pub(crate) async fn handle_enter_staging_result(
206        &mut self,
207        enter_staging_result: EnterStagingResult,
208    ) {
209        let region = match self.regions.get_region(enter_staging_result.region_id) {
210            Some(region) => region,
211            None => {
212                self.reject_region_stalled_requests(&enter_staging_result.region_id);
213                enter_staging_result.sender.send(
214                    RegionNotFoundSnafu {
215                        region_id: enter_staging_result.region_id,
216                    }
217                    .fail(),
218                );
219                return;
220            }
221        };
222
223        if enter_staging_result.result.is_ok() {
224            info!(
225                "Updating region {} staging partition expr to {}",
226                region.region_id, enter_staging_result.partition_expr
227            );
228            Self::update_region_staging_partition_expr(
229                &region,
230                enter_staging_result.partition_expr,
231            );
232            region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
233        } else {
234            region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
235        }
236        enter_staging_result
237            .sender
238            .send(enter_staging_result.result.map(|_| 0));
239        // Handles the stalled requests.
240        self.handle_region_stalled_requests(&enter_staging_result.region_id)
241            .await;
242    }
243
244    fn update_region_staging_partition_expr(region: &MitoRegionRef, partition_expr: String) {
245        let mut staging_partition_expr = region.staging_partition_expr.lock().unwrap();
246        debug_assert!(staging_partition_expr.is_none());
247        *staging_partition_expr = Some(partition_expr);
248    }
249}