mito2/worker/handle_catchup.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling catchup requests.
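//!
//! Catchup brings a stale follower region up to date before it may be promoted
//! to leader: the region is reopened if its in-memory state is outdated; for
//! remote WAL regions, the memtable is then rebuilt by replaying WAL entries
//! from the flushed entry id onwards.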
use std::sync::Arc;

use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use snafu::ensure;
use store_api::logstore::LogStore;
use store_api::region_engine::{RegionRole, SettableRegionRoleState};
use store_api::region_request::{AffectedRows, RegionCatchupRequest};
use store_api::storage::RegionId;
use tokio::time::Instant;

use crate::error::{self, Result};
use crate::region::MitoRegion;
use crate::region::opener::{RegionOpener, replay_memtable};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
    pub(crate) async fn handle_catchup_request(
        &mut self,
        region_id: RegionId,
        request: RegionCatchupRequest,
    ) -> Result<AffectedRows> {
        let Some(region) = self.regions.get_region(region_id) else {
            return error::RegionNotFoundSnafu { region_id }.fail();
        };

        if region.is_writable() {
            debug!("Region {region_id} is writable, skip catchup");
            return Ok(0);
        }
        // Note: we currently guard against split brain by ensuring the mutable memtable
        // is empty. Executing catch-up requests multiple times without
        // `set_writable=true` is expensive, since each later run finds a non-empty
        // memtable and has to reopen the region.
        let version = region.version();
        let is_empty_memtable = version.memtables.is_empty();

        // Relies on short-circuit evaluation: `has_update` is only awaited when the
        // memtable is empty.
        let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? {
            if !is_empty_memtable {
                warn!(
                    "Region {} memtable is not empty, which should not happen, manifest version: {}, last entry id: {}",
                    region.region_id,
                    region.manifest_ctx.manifest_version().await,
                    region.version_control.current().last_entry_id
                );
            }
            self.reopen_region(&region).await?
        } else {
            region
        };

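        // A remote WAL (e.g. Kafka) still holds the entries written by the previous
        // leader, so the memtable can be rebuilt by replaying them.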
        if region.provider.is_remote_wal() {
            let flushed_entry_id = region.version_control.current().last_entry_id;
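            // Replay starts at the later of the request checkpoint and the flushed
            // entry id; everything before that is already persisted in SSTs.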
            let replay_from_entry_id = request
                .checkpoint
                .map(|c| c.entry_id)
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Trying to replay memtable for region: {region_id}, provider: {:?}, replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}",
                region.provider
            );
            let timer = Instant::now();
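            // Build a reader over this region's WAL entries in the remote log store.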
            let wal_entry_reader =
                self.wal
                    .wal_entry_reader(&region.provider, region_id, request.location_id);
            let on_region_opened = self.wal.on_region_opened();
            let last_entry_id = replay_memtable(
                &region.provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &region.version_control,
                self.config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            info!(
                "Elapsed: {:?}, region: {region_id}, provider: {:?}, catchup finished. Replayed from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}, last entry id: {last_entry_id}, expected: {:?}.",
                timer.elapsed(),
                region.provider,
                request.entry_id
            );
            if let Some(expected_last_entry_id) = request.entry_id {
                ensure!(
                    // The replayed last entry id may be greater than the `expected_last_entry_id`.
                    last_entry_id >= expected_last_entry_id,
                    error::UnexpectedSnafu {
                        reason: format!(
                            "failed to set region {} to writable, expected to replay to {}, but actually replayed to {}",
                            region_id, expected_last_entry_id, last_entry_id,
                        ),
                    }
                )
            }
        } else {
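            // A local WAL has no entries from another node to replay, so replay is
            // skipped and the flushed entry id is trusted (and bumped below if the
            // local store has advanced past it).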
            let version = region.version_control.current();
            let mut flushed_entry_id = version.last_entry_id;

            let latest_entry_id = self
                .wal
                .store()
                .latest_entry_id(&region.provider)
                .unwrap_or_default();
            warn!(
                "Skipping memtable replay for region: {}, flushed entry id: {}, latest entry id: {}",
                region.region_id, flushed_entry_id, latest_entry_id
            );

            if latest_entry_id > flushed_entry_id {
                warn!(
                    "Latest entry id is greater than flushed entry id, using it as the flushed entry id, region: {}, latest entry id: {}, flushed entry id: {}",
                    region_id, latest_entry_id, flushed_entry_id
                );
                flushed_entry_id = latest_entry_id;
                region.version_control.set_entry_id(flushed_entry_id);
            }
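            // Notify the log store that the region is open at `flushed_entry_id`
            // (letting it e.g. purge obsolete entries).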
            let on_region_opened = self.wal.on_region_opened();
            on_region_opened(region_id, flushed_entry_id, &region.provider).await?;
        }

        if request.set_writable {
            region.set_role(RegionRole::Leader);
            // Finalize leadership: persist backfilled metadata.
            region
                .set_role_state_gracefully(SettableRegionRoleState::Leader)
                .await?;
        }

        Ok(0)
    }

    /// Reopens a region from its manifest, discarding the current in-memory state.
    pub(crate) async fn reopen_region(
        &mut self,
        region: &Arc<MitoRegion>,
    ) -> Result<Arc<MitoRegion>> {
        let region_id = region.region_id;
        let manifest_version = region.manifest_ctx.manifest_version().await;
        let flushed_entry_id = region.version_control.current().last_entry_id;
        info!(
            "Reopening the region: {region_id}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}"
        );
        let reopened_region = Arc::new(
            RegionOpener::new(
                region_id,
                region.table_dir(),
                region.access_layer.path_type(),
                self.memtable_builder_provider.clone(),
                self.object_store_manager.clone(),
                self.purge_scheduler.clone(),
                self.puffin_manager_factory.clone(),
                self.intermediate_manager.clone(),
                self.time_provider.clone(),
                self.file_ref_manager.clone(),
                self.partition_expr_fetcher.clone(),
            )
            .cache(Some(self.cache_manager.clone()))
            .options(region.version().options.clone())?
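            // Skip WAL replay during reopen; the caller replays the WAL itself when
            // needed (see `handle_catchup_request`).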
            .skip_wal_replay(true)
            .open(&self.config, &self.wal)
            .await?,
        );
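        // A reopened region starts in the follower role, hence it is not yet writable.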
        debug_assert!(!reopened_region.is_writable());
        self.regions.insert_region(reopened_region.clone());

        Ok(reopened_region)
    }
}