mito2/region/
catchup.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Instant;
17
18use common_telemetry::{info, warn};
19use snafu::ensure;
20use store_api::logstore::LogStore;
21
22use crate::error::{self, Result};
23use crate::region::MitoRegion;
24use crate::region::opener::replay_memtable;
25use crate::wal::Wal;
26use crate::wal::entry_distributor::WalEntryReceiver;
27
28pub struct RegionCatchupTask<S> {
29    entry_receiver: Option<WalEntryReceiver>,
30    region: Arc<MitoRegion>,
31    replay_checkpoint_entry_id: Option<u64>,
32    expected_last_entry_id: Option<u64>,
33    allow_stale_entries: bool,
34    location_id: Option<u64>,
35    wal: Wal<S>,
36}
37
38impl<S: LogStore> RegionCatchupTask<S> {
39    pub fn new(region: Arc<MitoRegion>, wal: Wal<S>, allow_stale_entries: bool) -> Self {
40        Self {
41            entry_receiver: None,
42            region,
43            replay_checkpoint_entry_id: None,
44            expected_last_entry_id: None,
45            allow_stale_entries,
46            location_id: None,
47            wal,
48        }
49    }
50
51    /// Sets the location id.
52    pub(crate) fn with_location_id(mut self, location_id: Option<u64>) -> Self {
53        self.location_id = location_id;
54        self
55    }
56
57    /// Sets the expected last entry id.
58    pub(crate) fn with_expected_last_entry_id(
59        mut self,
60        expected_last_entry_id: Option<u64>,
61    ) -> Self {
62        self.expected_last_entry_id = expected_last_entry_id;
63        self
64    }
65
66    /// Sets the entry receiver.
67    pub(crate) fn with_entry_receiver(mut self, entry_receiver: Option<WalEntryReceiver>) -> Self {
68        self.entry_receiver = entry_receiver;
69        self
70    }
71
72    /// Sets the replay checkpoint entry id.
73    pub(crate) fn with_replay_checkpoint_entry_id(
74        mut self,
75        replay_checkpoint_entry_id: Option<u64>,
76    ) -> Self {
77        self.replay_checkpoint_entry_id = replay_checkpoint_entry_id;
78        self
79    }
80
81    pub async fn run(&mut self) -> Result<()> {
82        if self.region.provider.is_remote_wal() {
83            self.remote_wal_catchup().await
84        } else {
85            self.local_wal_catchup().await
86        }
87    }
88
89    async fn remote_wal_catchup(&mut self) -> Result<()> {
90        let flushed_entry_id = self.region.version_control.current().last_entry_id;
91        let replay_from_entry_id = self
92            .replay_checkpoint_entry_id
93            .unwrap_or(flushed_entry_id)
94            .max(flushed_entry_id);
95        let region_id = self.region.region_id;
96        info!(
97            "Trying to replay memtable for region: {region_id}, provider: {:?}, replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}",
98            self.region.provider
99        );
100        let timer = Instant::now();
101        let wal_entry_reader = self
102            .entry_receiver
103            .take()
104            .map(|r| Box::new(r) as _)
105            .unwrap_or_else(|| {
106                self.wal
107                    .wal_entry_reader(&self.region.provider, region_id, self.location_id)
108            });
109        let on_region_opened = self.wal.on_region_opened();
110        let last_entry_id = replay_memtable(
111            &self.region.provider,
112            wal_entry_reader,
113            region_id,
114            replay_from_entry_id,
115            &self.region.version_control,
116            self.allow_stale_entries,
117            on_region_opened,
118        )
119        .await?;
120        info!(
121            "Elapsed: {:?}, region: {region_id}, provider: {:?} catchup finished. replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}, last entry id: {last_entry_id}, expected: {:?}.",
122            timer.elapsed(),
123            self.region.provider,
124            self.expected_last_entry_id
125        );
126        if let Some(expected_last_entry_id) = self.expected_last_entry_id {
127            ensure!(
128                // The replayed last entry id may be greater than the `expected_last_entry_id`.
129                last_entry_id >= expected_last_entry_id,
130                error::UnexpectedSnafu {
131                    reason: format!(
132                        "Failed to catchup region {}, it was expected to replay to {}, but actually replayed to {}",
133                        region_id, expected_last_entry_id, last_entry_id,
134                    ),
135                }
136            )
137        }
138        Ok(())
139    }
140
141    async fn local_wal_catchup(&mut self) -> Result<()> {
142        let version = self.region.version_control.current();
143        let mut flushed_entry_id = version.last_entry_id;
144        let region_id = self.region.region_id;
145        let latest_entry_id = self
146            .wal
147            .store()
148            .latest_entry_id(&self.region.provider)
149            .unwrap_or_default();
150        info!(
151            "Skips to replay memtable for region: {}, flushed entry id: {}, latest entry id: {}",
152            region_id, flushed_entry_id, latest_entry_id
153        );
154
155        if latest_entry_id > flushed_entry_id {
156            warn!(
157                "Found latest entry id is greater than flushed entry id, using latest entry id as flushed entry id, region: {}, latest entry id: {}, flushed entry id: {}",
158                region_id, latest_entry_id, flushed_entry_id
159            );
160            flushed_entry_id = latest_entry_id;
161            self.region.version_control.set_entry_id(flushed_entry_id);
162        }
163        let on_region_opened = self.wal.on_region_opened();
164        on_region_opened(region_id, flushed_entry_id, &self.region.provider).await?;
165        Ok(())
166    }
167}