mito2/worker/
handle_catchup.rs1use std::sync::Arc;
18
19use common_telemetry::tracing::warn;
20use common_telemetry::{debug, info};
21use snafu::ensure;
22use store_api::logstore::LogStore;
23use store_api::region_engine::RegionRole;
24use store_api::region_request::{AffectedRows, RegionCatchupRequest};
25use store_api::storage::RegionId;
26use tokio::time::Instant;
27
28use crate::error::{self, Result};
29use crate::region::opener::{replay_memtable, RegionOpener};
30use crate::region::MitoRegion;
31use crate::worker::RegionWorkerLoop;
32
33impl<S: LogStore> RegionWorkerLoop<S> {
34 pub(crate) async fn handle_catchup_request(
35 &mut self,
36 region_id: RegionId,
37 request: RegionCatchupRequest,
38 ) -> Result<AffectedRows> {
39 let Some(region) = self.regions.get_region(region_id) else {
40 return error::RegionNotFoundSnafu { region_id }.fail();
41 };
42
43 if region.is_writable() {
44 debug!("Region {region_id} is writable, skip catchup");
45 return Ok(0);
46 }
47 let version = region.version();
50 let is_empty_memtable = version.memtables.is_empty();
51
52 let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? {
54 if !is_empty_memtable {
55 warn!("Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
56 region.region_id,
57 region.manifest_ctx.manifest_version().await,
58 region.version_control.current().last_entry_id
59 );
60 }
61 self.reopen_region(®ion).await?
62 } else {
63 region
64 };
65
66 if region.provider.is_remote_wal() {
67 let flushed_entry_id = region.version_control.current().last_entry_id;
68 info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
69 let timer = Instant::now();
70 let wal_entry_reader =
71 self.wal
72 .wal_entry_reader(®ion.provider, region_id, request.location_id);
73 let on_region_opened = self.wal.on_region_opened();
74 let last_entry_id = replay_memtable(
75 ®ion.provider,
76 wal_entry_reader,
77 region_id,
78 flushed_entry_id,
79 ®ion.version_control,
80 self.config.allow_stale_entries,
81 on_region_opened,
82 )
83 .await?;
84 info!(
85 "Elapsed: {:?}, region: {region_id} catchup finished. last entry id: {last_entry_id}, expected: {:?}.",
86 timer.elapsed(),
87 request.entry_id
88 );
89 if let Some(expected_last_entry_id) = request.entry_id {
90 ensure!(
91 last_entry_id >= expected_last_entry_id,
93 error::UnexpectedSnafu {
94 reason: format!(
95 "failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}",
96 region_id, expected_last_entry_id, last_entry_id,
97 ),
98 }
99 )
100 }
101 } else {
102 let version = region.version_control.current();
103 let mut flushed_entry_id = version.last_entry_id;
104
105 let high_watermark = self
106 .wal
107 .store()
108 .high_watermark(®ion.provider)
109 .unwrap_or_default();
110 warn!(
111 "Skips to replay memtable for region: {}, flushed entry id: {}, high watermark: {}",
112 region.region_id, flushed_entry_id, high_watermark
113 );
114
115 if high_watermark > flushed_entry_id {
116 warn!(
117 "Found high watermark is greater than flushed entry id, using high watermark as flushed entry id, region: {}, high watermark: {}, flushed entry id: {}",
118 region_id, high_watermark, flushed_entry_id
119 );
120 flushed_entry_id = high_watermark;
121 region.version_control.set_entry_id(flushed_entry_id);
122 }
123 let on_region_opened = self.wal.on_region_opened();
124 on_region_opened(region_id, flushed_entry_id, ®ion.provider).await?;
125 }
126
127 if request.set_writable {
128 region.set_role(RegionRole::Leader);
129 }
130
131 Ok(0)
132 }
133
134 pub(crate) async fn reopen_region(
136 &mut self,
137 region: &Arc<MitoRegion>,
138 ) -> Result<Arc<MitoRegion>> {
139 let region_id = region.region_id;
140 let manifest_version = region.manifest_ctx.manifest_version().await;
141 let flushed_entry_id = region.version_control.current().last_entry_id;
142 info!("Reopening the region: {region_id}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}");
143 let reopened_region = Arc::new(
144 RegionOpener::new(
145 region_id,
146 region.region_dir(),
147 self.memtable_builder_provider.clone(),
148 self.object_store_manager.clone(),
149 self.purge_scheduler.clone(),
150 self.puffin_manager_factory.clone(),
151 self.intermediate_manager.clone(),
152 self.time_provider.clone(),
153 )
154 .cache(Some(self.cache_manager.clone()))
155 .options(region.version().options.clone())?
156 .skip_wal_replay(true)
157 .open(&self.config, &self.wal)
158 .await?,
159 );
160 debug_assert!(!reopened_region.is_writable());
161 self.regions.insert_region(reopened_region.clone());
162
163 Ok(reopened_region)
164 }
165}