metric_engine/engine/
catchup.rs1use common_telemetry::debug;
16use snafu::{OptionExt, ResultExt};
17use store_api::region_engine::RegionEngine;
18use store_api::region_request::{
19 AffectedRows, RegionCatchupRequest, RegionRequest, ReplayCheckpoint,
20};
21use store_api::storage::RegionId;
22
23use crate::engine::MetricEngineInner;
24use crate::error::{
25 MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
26};
27use crate::utils;
28
29impl MetricEngineInner {
30 pub async fn catchup_region(
31 &self,
32 region_id: RegionId,
33 req: RegionCatchupRequest,
34 ) -> Result<AffectedRows> {
35 if !self.is_physical_region(region_id) {
36 return UnsupportedRegionRequestSnafu {
37 request: RegionRequest::Catchup(req),
38 }
39 .fail();
40 }
41 let data_region_id = utils::to_data_region_id(region_id);
42 let physical_region_options = *self
43 .state
44 .read()
45 .unwrap()
46 .physical_region_states()
47 .get(&data_region_id)
48 .context(PhysicalRegionNotFoundSnafu {
49 region_id: data_region_id,
50 })?
51 .options();
52
53 let metadata_region_id = utils::to_metadata_region_id(region_id);
54 debug!("Catchup metadata region {metadata_region_id}");
56 self.mito
57 .handle_request(
58 metadata_region_id,
59 RegionRequest::Catchup(RegionCatchupRequest {
60 set_writable: req.set_writable,
61 entry_id: req.metadata_entry_id,
62 metadata_entry_id: None,
63 location_id: req.location_id,
64 checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
65 entry_id: c.metadata_entry_id.unwrap_or_default(),
66 metadata_entry_id: None,
67 }),
68 }),
69 )
70 .await
71 .context(MitoCatchupOperationSnafu)?;
72
73 debug!("Catchup data region {data_region_id}");
74 self.mito
75 .handle_request(
76 data_region_id,
77 RegionRequest::Catchup(RegionCatchupRequest {
78 set_writable: req.set_writable,
79 entry_id: req.entry_id,
80 metadata_entry_id: None,
81 location_id: req.location_id,
82 checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
83 entry_id: c.entry_id,
84 metadata_entry_id: None,
85 }),
86 }),
87 )
88 .await
89 .context(MitoCatchupOperationSnafu)
90 .map(|response| response.affected_rows)?;
91
92 self.recover_states(region_id, physical_region_options)
93 .await?;
94 Ok(0)
95 }
96}