mito2/worker/
handle_remap.rs1use std::collections::HashMap;
16use std::time::Instant;
17
18use common_error::ext::BoxedError;
19use common_telemetry::info;
20use futures::future::try_join_all;
21use partition::expr::PartitionExpr;
22use snafu::{OptionExt, ResultExt};
23use store_api::storage::RegionId;
24
25use crate::error::{FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result};
26use crate::manifest::action::RegionManifest;
27use crate::region::{MitoRegionRef, RegionMetadataLoader};
28use crate::remap_manifest::RemapManifest;
29use crate::request::RemapManifestsRequest;
30use crate::sst::location::region_dir_from_table_dir;
31use crate::worker::RegionWorkerLoop;
32
33impl<S> RegionWorkerLoop<S> {
34 pub(crate) fn handle_remap_manifests_request(&mut self, request: RemapManifestsRequest) {
35 let region_id = request.region_id;
36 let sender = request.sender;
37 let region = match self.regions.staging_region(region_id) {
38 Ok(region) => region,
39 Err(e) => {
40 let _ = sender.send(Err(e));
41 return;
42 }
43 };
44
45 let same_table = request
46 .input_regions
47 .iter()
48 .map(|r| r.table_id())
49 .all(|t| t == region_id.table_id());
50
51 if !same_table {
52 let _ = sender.send(
53 InvalidRequestSnafu {
54 region_id,
55 reason: "Input regions must be from the same table",
56 }
57 .fail(),
58 );
59 return;
60 }
61
62 let region_metadata_loader =
63 RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
64 common_runtime::spawn_global(async move {
65 let result = Self::fetch_and_remap_manifests(
66 region,
67 region_metadata_loader,
68 request.input_regions,
69 request.new_partition_exprs,
70 request.region_mapping,
71 )
72 .await;
73
74 let _ = sender.send(result);
75 });
76 }
77
78 async fn fetch_and_remap_manifests(
79 region: MitoRegionRef,
80 region_metadata_loader: RegionMetadataLoader,
81 input_regions: Vec<RegionId>,
82 new_partition_exprs: HashMap<RegionId, PartitionExpr>,
83 region_mapping: HashMap<RegionId, Vec<RegionId>>,
84 ) -> Result<HashMap<RegionId, RegionManifest>> {
85 let mut tasks = Vec::with_capacity(input_regions.len());
86 let region_options = region.version().options.clone();
87 let table_dir = region.table_dir();
88 let path_type = region.path_type();
89 let now = Instant::now();
90 for input_region in &input_regions {
91 let region_dir = region_dir_from_table_dir(table_dir, *input_region, path_type);
92 let storage = region_options.storage.clone();
93 let moved_region_metadata_loader = region_metadata_loader.clone();
94 tasks.push(async move {
95 moved_region_metadata_loader
96 .load_manifest(®ion_dir, &storage)
97 .await
98 });
99 }
100
101 let results = try_join_all(tasks)
102 .await
103 .map_err(BoxedError::new)
104 .context(FetchManifestsSnafu)?;
105 let manifests = results
106 .into_iter()
107 .zip(input_regions)
108 .map(|(manifest_res, region_id)| {
109 let manifest = manifest_res.context(MissingManifestSnafu { region_id })?;
110 Ok((region_id, (*manifest).clone()))
111 })
112 .collect::<Result<HashMap<_, _>>>()?;
113 let mut mapper = RemapManifest::new(manifests, new_partition_exprs, region_mapping);
114 let remap_result = mapper.remap_manifests()?;
115 info!(
116 "Remap manifests cost: {:?}, region: {}",
117 now.elapsed(),
118 region.region_id
119 );
120
121 Ok(remap_result.new_manifests)
122 }
123}