mito2/worker/
handle_remap.rs1use std::collections::HashMap;
16use std::time::Instant;
17
18use common_error::ext::BoxedError;
19use common_telemetry::{debug, info};
20use futures::future::try_join_all;
21use partition::expr::PartitionExpr;
22use snafu::{OptionExt, ResultExt};
23use store_api::storage::RegionId;
24
25use crate::error::{self, FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result};
26use crate::region::{MitoRegionRef, RegionMetadataLoader};
27use crate::remap_manifest::RemapManifest;
28use crate::request::RemapManifestsRequest;
29use crate::sst::location::region_dir_from_table_dir;
30use crate::worker::RegionWorkerLoop;
31
32impl<S> RegionWorkerLoop<S> {
33 pub(crate) fn handle_remap_manifests_request(&mut self, request: RemapManifestsRequest) {
34 let region_id = request.region_id;
35 let sender = request.sender;
36 let region = match self.regions.staging_region(region_id) {
37 Ok(region) => region,
38 Err(e) => {
39 let _ = sender.send(Err(e));
40 return;
41 }
42 };
43
44 let same_table = request
45 .input_regions
46 .iter()
47 .map(|r| r.table_id())
48 .all(|t| t == region_id.table_id());
49
50 if !same_table {
51 let _ = sender.send(
52 InvalidRequestSnafu {
53 region_id,
54 reason: "Input regions must be from the same table",
55 }
56 .fail(),
57 );
58 return;
59 }
60
61 let region_metadata_loader =
62 RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
63 common_runtime::spawn_global(async move {
64 let result = Self::fetch_and_remap_manifests(
65 region,
66 region_metadata_loader,
67 request.input_regions,
68 request.new_partition_exprs,
69 request.region_mapping,
70 )
71 .await;
72
73 let _ = sender.send(result);
74 });
75 }
76
77 async fn fetch_and_remap_manifests(
82 region: MitoRegionRef,
83 region_metadata_loader: RegionMetadataLoader,
84 input_regions: Vec<RegionId>,
85 new_partition_exprs: HashMap<RegionId, PartitionExpr>,
86 region_mapping: HashMap<RegionId, Vec<RegionId>>,
87 ) -> Result<HashMap<RegionId, String>> {
88 let mut tasks = Vec::with_capacity(input_regions.len());
89 let region_options = region.version().options.clone();
90 let table_dir = region.table_dir();
91 let path_type = region.path_type();
92 let now = Instant::now();
93 for input_region in &input_regions {
94 let region_dir = region_dir_from_table_dir(table_dir, *input_region, path_type);
95 let storage = region_options.storage.clone();
96 let moved_region_metadata_loader = region_metadata_loader.clone();
97 tasks.push(async move {
98 moved_region_metadata_loader
99 .load_manifest(®ion_dir, &storage)
100 .await
101 });
102 }
103 let results = try_join_all(tasks)
104 .await
105 .map_err(BoxedError::new)
106 .context(FetchManifestsSnafu)?;
107 let manifests = results
108 .into_iter()
109 .zip(input_regions)
110 .map(|(manifest_res, region_id)| {
111 let manifest = manifest_res.context(MissingManifestSnafu { region_id })?;
112 Ok((region_id, (*manifest).clone()))
113 })
114 .collect::<Result<HashMap<_, _>>>()?;
115 let mut mapper = RemapManifest::new(manifests, new_partition_exprs, region_mapping);
116 let remap_result = mapper.remap_manifests()?;
117
118 let manifest_manager = region.manifest_ctx.manifest_manager.write().await;
120 let manifest_storage = manifest_manager.store();
121 let staging_blob_storage = manifest_storage.staging_storage().blob_storage().clone();
122 let mut tasks = Vec::with_capacity(remap_result.new_manifests.len());
123
124 for (remap_region_id, manifest) in &remap_result.new_manifests {
125 let bytes = serde_json::to_vec(&manifest).context(error::SerializeManifestSnafu {
126 region_id: *remap_region_id,
127 })?;
128 let key = remap_manifest_key(remap_region_id);
129 tasks.push(async {
130 debug!(
131 "Putting manifest to staging blob storage, region_id: {}, key: {}",
132 *remap_region_id, key
133 );
134 staging_blob_storage.put(&key, bytes).await?;
135 Ok((*remap_region_id, key))
136 });
137 }
138 let r = try_join_all(tasks).await?;
139 info!(
140 "Remap manifests cost: {:?}, region: {}",
141 now.elapsed(),
142 region.region_id
143 );
144
145 Ok(r.into_iter().collect::<HashMap<_, _>>())
146 }
147}
148
149fn remap_manifest_key(region_id: &RegionId) -> String {
150 format!("remap_manifest_{}", region_id.as_u64())
151}