mito2/worker/handle_catchup.rs

use std::sync::Arc;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use snafu::ensure;
use store_api::logstore::LogStore;
use store_api::region_engine::RegionRole;
use store_api::region_request::{AffectedRows, RegionCatchupRequest};
use store_api::storage::RegionId;
use tokio::time::Instant;

use crate::error::{self, Result};
use crate::region::opener::{replay_memtable, RegionOpener};
use crate::region::MitoRegion;
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
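    /// Handles a catchup request for `region_id`.
    ///
    /// Brings a non-writable region up to date: the region is reopened if its
    /// memtable is not empty or its manifest has been updated, and, for remote
    /// WAL, the memtable is replayed from the flushed entry id. The region is
    /// optionally promoted to leader afterwards.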
    pub(crate) async fn handle_catchup_request(
        &mut self,
        region_id: RegionId,
        request: RegionCatchupRequest,
    ) -> Result<AffectedRows> {
        let Some(region) = self.regions.get_region(region_id) else {
            return error::RegionNotFoundSnafu { region_id }.fail();
        };

        if region.is_writable() {
            debug!("Region {region_id} is writable, skip catchup");
            return Ok(0);
        }
        let version = region.version();
        let is_empty_memtable = version.memtables.is_empty();
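        // Reopen the region if its memtable already holds data or the manifest has
        // been updated, so catchup starts from a clean, up-to-date region state.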
        let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? {
            self.reopen_region(&region).await?
        } else {
            region
        };
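        // With a remote WAL, entries written after the flushed entry id must be
        // replayed into the memtable before the region can serve reads and writes.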
        if region.provider.is_remote_wal() {
            let flushed_entry_id = region.version_control.current().last_entry_id;
            info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
            let timer = Instant::now();
            let wal_entry_reader =
                self.wal
                    .wal_entry_reader(&region.provider, region_id, request.location_id);
            let on_region_opened = self.wal.on_region_opened();
            let last_entry_id = replay_memtable(
                &region.provider,
                wal_entry_reader,
                region_id,
                flushed_entry_id,
                &region.version_control,
                self.config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            info!(
                "Elapsed: {:?}, region: {region_id} catchup finished. last entry id: {last_entry_id}, expected: {:?}.",
                timer.elapsed(),
                request.entry_id
            );
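            // If the request carries an expected entry id, verify that replay
            // actually reached it; otherwise report an unexpected replay error.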
            if let Some(expected_last_entry_id) = request.entry_id {
                ensure!(
                    last_entry_id >= expected_last_entry_id,
                    error::UnexpectedReplaySnafu {
                        region_id,
                        expected_last_entry_id,
                        replayed_last_entry_id: last_entry_id,
                    }
                )
            }
        } else {
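            // Non-remote WAL: skip replay and only invoke the WAL callback with
            // the flushed entry id so the log store can update its region state.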
            warn!("Skipping memtable replay for region: {}", region.region_id);
            let flushed_entry_id = region.version_control.current().last_entry_id;
            let on_region_opened = self.wal.on_region_opened();
            on_region_opened(region_id, flushed_entry_id, &region.provider).await?;
        }

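        // Promote the region to leader if the caller asked for a writable region.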
        if request.set_writable {
            region.set_role(RegionRole::Leader);
        }

        Ok(0)
    }

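    /// Reopens `region` from its manifest without replaying the WAL and replaces
    /// the old instance in the worker's region map. The reopened region stays
    /// non-writable until the caller promotes it.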
    pub(crate) async fn reopen_region(
        &mut self,
        region: &Arc<MitoRegion>,
    ) -> Result<Arc<MitoRegion>> {
        let region_id = region.region_id;
        let manifest_version = region.manifest_ctx.manifest_version().await;
        let flushed_entry_id = region.version_control.current().last_entry_id;
        info!("Reopening the region: {region_id}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}");
        let reopened_region = Arc::new(
            RegionOpener::new(
                region_id,
                region.region_dir(),
                self.memtable_builder_provider.clone(),
                self.object_store_manager.clone(),
                self.purge_scheduler.clone(),
                self.puffin_manager_factory.clone(),
                self.intermediate_manager.clone(),
                self.time_provider.clone(),
            )
            .cache(Some(self.cache_manager.clone()))
            .options(region.version().options.clone())?
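            // Skip WAL replay during reopen; `handle_catchup_request` replays the
            // WAL itself when catching up from a remote WAL.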
            .skip_wal_replay(true)
            .open(&self.config, &self.wal)
            .await?,
        );
        debug_assert!(!reopened_region.is_writable());
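        // Track the reopened region in the worker's region map, replacing the
        // stale instance fetched earlier.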
        self.regions.insert_region(reopened_region.clone());

        Ok(reopened_region)
    }
}