// mito2/worker/handle_enter_staging.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::time::Instant;
16
17use common_telemetry::{error, info, warn};
18use store_api::logstore::LogStore;
19use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective};
20use store_api::storage::RegionId;
21
22use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
23use crate::flush::FlushReason;
24use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange};
25use crate::region::{MitoRegionRef, RegionLeaderState, StagingPartitionInfo};
26use crate::request::{
27    BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
28    WorkerRequest, WorkerRequestWithTime,
29};
30use crate::worker::RegionWorkerLoop;
31
32impl<S: LogStore> RegionWorkerLoop<S> {
33    pub(crate) async fn handle_enter_staging_request(
34        &mut self,
35        region_id: RegionId,
36        partition_directive: StagingPartitionDirective,
37        mut sender: OptionOutputTx,
38    ) {
39        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
40            return;
41        };
42
43        // If the region is already in staging mode, verify the partition directive matches.
44        if region.is_staging() {
45            let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
46            // If the partition directive mismatches, return error.
47            if staging_partition_info
48                .as_ref()
49                .map(|info| &info.partition_directive)
50                != Some(&partition_directive)
51            {
52                sender.send(Err(StagingPartitionExprMismatchSnafu {
53                    manifest_expr: staging_partition_info
54                        .as_ref()
55                        .and_then(|info| info.partition_expr().map(ToString::to_string)),
56                    request_expr: format!("{:?}", partition_directive),
57                }
58                .build()));
59                return;
60            }
61
62            // If the partition expr matches, return success.
63            sender.send(Ok(0));
64            return;
65        }
66
67        let version = region.version();
68        if !version.memtables.is_empty() {
69            // If memtable is not empty, we can't enter staging directly and need to flush
70            // all memtables first.
71            info!("Flush region: {} before entering staging", region_id);
72            debug_assert!(!region.is_staging());
73            let task = self.new_flush_task(
74                &region,
75                FlushReason::EnterStaging,
76                None,
77                self.config.clone(),
78            );
79            if let Err(e) =
80                self.flush_scheduler
81                    .schedule_flush(region.region_id, &region.version_control, task)
82            {
83                // Unable to flush the region, send error to waiter.
84                sender.send(Err(e));
85                return;
86            }
87
88            // Safety: We have requested flush.
89            self.flush_scheduler
90                .add_ddl_request_to_pending(SenderDdlRequest {
91                    region_id,
92                    sender,
93                    request: DdlRequest::EnterStaging(EnterStagingRequest {
94                        partition_directive: partition_directive.clone(),
95                    }),
96                });
97
98            return;
99        }
100
101        if self.compaction_scheduler.is_compacting(region_id) {
102            // Safety: region is compacting, add ddl request to pending queue.
103            self.compaction_scheduler
104                .add_ddl_request_to_pending(SenderDdlRequest {
105                    region_id,
106                    sender,
107                    request: DdlRequest::EnterStaging(EnterStagingRequest {
108                        partition_directive,
109                    }),
110                });
111
112            return;
113        }
114
115        self.handle_enter_staging(region, partition_directive, sender);
116    }
117
118    async fn enter_staging(
119        region: &MitoRegionRef,
120        partition_directive: &StagingPartitionDirective,
121    ) -> Result<()> {
122        let now = Instant::now();
123        // First step: clear all staging manifest files.
124        {
125            let mut manager = region.manifest_ctx.manifest_manager.write().await;
126            manager
127                .clear_staging_manifest_and_dir()
128                .await
129                .inspect_err(|e| {
130                    error!(
131                        e;
132                        "Failed to clear staging manifest files for region {}",
133                        region.region_id
134                    );
135                })?;
136
137            info!(
138                "Cleared all staging manifest files for region {}, elapsed: {:?}",
139                region.region_id,
140                now.elapsed(),
141            );
142        }
143
144        let partition_expr = match partition_directive {
145            StagingPartitionDirective::UpdatePartitionExpr(partition_expr) => {
146                partition_expr.clone()
147            }
148            StagingPartitionDirective::RejectAllWrites => {
149                info!(
150                    "Enter staging with reject all writes, region_id: {}",
151                    region.region_id
152                );
153                // Rejects all writes just a memory flag, no need to write new staging manifest.
154                return Ok(());
155            }
156        };
157
158        // Second step: write new staging manifest.
159        let change = RegionPartitionExprChange {
160            partition_expr: Some(partition_expr.clone()),
161        };
162        let action_list =
163            RegionMetaActionList::with_action(RegionMetaAction::PartitionExprChange(change));
164        region
165            .manifest_ctx
166            .update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
167            .await?;
168
169        Ok(())
170    }
171
172    fn handle_enter_staging(
173        &self,
174        region: MitoRegionRef,
175        partition_directive: StagingPartitionDirective,
176        sender: OptionOutputTx,
177    ) {
178        if let Err(e) = region.set_entering_staging() {
179            sender.send(Err(e));
180            return;
181        }
182
183        let listener = self.listener.clone();
184        let request_sender = self.sender.clone();
185        common_runtime::spawn_global(async move {
186            let now = Instant::now();
187            let result = Self::enter_staging(&region, &partition_directive).await;
188            match result {
189                Ok(_) => {
190                    info!(
191                        "Created staging manifest for region {}, elapsed: {:?}",
192                        region.region_id,
193                        now.elapsed(),
194                    );
195                }
196                Err(ref e) => {
197                    // Unset the staging manifest
198                    region
199                        .manifest_ctx
200                        .manifest_manager
201                        .write()
202                        .await
203                        .unset_staging_manifest();
204                    error!(
205                        "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
206                        region.region_id,
207                        e,
208                        now.elapsed(),
209                    );
210                }
211            }
212
213            let notify = WorkerRequest::Background {
214                region_id: region.region_id,
215                notify: BackgroundNotify::EnterStaging(EnterStagingResult {
216                    region_id: region.region_id,
217                    sender,
218                    result,
219                    partition_directive,
220                }),
221            };
222            listener
223                .on_enter_staging_result_begin(region.region_id)
224                .await;
225
226            if let Err(res) = request_sender
227                .send(WorkerRequestWithTime::new(notify))
228                .await
229            {
230                warn!(
231                    "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
232                    region.region_id, res
233                );
234            }
235        });
236    }
237
238    /// Handles enter staging result.
239    pub(crate) async fn handle_enter_staging_result(
240        &mut self,
241        enter_staging_result: EnterStagingResult,
242    ) {
243        let region = match self.regions.get_region(enter_staging_result.region_id) {
244            Some(region) => region,
245            None => {
246                self.reject_region_stalled_requests(&enter_staging_result.region_id);
247                enter_staging_result.sender.send(
248                    RegionNotFoundSnafu {
249                        region_id: enter_staging_result.region_id,
250                    }
251                    .fail(),
252                );
253                return;
254            }
255        };
256
257        if enter_staging_result.result.is_ok() {
258            info!(
259                "Updating region {} staging partition directive to {:?}",
260                region.region_id, enter_staging_result.partition_directive
261            );
262            Self::update_region_staging_partition_info(
263                &region,
264                enter_staging_result.partition_directive,
265            );
266            region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
267        } else {
268            region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
269        }
270        enter_staging_result
271            .sender
272            .send(enter_staging_result.result.map(|_| 0));
273        // Handles the stalled requests.
274        self.handle_region_stalled_requests(&enter_staging_result.region_id)
275            .await;
276    }
277
278    fn update_region_staging_partition_info(
279        region: &MitoRegionRef,
280        partition_directive: StagingPartitionDirective,
281    ) {
282        let mut staging_partition_info = region.staging_partition_info.lock().unwrap();
283        debug_assert!(staging_partition_info.is_none());
284        *staging_partition_info = Some(StagingPartitionInfo::from_partition_directive(
285            partition_directive,
286        ));
287    }
288}