mito2/worker/
handle_catchup.rs1use std::sync::Arc;
18
19use common_telemetry::tracing::warn;
20use common_telemetry::{debug, error, info};
21use store_api::logstore::LogStore;
22use store_api::region_engine::{RegionRole, SettableRegionRoleState};
23use store_api::region_request::RegionCatchupRequest;
24use store_api::storage::RegionId;
25
26use crate::error::{self, Result};
27use crate::region::MitoRegion;
28use crate::region::catchup::RegionCatchupTask;
29use crate::region::opener::RegionOpener;
30use crate::request::OptionOutputTx;
31use crate::wal::entry_distributor::WalEntryReceiver;
32use crate::worker::RegionWorkerLoop;
33
34impl<S: LogStore> RegionWorkerLoop<S> {
35 pub(crate) async fn handle_catchup_request(
36 &mut self,
37 region_id: RegionId,
38 request: RegionCatchupRequest,
39 entry_receiver: Option<WalEntryReceiver>,
40 sender: OptionOutputTx,
41 ) {
42 let Some(region) = self.regions.get_region(region_id) else {
43 sender.send(Err(error::RegionNotFoundSnafu { region_id }.build()));
44 return;
45 };
46
47 if region.is_writable() {
48 debug!("Region {region_id} is writable, skip catchup");
49 sender.send(Ok(0));
50 return;
51 }
52
53 if self.catchup_regions.is_region_exists(region_id) {
54 warn!("Region {region_id} under catching up");
55 sender.send(Err(error::RegionBusySnafu { region_id }.build()));
56 return;
57 }
58
59 let region = match self.reopen_region_if_needed(region).await {
61 Ok(region) => region,
62 Err(e) => {
63 sender.send(Err(e));
64 return;
65 }
66 };
67
68 self.catchup_regions.insert_region(region_id);
69 let catchup_regions = self.catchup_regions.clone();
70 let wal = self.wal.clone();
71 let allow_stale_entries = self.config.allow_stale_entries;
72 common_runtime::spawn_global(async move {
73 let mut task = RegionCatchupTask::new(region.clone(), wal, allow_stale_entries)
74 .with_entry_receiver(entry_receiver)
75 .with_expected_last_entry_id(request.entry_id)
76 .with_location_id(request.location_id)
77 .with_replay_checkpoint_entry_id(request.checkpoint.map(|c| c.entry_id));
78
79 match task.run().await {
80 Ok(_) => {
81 if request.set_writable {
82 region.set_role(RegionRole::Leader);
83 if let Err(err) = region
85 .set_role_state_gracefully(SettableRegionRoleState::Leader)
86 .await
87 {
88 error!(err; "Failed to set region {region_id} to leader");
89 }
90 }
91 sender.send(Ok(0));
92 catchup_regions.remove_region(region_id);
93 }
94 Err(err) => {
95 error!(err; "Failed to catchup region {region_id}");
96 sender.send(Err(err));
97 catchup_regions.remove_region(region_id);
98 }
99 }
100 });
101 }
102
103 pub(crate) async fn reopen_region(
105 &mut self,
106 region: &Arc<MitoRegion>,
107 ) -> Result<Arc<MitoRegion>> {
108 let region_id = region.region_id;
109 let manifest_version = region.manifest_ctx.manifest_version().await;
110 let flushed_entry_id = region.version_control.current().last_entry_id;
111 info!(
112 "Reopening the region: {region_id}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}"
113 );
114 let reopened_region = Arc::new(
115 RegionOpener::new(
116 region_id,
117 region.table_dir(),
118 region.access_layer.path_type(),
119 self.memtable_builder_provider.clone(),
120 self.object_store_manager.clone(),
121 self.purge_scheduler.clone(),
122 self.puffin_manager_factory.clone(),
123 self.intermediate_manager.clone(),
124 self.time_provider.clone(),
125 self.file_ref_manager.clone(),
126 self.partition_expr_fetcher.clone(),
127 )
128 .cache(Some(self.cache_manager.clone()))
129 .options(region.version().options.clone())?
130 .skip_wal_replay(true)
131 .open(&self.config, &self.wal)
132 .await?,
133 );
134 debug_assert!(!reopened_region.is_writable());
135 self.regions.insert_region(reopened_region.clone());
136
137 Ok(reopened_region)
138 }
139
140 async fn reopen_region_if_needed(
141 &mut self,
142 region: Arc<MitoRegion>,
143 ) -> Result<Arc<MitoRegion>> {
144 let version = region.version();
147 let is_empty_memtable = version.memtables.is_empty();
148
149 let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? {
151 if !is_empty_memtable {
152 warn!(
153 "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
154 region.region_id,
155 region.manifest_ctx.manifest_version().await,
156 region.version_control.current().last_entry_id
157 );
158 }
159 self.reopen_region(®ion).await?
160 } else {
161 region
162 };
163
164 Ok(region)
165 }
166}