mito2/worker/
handle_remap.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::time::Instant;
17
18use common_error::ext::BoxedError;
19use common_telemetry::info;
20use futures::future::try_join_all;
21use partition::expr::PartitionExpr;
22use snafu::{OptionExt, ResultExt};
23use store_api::storage::RegionId;
24
25use crate::error::{FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result};
26use crate::manifest::action::RegionManifest;
27use crate::region::{MitoRegionRef, RegionMetadataLoader};
28use crate::remap_manifest::RemapManifest;
29use crate::request::RemapManifestsRequest;
30use crate::sst::location::region_dir_from_table_dir;
31use crate::worker::RegionWorkerLoop;
32
33impl<S> RegionWorkerLoop<S> {
34    pub(crate) fn handle_remap_manifests_request(&mut self, request: RemapManifestsRequest) {
35        let region_id = request.region_id;
36        let sender = request.sender;
37        let region = match self.regions.staging_region(region_id) {
38            Ok(region) => region,
39            Err(e) => {
40                let _ = sender.send(Err(e));
41                return;
42            }
43        };
44
45        let same_table = request
46            .input_regions
47            .iter()
48            .map(|r| r.table_id())
49            .all(|t| t == region_id.table_id());
50
51        if !same_table {
52            let _ = sender.send(
53                InvalidRequestSnafu {
54                    region_id,
55                    reason: "Input regions must be from the same table",
56                }
57                .fail(),
58            );
59            return;
60        }
61
62        let region_metadata_loader =
63            RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
64        common_runtime::spawn_global(async move {
65            let result = Self::fetch_and_remap_manifests(
66                region,
67                region_metadata_loader,
68                request.input_regions,
69                request.new_partition_exprs,
70                request.region_mapping,
71            )
72            .await;
73
74            let _ = sender.send(result);
75        });
76    }
77
78    async fn fetch_and_remap_manifests(
79        region: MitoRegionRef,
80        region_metadata_loader: RegionMetadataLoader,
81        input_regions: Vec<RegionId>,
82        new_partition_exprs: HashMap<RegionId, PartitionExpr>,
83        region_mapping: HashMap<RegionId, Vec<RegionId>>,
84    ) -> Result<HashMap<RegionId, RegionManifest>> {
85        let mut tasks = Vec::with_capacity(input_regions.len());
86        let region_options = region.version().options.clone();
87        let table_dir = region.table_dir();
88        let path_type = region.path_type();
89        let now = Instant::now();
90        for input_region in &input_regions {
91            let region_dir = region_dir_from_table_dir(table_dir, *input_region, path_type);
92            let storage = region_options.storage.clone();
93            let moved_region_metadata_loader = region_metadata_loader.clone();
94            tasks.push(async move {
95                moved_region_metadata_loader
96                    .load_manifest(&region_dir, &storage)
97                    .await
98            });
99        }
100
101        let results = try_join_all(tasks)
102            .await
103            .map_err(BoxedError::new)
104            .context(FetchManifestsSnafu)?;
105        let manifests = results
106            .into_iter()
107            .zip(input_regions)
108            .map(|(manifest_res, region_id)| {
109                let manifest = manifest_res.context(MissingManifestSnafu { region_id })?;
110                Ok((region_id, (*manifest).clone()))
111            })
112            .collect::<Result<HashMap<_, _>>>()?;
113        let mut mapper = RemapManifest::new(manifests, new_partition_exprs, region_mapping);
114        let remap_result = mapper.remap_manifests()?;
115        info!(
116            "Remap manifests cost: {:?}, region: {}",
117            now.elapsed(),
118            region.region_id
119        );
120
121        Ok(remap_result.new_manifests)
122    }
123}