metric_engine/engine/
catchup.rs1use common_telemetry::debug;
16use snafu::{OptionExt, ResultExt};
17use store_api::region_engine::RegionEngine;
18use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest};
19use store_api::storage::RegionId;
20
21use crate::engine::MetricEngineInner;
22use crate::error::{
23 MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
24};
25use crate::utils;
26
27impl MetricEngineInner {
28 pub async fn catchup_region(
29 &self,
30 region_id: RegionId,
31 req: RegionCatchupRequest,
32 ) -> Result<AffectedRows> {
33 if !self.is_physical_region(region_id) {
34 return UnsupportedRegionRequestSnafu {
35 request: RegionRequest::Catchup(req),
36 }
37 .fail();
38 }
39 let data_region_id = utils::to_data_region_id(region_id);
40 let physical_region_options = *self
41 .state
42 .read()
43 .unwrap()
44 .physical_region_states()
45 .get(&data_region_id)
46 .context(PhysicalRegionNotFoundSnafu {
47 region_id: data_region_id,
48 })?
49 .options();
50
51 let metadata_region_id = utils::to_metadata_region_id(region_id);
52 debug!("Catchup metadata region {metadata_region_id}");
54 self.mito
55 .handle_request(
56 metadata_region_id,
57 RegionRequest::Catchup(RegionCatchupRequest {
58 set_writable: req.set_writable,
59 entry_id: req.metadata_entry_id,
60 metadata_entry_id: None,
61 location_id: req.location_id,
62 }),
63 )
64 .await
65 .context(MitoCatchupOperationSnafu)?;
66
67 debug!("Catchup data region {data_region_id}");
68 self.mito
69 .handle_request(
70 data_region_id,
71 RegionRequest::Catchup(RegionCatchupRequest {
72 set_writable: req.set_writable,
73 entry_id: req.entry_id,
74 metadata_entry_id: None,
75 location_id: req.location_id,
76 }),
77 )
78 .await
79 .context(MitoCatchupOperationSnafu)
80 .map(|response| response.affected_rows)?;
81
82 let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
83 PhysicalRegionNotFoundSnafu {
84 region_id: data_region_id,
85 },
86 )?;
87 self.recover_states(region_id, primary_key_encoding, physical_region_options)
88 .await?;
89 Ok(0)
90 }
91}