mito2/worker/
handle_catchup.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling catchup request.
16
17use std::sync::Arc;
18
19use common_telemetry::tracing::warn;
20use common_telemetry::{debug, error, info};
21use store_api::logstore::LogStore;
22use store_api::region_engine::{RegionRole, SettableRegionRoleState};
23use store_api::region_request::RegionCatchupRequest;
24use store_api::storage::RegionId;
25
26use crate::error::{self, Result};
27use crate::region::MitoRegion;
28use crate::region::catchup::RegionCatchupTask;
29use crate::region::opener::RegionOpener;
30use crate::request::OptionOutputTx;
31use crate::wal::entry_distributor::WalEntryReceiver;
32use crate::worker::RegionWorkerLoop;
33
34impl<S: LogStore> RegionWorkerLoop<S> {
35    pub(crate) async fn handle_catchup_request(
36        &mut self,
37        region_id: RegionId,
38        request: RegionCatchupRequest,
39        entry_receiver: Option<WalEntryReceiver>,
40        sender: OptionOutputTx,
41    ) {
42        let Some(region) = self.regions.get_region(region_id) else {
43            sender.send(Err(error::RegionNotFoundSnafu { region_id }.build()));
44            return;
45        };
46
47        if region.is_writable() {
48            debug!("Region {region_id} is writable, skip catchup");
49            sender.send(Ok(0));
50            return;
51        }
52
53        if self.catchup_regions.is_region_exists(region_id) {
54            warn!("Region {region_id} under catching up");
55            sender.send(Err(error::RegionBusySnafu { region_id }.build()));
56            return;
57        }
58
59        // If the memtable is not empty or the manifest has been updated, we need to reopen the region.
60        let region = match self.reopen_region_if_needed(region).await {
61            Ok(region) => region,
62            Err(e) => {
63                sender.send(Err(e));
64                return;
65            }
66        };
67
68        self.catchup_regions.insert_region(region_id);
69        let catchup_regions = self.catchup_regions.clone();
70        let wal = self.wal.clone();
71        let allow_stale_entries = self.config.allow_stale_entries;
72        common_runtime::spawn_global(async move {
73            let mut task = RegionCatchupTask::new(region.clone(), wal, allow_stale_entries)
74                .with_entry_receiver(entry_receiver)
75                .with_expected_last_entry_id(request.entry_id)
76                .with_location_id(request.location_id)
77                .with_replay_checkpoint_entry_id(request.checkpoint.map(|c| c.entry_id));
78
79            match task.run().await {
80                Ok(_) => {
81                    if request.set_writable {
82                        region.set_role(RegionRole::Leader);
83                        // Finalize leadership: persist backfilled metadata.
84                        if let Err(err) = region
85                            .set_role_state_gracefully(SettableRegionRoleState::Leader)
86                            .await
87                        {
88                            error!(err; "Failed to set region {region_id} to leader");
89                        }
90                    }
91                    sender.send(Ok(0));
92                    catchup_regions.remove_region(region_id);
93                }
94                Err(err) => {
95                    error!(err; "Failed to catchup region {region_id}");
96                    sender.send(Err(err));
97                    catchup_regions.remove_region(region_id);
98                }
99            }
100        });
101    }
102
103    /// Reopens a region.
104    pub(crate) async fn reopen_region(
105        &mut self,
106        region: &Arc<MitoRegion>,
107    ) -> Result<Arc<MitoRegion>> {
108        let region_id = region.region_id;
109        let manifest_version = region.manifest_ctx.manifest_version().await;
110        let flushed_entry_id = region.version_control.current().last_entry_id;
111        info!(
112            "Reopening the region: {region_id}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}"
113        );
114        let reopened_region = Arc::new(
115            RegionOpener::new(
116                region_id,
117                region.table_dir(),
118                region.access_layer.path_type(),
119                self.memtable_builder_provider.clone(),
120                self.object_store_manager.clone(),
121                self.purge_scheduler.clone(),
122                self.puffin_manager_factory.clone(),
123                self.intermediate_manager.clone(),
124                self.time_provider.clone(),
125                self.file_ref_manager.clone(),
126                self.partition_expr_fetcher.clone(),
127            )
128            .cache(Some(self.cache_manager.clone()))
129            .options(region.version().options.clone())?
130            .skip_wal_replay(true)
131            .open(&self.config, &self.wal)
132            .await?,
133        );
134        debug_assert!(!reopened_region.is_writable());
135        self.regions.insert_region(reopened_region.clone());
136
137        Ok(reopened_region)
138    }
139
140    async fn reopen_region_if_needed(
141        &mut self,
142        region: Arc<MitoRegion>,
143    ) -> Result<Arc<MitoRegion>> {
144        // Note: Currently, We protect the split brain by ensuring the memtable table is empty.
145        // It's expensive to execute catch-up requests without `set_writable=true` multiple times.
146        let version = region.version();
147        let is_empty_memtable = version.memtables.is_empty();
148
149        // Utilizes the short circuit evaluation.
150        let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? {
151            if !is_empty_memtable {
152                warn!(
153                    "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
154                    region.region_id,
155                    region.manifest_ctx.manifest_version().await,
156                    region.version_control.current().last_entry_id
157                );
158            }
159            self.reopen_region(&region).await?
160        } else {
161            region
162        };
163
164        Ok(region)
165    }
166}