1use std::sync::Arc;
16use std::time::Instant;
17
18use common_telemetry::{info, warn};
19use snafu::ensure;
20use store_api::logstore::LogStore;
21
22use crate::error::{self, Result};
23use crate::region::MitoRegion;
24use crate::region::opener::replay_memtable;
25use crate::wal::Wal;
26use crate::wal::entry_distributor::WalEntryReceiver;
27
28pub struct RegionCatchupTask<S> {
29 entry_receiver: Option<WalEntryReceiver>,
30 region: Arc<MitoRegion>,
31 replay_checkpoint_entry_id: Option<u64>,
32 expected_last_entry_id: Option<u64>,
33 allow_stale_entries: bool,
34 location_id: Option<u64>,
35 wal: Wal<S>,
36}
37
38impl<S: LogStore> RegionCatchupTask<S> {
39 pub fn new(region: Arc<MitoRegion>, wal: Wal<S>, allow_stale_entries: bool) -> Self {
40 Self {
41 entry_receiver: None,
42 region,
43 replay_checkpoint_entry_id: None,
44 expected_last_entry_id: None,
45 allow_stale_entries,
46 location_id: None,
47 wal,
48 }
49 }
50
51 pub(crate) fn with_location_id(mut self, location_id: Option<u64>) -> Self {
53 self.location_id = location_id;
54 self
55 }
56
57 pub(crate) fn with_expected_last_entry_id(
59 mut self,
60 expected_last_entry_id: Option<u64>,
61 ) -> Self {
62 self.expected_last_entry_id = expected_last_entry_id;
63 self
64 }
65
66 pub(crate) fn with_entry_receiver(mut self, entry_receiver: Option<WalEntryReceiver>) -> Self {
68 self.entry_receiver = entry_receiver;
69 self
70 }
71
72 pub(crate) fn with_replay_checkpoint_entry_id(
74 mut self,
75 replay_checkpoint_entry_id: Option<u64>,
76 ) -> Self {
77 self.replay_checkpoint_entry_id = replay_checkpoint_entry_id;
78 self
79 }
80
81 pub async fn run(&mut self) -> Result<()> {
82 if self.region.provider.is_remote_wal() {
83 self.remote_wal_catchup().await
84 } else {
85 self.local_wal_catchup().await
86 }
87 }
88
89 async fn remote_wal_catchup(&mut self) -> Result<()> {
90 let flushed_entry_id = self.region.version_control.current().last_entry_id;
91 let replay_from_entry_id = self
92 .replay_checkpoint_entry_id
93 .unwrap_or(flushed_entry_id)
94 .max(flushed_entry_id);
95 let region_id = self.region.region_id;
96 info!(
97 "Trying to replay memtable for region: {region_id}, provider: {:?}, replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}",
98 self.region.provider
99 );
100 let timer = Instant::now();
101 let wal_entry_reader = self
102 .entry_receiver
103 .take()
104 .map(|r| Box::new(r) as _)
105 .unwrap_or_else(|| {
106 self.wal
107 .wal_entry_reader(&self.region.provider, region_id, self.location_id)
108 });
109 let on_region_opened = self.wal.on_region_opened();
110 let last_entry_id = replay_memtable(
111 &self.region.provider,
112 wal_entry_reader,
113 region_id,
114 replay_from_entry_id,
115 &self.region.version_control,
116 self.allow_stale_entries,
117 on_region_opened,
118 )
119 .await?;
120 info!(
121 "Elapsed: {:?}, region: {region_id}, provider: {:?} catchup finished. replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}, last entry id: {last_entry_id}, expected: {:?}.",
122 timer.elapsed(),
123 self.region.provider,
124 self.expected_last_entry_id
125 );
126 if let Some(expected_last_entry_id) = self.expected_last_entry_id {
127 ensure!(
128 last_entry_id >= expected_last_entry_id,
130 error::UnexpectedSnafu {
131 reason: format!(
132 "Failed to catchup region {}, it was expected to replay to {}, but actually replayed to {}",
133 region_id, expected_last_entry_id, last_entry_id,
134 ),
135 }
136 )
137 }
138 Ok(())
139 }
140
141 async fn local_wal_catchup(&mut self) -> Result<()> {
142 let version = self.region.version_control.current();
143 let mut flushed_entry_id = version.last_entry_id;
144 let region_id = self.region.region_id;
145 let latest_entry_id = self
146 .wal
147 .store()
148 .latest_entry_id(&self.region.provider)
149 .unwrap_or_default();
150 info!(
151 "Skips to replay memtable for region: {}, flushed entry id: {}, latest entry id: {}",
152 region_id, flushed_entry_id, latest_entry_id
153 );
154
155 if latest_entry_id > flushed_entry_id {
156 warn!(
157 "Found latest entry id is greater than flushed entry id, using latest entry id as flushed entry id, region: {}, latest entry id: {}, flushed entry id: {}",
158 region_id, latest_entry_id, flushed_entry_id
159 );
160 flushed_entry_id = latest_entry_id;
161 self.region.version_control.set_entry_id(flushed_entry_id);
162 }
163 let on_region_opened = self.wal.on_region_opened();
164 on_region_opened(region_id, flushed_entry_id, &self.region.provider).await?;
165 Ok(())
166 }
167}