mito2/worker/handle_catchup.rs

use std::sync::Arc;

use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use snafu::ensure;
use store_api::logstore::LogStore;
use store_api::region_engine::{RegionRole, SettableRegionRoleState};
use store_api::region_request::{AffectedRows, RegionCatchupRequest};
use store_api::storage::RegionId;
use tokio::time::Instant;

use crate::error::{self, Result};
use crate::region::MitoRegion;
use crate::region::opener::{RegionOpener, replay_memtable};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
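    /// Handles a catchup request for a non-writable region: replays missing
    /// WAL entries (remote WAL) or aligns the region's entry id with the log
    /// store (local WAL), then optionally promotes the region to leader.
    ///
    /// Returns `Ok(0)` immediately if the region is already writable.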
    pub(crate) async fn handle_catchup_request(
        &mut self,
        region_id: RegionId,
        request: RegionCatchupRequest,
    ) -> Result<AffectedRows> {
        let Some(region) = self.regions.get_region(region_id) else {
            return error::RegionNotFoundSnafu { region_id }.fail();
        };

        if region.is_writable() {
            debug!("Region {region_id} is writable, skip catchup");
            return Ok(0);
        }
        let version = region.version();
        let is_empty_memtable = version.memtables.is_empty();

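        // A non-empty memtable or a manifest update means the in-memory state
        // may be stale; reopen the region from the latest manifest (without
        // WAL replay) to start from a clean state.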
        let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? {
            if !is_empty_memtable {
                warn!(
                    "Region {} memtables are not empty, which should not happen, manifest version: {}, last entry id: {}",
                    region.region_id,
                    region.manifest_ctx.manifest_version().await,
                    region.version_control.current().last_entry_id
                );
            }
            self.reopen_region(&region).await?
        } else {
            region
        };

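        // Remote WAL: replay entries the region has not yet flushed. Replay
        // starts from the later of the requested checkpoint and the flushed
        // entry id, so already-persisted entries are not re-applied.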
        if region.provider.is_remote_wal() {
            let flushed_entry_id = region.version_control.current().last_entry_id;
            let replay_from_entry_id = request
                .checkpoint
                .map(|c| c.entry_id)
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Trying to replay memtable for region: {region_id}, provider: {:?}, replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}",
                region.provider
            );
            let timer = Instant::now();
            let wal_entry_reader =
                self.wal
                    .wal_entry_reader(&region.provider, region_id, request.location_id);
            let on_region_opened = self.wal.on_region_opened();
            let last_entry_id = replay_memtable(
                &region.provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &region.version_control,
                self.config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            info!(
                "Elapsed: {:?}, region: {region_id}, provider: {:?} catchup finished. replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}, last entry id: {last_entry_id}, expected: {:?}.",
                timer.elapsed(),
                region.provider,
                request.entry_id
            );
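            // If the caller provided an expected last entry id, verify that
            // replay actually reached it before the region may become leader.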
            if let Some(expected_last_entry_id) = request.entry_id {
                ensure!(
                    last_entry_id >= expected_last_entry_id,
                    error::UnexpectedSnafu {
                        reason: format!(
                            "failed to set region {} to writable, it was expected to replay to {}, but actually replayed to {}",
                            region_id, expected_last_entry_id, last_entry_id,
                        ),
                    }
                )
            }
        } else {
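            // Local WAL: replay is skipped; the region's entry id is instead
            // aligned with the latest entry id known to the local log store.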
            let version = region.version_control.current();
            let mut flushed_entry_id = version.last_entry_id;

            let latest_entry_id = self
                .wal
                .store()
                .latest_entry_id(&region.provider)
                .unwrap_or_default();
            warn!(
                "Skipping memtable replay for region: {}, flushed entry id: {}, latest entry id: {}",
                region.region_id, flushed_entry_id, latest_entry_id
            );

            if latest_entry_id > flushed_entry_id {
                warn!(
                    "Found latest entry id greater than flushed entry id, using latest entry id as flushed entry id, region: {}, latest entry id: {}, flushed entry id: {}",
                    region_id, latest_entry_id, flushed_entry_id
                );
                flushed_entry_id = latest_entry_id;
                region.version_control.set_entry_id(flushed_entry_id);
            }
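            // Notify the WAL backend that the region is now considered opened
            // at `flushed_entry_id`.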
            let on_region_opened = self.wal.on_region_opened();
            on_region_opened(region_id, flushed_entry_id, &region.provider).await?;
        }

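        // Promote the region to leader if requested, waiting for the role
        // transition to complete gracefully.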
        if request.set_writable {
            region.set_role(RegionRole::Leader);
            region
                .set_role_state_gracefully(SettableRegionRoleState::Leader)
                .await?;
        }

        Ok(0)
    }

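    /// Reopens a region from its latest manifest, skipping WAL replay, and
    /// replaces the old instance in the worker's region map. The reopened
    /// region is not writable; the caller is responsible for replaying the
    /// WAL and promoting it.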
    pub(crate) async fn reopen_region(
        &mut self,
        region: &Arc<MitoRegion>,
    ) -> Result<Arc<MitoRegion>> {
        let region_id = region.region_id;
        let manifest_version = region.manifest_ctx.manifest_version().await;
        let flushed_entry_id = region.version_control.current().last_entry_id;
        info!(
            "Reopening the region: {region_id}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}"
        );
        let reopened_region = Arc::new(
            RegionOpener::new(
                region_id,
                region.table_dir(),
                region.access_layer.path_type(),
                self.memtable_builder_provider.clone(),
                self.object_store_manager.clone(),
                self.purge_scheduler.clone(),
                self.puffin_manager_factory.clone(),
                self.intermediate_manager.clone(),
                self.time_provider.clone(),
                self.file_ref_manager.clone(),
                self.partition_expr_fetcher.clone(),
            )
            .cache(Some(self.cache_manager.clone()))
            .options(region.version().options.clone())?
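            // WAL replay is deferred to `handle_catchup_request`.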
            .skip_wal_replay(true)
            .open(&self.config, &self.wal)
            .await?,
        );
        debug_assert!(!reopened_region.is_writable());
        self.regions.insert_region(reopened_region.clone());

        Ok(reopened_region)
    }
}