metric_engine/engine/
catchup.rs1use std::collections::HashMap;
16
17use common_error::ext::BoxedError;
18use snafu::{OptionExt, ResultExt};
19use store_api::region_engine::{BatchResponses, RegionEngine};
20use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint};
21use store_api::storage::RegionId;
22
23use crate::engine::MetricEngineInner;
24use crate::error::{BatchCatchupMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result};
25use crate::utils;
26
27impl MetricEngineInner {
28 pub async fn handle_batch_catchup_requests(
29 &self,
30 parallelism: usize,
31 requests: Vec<(RegionId, RegionCatchupRequest)>,
32 ) -> Result<BatchResponses> {
33 let mut all_requests = Vec::with_capacity(requests.len() * 2);
34 let mut physical_region_options_list = Vec::with_capacity(requests.len());
35
36 for (region_id, req) in requests {
37 let metadata_region_id = utils::to_metadata_region_id(region_id);
38 let data_region_id = utils::to_data_region_id(region_id);
39
40 let physical_region_options = *self
41 .state
42 .read()
43 .unwrap()
44 .physical_region_states()
45 .get(&data_region_id)
46 .context(PhysicalRegionNotFoundSnafu {
47 region_id: data_region_id,
48 })?
49 .options();
50 physical_region_options_list.push((data_region_id, physical_region_options));
51 all_requests.push((
52 metadata_region_id,
53 RegionCatchupRequest {
54 set_writable: req.set_writable,
55 entry_id: req.metadata_entry_id,
56 metadata_entry_id: None,
57 location_id: req.location_id,
58 checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
59 entry_id: c.metadata_entry_id.unwrap_or_default(),
60 metadata_entry_id: None,
61 }),
62 },
63 ));
64 all_requests.push((
65 data_region_id,
66 RegionCatchupRequest {
67 set_writable: req.set_writable,
68 entry_id: req.entry_id,
69 metadata_entry_id: None,
70 location_id: req.location_id,
71 checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
72 entry_id: c.entry_id,
73 metadata_entry_id: None,
74 }),
75 },
76 ));
77 }
78
79 let mut results = self
80 .mito
81 .handle_batch_catchup_requests(parallelism, all_requests)
82 .await
83 .context(BatchCatchupMitoRegionSnafu {})?
84 .into_iter()
85 .collect::<HashMap<_, _>>();
86
87 let mut responses = Vec::with_capacity(physical_region_options_list.len());
88 for (physical_region_id, physical_region_options) in physical_region_options_list {
89 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
90 let data_region_id = utils::to_data_region_id(physical_region_id);
91 let metadata_region_result = results.remove(&metadata_region_id);
92 let data_region_result = results.remove(&data_region_id);
93
94 let response = self
99 .recover_physical_region_with_results(
100 metadata_region_result,
101 data_region_result,
102 physical_region_id,
103 physical_region_options,
104 false,
108 )
109 .await
110 .map_err(BoxedError::new);
111 responses.push((physical_region_id, response));
112 }
113
114 Ok(responses)
115 }
116}