mito2/worker/
handle_remap.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::time::Instant;
17
18use common_error::ext::BoxedError;
19use common_telemetry::{debug, info};
20use futures::future::try_join_all;
21use partition::expr::PartitionExpr;
22use snafu::{OptionExt, ResultExt};
23use store_api::storage::RegionId;
24
25use crate::error::{self, FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result};
26use crate::region::{MitoRegionRef, RegionMetadataLoader};
27use crate::remap_manifest::RemapManifest;
28use crate::request::RemapManifestsRequest;
29use crate::sst::location::region_dir_from_table_dir;
30use crate::worker::RegionWorkerLoop;
31
32impl<S> RegionWorkerLoop<S> {
33    pub(crate) fn handle_remap_manifests_request(&mut self, request: RemapManifestsRequest) {
34        let region_id = request.region_id;
35        let sender = request.sender;
36        let region = match self.regions.staging_region(region_id) {
37            Ok(region) => region,
38            Err(e) => {
39                let _ = sender.send(Err(e));
40                return;
41            }
42        };
43
44        let same_table = request
45            .input_regions
46            .iter()
47            .map(|r| r.table_id())
48            .all(|t| t == region_id.table_id());
49
50        if !same_table {
51            let _ = sender.send(
52                InvalidRequestSnafu {
53                    region_id,
54                    reason: "Input regions must be from the same table",
55                }
56                .fail(),
57            );
58            return;
59        }
60
61        let region_metadata_loader =
62            RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
63        common_runtime::spawn_global(async move {
64            let result = Self::fetch_and_remap_manifests(
65                region,
66                region_metadata_loader,
67                request.input_regions,
68                request.new_partition_exprs,
69                request.region_mapping,
70            )
71            .await;
72
73            let _ = sender.send(result);
74        });
75    }
76
77    // Fetches manifests for input regions, remaps them according to the provided
78    // mapping and partition expressions.
79    //
80    // Returns a map from each new region to its relative staging manifest path.
81    async fn fetch_and_remap_manifests(
82        region: MitoRegionRef,
83        region_metadata_loader: RegionMetadataLoader,
84        input_regions: Vec<RegionId>,
85        new_partition_exprs: HashMap<RegionId, PartitionExpr>,
86        region_mapping: HashMap<RegionId, Vec<RegionId>>,
87    ) -> Result<HashMap<RegionId, String>> {
88        let mut tasks = Vec::with_capacity(input_regions.len());
89        let region_options = region.version().options.clone();
90        let table_dir = region.table_dir();
91        let path_type = region.path_type();
92        let now = Instant::now();
93        for input_region in &input_regions {
94            let region_dir = region_dir_from_table_dir(table_dir, *input_region, path_type);
95            let storage = region_options.storage.clone();
96            let moved_region_metadata_loader = region_metadata_loader.clone();
97            tasks.push(async move {
98                moved_region_metadata_loader
99                    .load_manifest(&region_dir, &storage)
100                    .await
101            });
102        }
103        let results = try_join_all(tasks)
104            .await
105            .map_err(BoxedError::new)
106            .context(FetchManifestsSnafu)?;
107        let manifests = results
108            .into_iter()
109            .zip(input_regions)
110            .map(|(manifest_res, region_id)| {
111                let manifest = manifest_res.context(MissingManifestSnafu { region_id })?;
112                Ok((region_id, (*manifest).clone()))
113            })
114            .collect::<Result<HashMap<_, _>>>()?;
115        let mut mapper = RemapManifest::new(manifests, new_partition_exprs, region_mapping);
116        let remap_result = mapper.remap_manifests()?;
117
118        // Write new manifests to staging blob storage.
119        let manifest_manager = region.manifest_ctx.manifest_manager.write().await;
120        let manifest_storage = manifest_manager.store();
121        let staging_blob_storage = manifest_storage.staging_storage().blob_storage().clone();
122        let mut tasks = Vec::with_capacity(remap_result.new_manifests.len());
123
124        for (remap_region_id, manifest) in &remap_result.new_manifests {
125            let bytes = serde_json::to_vec(&manifest).context(error::SerializeManifestSnafu {
126                region_id: *remap_region_id,
127            })?;
128            let key = remap_manifest_key(remap_region_id);
129            tasks.push(async {
130                debug!(
131                    "Putting manifest to staging blob storage, region_id: {}, key: {}",
132                    *remap_region_id, key
133                );
134                staging_blob_storage.put(&key, bytes).await?;
135                Ok((*remap_region_id, key))
136            });
137        }
138        let r = try_join_all(tasks).await?;
139        info!(
140            "Remap manifests cost: {:?}, region: {}",
141            now.elapsed(),
142            region.region_id
143        );
144
145        Ok(r.into_iter().collect::<HashMap<_, _>>())
146    }
147}
148
149fn remap_manifest_key(region_id: &RegionId) -> String {
150    format!("remap_manifest_{}", region_id.as_u64())
151}