mito2/region/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Instant;
17
18use common_base::readable_size::ReadableSize;
19use common_telemetry::{debug, error, info};
20use futures::future::try_join_all;
21use object_store::manager::ObjectStoreManagerRef;
22use snafu::{ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::region_request::PathType;
25use store_api::storage::{FileId, IndexVersion, RegionId};
26
27use crate::access_layer::AccessLayerRef;
28use crate::config::MitoConfig;
29use crate::error::{self, InvalidSourceAndTargetRegionSnafu, Result};
30use crate::manifest::action::RegionManifest;
31use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
32use crate::region::opener::get_object_store;
33use crate::region::options::RegionOptions;
34use crate::sst::file::{RegionFileId, RegionIndexId};
35use crate::sst::location;
36
37/// A loader for loading metadata from a region dir.
38#[derive(Debug, Clone)]
39pub struct RegionMetadataLoader {
40    config: Arc<MitoConfig>,
41    object_store_manager: ObjectStoreManagerRef,
42}
43
44impl RegionMetadataLoader {
45    /// Creates a new `RegionMetadataLoader`.
46    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
47        Self {
48            config,
49            object_store_manager,
50        }
51    }
52
53    /// Loads the metadata of the region from the region dir.
54    pub async fn load(
55        &self,
56        region_dir: &str,
57        region_options: &RegionOptions,
58    ) -> Result<Option<RegionMetadataRef>> {
59        let manifest = self
60            .load_manifest(region_dir, &region_options.storage)
61            .await?;
62        Ok(manifest.map(|m| m.metadata.clone()))
63    }
64
65    /// Loads the manifest of the region from the region dir.
66    pub async fn load_manifest(
67        &self,
68        region_dir: &str,
69        storage: &Option<String>,
70    ) -> Result<Option<Arc<RegionManifest>>> {
71        let object_store = get_object_store(storage, &self.object_store_manager)?;
72        let region_manifest_options =
73            RegionManifestOptions::new(&self.config, region_dir, &object_store);
74        let Some(manifest_manager) =
75            RegionManifestManager::open(region_manifest_options, &Default::default()).await?
76        else {
77            return Ok(None);
78        };
79
80        let manifest = manifest_manager.manifest();
81        Ok(Some(manifest))
82    }
83}
84
85/// A copier for copying files from a region to another region.
86#[derive(Debug, Clone)]
87pub struct RegionFileCopier {
88    access_layer: AccessLayerRef,
89}
90
91/// A descriptor for a file.
92#[derive(Debug, Clone, Copy)]
93pub enum FileDescriptor {
94    /// An index file.
95    Index {
96        file_id: FileId,
97        version: IndexVersion,
98        size: u64,
99    },
100    /// A data file.
101    Data { file_id: FileId, size: u64 },
102}
103
104impl FileDescriptor {
105    pub fn size(&self) -> u64 {
106        match self {
107            FileDescriptor::Index { size, .. } => *size,
108            FileDescriptor::Data { size, .. } => *size,
109        }
110    }
111}
112
113/// Builds the source and target file paths for a given file descriptor.
114///
115/// # Arguments
116///
117/// * `source_region_id`: The ID of the source region.
118/// * `target_region_id`: The ID of the target region.
119/// * `file_id`: The ID of the file.
120///
121/// # Returns
122///
123/// A tuple containing the source and target file paths.
124fn build_copy_file_paths(
125    source_region_id: RegionId,
126    target_region_id: RegionId,
127    file_descriptor: FileDescriptor,
128    table_dir: &str,
129    path_type: PathType,
130) -> (String, String) {
131    match file_descriptor {
132        FileDescriptor::Index {
133            file_id, version, ..
134        } => (
135            location::index_file_path(
136                table_dir,
137                RegionIndexId::new(RegionFileId::new(source_region_id, file_id), version),
138                path_type,
139            ),
140            location::index_file_path(
141                table_dir,
142                RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version),
143                path_type,
144            ),
145        ),
146        FileDescriptor::Data { file_id, .. } => (
147            location::sst_file_path(
148                table_dir,
149                RegionFileId::new(source_region_id, file_id),
150                path_type,
151            ),
152            location::sst_file_path(
153                table_dir,
154                RegionFileId::new(target_region_id, file_id),
155                path_type,
156            ),
157        ),
158    }
159}
160
161fn build_delete_file_path(
162    target_region_id: RegionId,
163    file_descriptor: FileDescriptor,
164    table_dir: &str,
165    path_type: PathType,
166) -> String {
167    match file_descriptor {
168        FileDescriptor::Index {
169            file_id, version, ..
170        } => location::index_file_path(
171            table_dir,
172            RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version),
173            path_type,
174        ),
175        FileDescriptor::Data { file_id, .. } => location::sst_file_path(
176            table_dir,
177            RegionFileId::new(target_region_id, file_id),
178            path_type,
179        ),
180    }
181}
182
183impl RegionFileCopier {
184    pub fn new(access_layer: AccessLayerRef) -> Self {
185        Self { access_layer }
186    }
187
188    /// Copies files from a source region to a target region.
189    ///
190    /// # Arguments
191    ///
192    /// * `source_region_id`: The ID of the source region.
193    /// * `target_region_id`: The ID of the target region.
194    /// * `file_ids`: The IDs of the files to copy.
195    pub async fn copy_files(
196        &self,
197        source_region_id: RegionId,
198        target_region_id: RegionId,
199        file_ids: Vec<FileDescriptor>,
200        parallelism: usize,
201    ) -> Result<()> {
202        ensure!(
203            source_region_id.table_id() == target_region_id.table_id(),
204            InvalidSourceAndTargetRegionSnafu {
205                source_region_id,
206                target_region_id,
207            },
208        );
209        let table_dir = self.access_layer.table_dir();
210        let path_type = self.access_layer.path_type();
211        let object_store = self.access_layer.object_store();
212
213        info!(
214            "Copying {} files from region {} to region {}",
215            file_ids.len(),
216            source_region_id,
217            target_region_id
218        );
219        debug!(
220            "Copying files: {:?} from region {} to region {}",
221            file_ids, source_region_id, target_region_id
222        );
223        let mut tasks = Vec::with_capacity(parallelism);
224        for skip in 0..parallelism {
225            let target_file_ids = file_ids.iter().skip(skip).step_by(parallelism).copied();
226            let object_store = object_store.clone();
227            tasks.push(async move {
228                for file_desc in target_file_ids {
229                    let (source_path, target_path) = build_copy_file_paths(
230                        source_region_id,
231                        target_region_id,
232                        file_desc,
233                        table_dir,
234                        path_type,
235                    );
236                    let now = Instant::now();
237                    object_store
238                        .copy(&source_path, &target_path)
239                        .await
240                        .inspect_err(
241                            |e| error!(e; "Failed to copy file {} to {}", source_path, target_path),
242                        )
243                        .context(error::OpenDalSnafu)?;
244                    let file_size = ReadableSize(file_desc.size());
245                    info!(
246                        "Copied file {} to {}, file size: {}, elapsed: {:?}",
247                        source_path,
248                        target_path,
249                        file_size,
250                        now.elapsed(),
251                    );
252                }
253
254                Ok(())
255            });
256        }
257
258        if let Err(err) = try_join_all(tasks).await {
259            error!(err; "Failed to copy files from region {} to region {}", source_region_id, target_region_id);
260            self.clean_target_region(target_region_id, file_ids).await;
261            return Err(err);
262        }
263
264        Ok(())
265    }
266
267    /// Cleans the copied files from the target region.
268    async fn clean_target_region(&self, target_region_id: RegionId, file_ids: Vec<FileDescriptor>) {
269        let table_dir = self.access_layer.table_dir();
270        let path_type = self.access_layer.path_type();
271        let object_store = self.access_layer.object_store();
272        let delete_file_path = file_ids
273            .into_iter()
274            .map(|file_descriptor| {
275                build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type)
276            })
277            .collect::<Vec<_>>();
278        debug!(
279            "Deleting files: {:?} after failed to copy files to target region {}",
280            delete_file_path, target_region_id
281        );
282        if let Err(err) = object_store.delete_iter(delete_file_path).await {
283            error!(err; "Failed to delete files from region {}", target_region_id);
284        }
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_copy_file_paths() {
        common_telemetry::init_default_ut_logging();
        let file_id = FileId::random();
        let source_region_id = RegionId::new(1, 1);
        let target_region_id = RegionId::new(1, 2);
        let file_descriptor = FileDescriptor::Data { file_id, size: 100 };
        let table_dir = "/table_dir";
        let path_type = PathType::Bare;
        let (source_path, target_path) = build_copy_file_paths(
            source_region_id,
            target_region_id,
            file_descriptor,
            table_dir,
            path_type,
        );
        assert_eq!(
            source_path,
            format!("/table_dir/1_0000000001/{}.parquet", file_id)
        );
        assert_eq!(
            target_path,
            format!("/table_dir/1_0000000002/{}.parquet", file_id)
        );

        let version = 1;
        let file_descriptor = FileDescriptor::Index {
            file_id,
            version,
            size: 100,
        };
        let (source_path, target_path) = build_copy_file_paths(
            source_region_id,
            target_region_id,
            file_descriptor,
            table_dir,
            path_type,
        );
        assert_eq!(
            source_path,
            format!(
                "/table_dir/1_0000000001/index/{}.{}.puffin",
                file_id, version
            )
        );
        assert_eq!(
            target_path,
            format!(
                "/table_dir/1_0000000002/index/{}.{}.puffin",
                file_id, version
            )
        );
    }

    #[test]
    fn test_build_delete_file_path() {
        let file_id = FileId::random();
        let source_region_id = RegionId::new(1, 1);
        let target_region_id = RegionId::new(1, 2);
        let table_dir = "/table_dir";
        let path_type = PathType::Bare;

        let data = FileDescriptor::Data { file_id, size: 100 };
        let data_path = build_delete_file_path(target_region_id, data, table_dir, path_type);
        assert_eq!(
            data_path,
            format!("/table_dir/1_0000000002/{}.parquet", file_id)
        );

        let version = 1;
        let index = FileDescriptor::Index {
            file_id,
            version,
            size: 100,
        };
        let index_path = build_delete_file_path(target_region_id, index, table_dir, path_type);
        assert_eq!(
            index_path,
            format!(
                "/table_dir/1_0000000002/index/{}.{}.puffin",
                file_id, version
            )
        );

        // The cleanup path must match the copy target, otherwise a failed copy
        // would leave orphaned files behind.
        for descriptor in [data, index] {
            let (_, target_path) = build_copy_file_paths(
                source_region_id,
                target_region_id,
                descriptor,
                table_dir,
                path_type,
            );
            assert_eq!(
                target_path,
                build_delete_file_path(target_region_id, descriptor, table_dir, path_type)
            );
        }
    }

    #[test]
    fn test_file_descriptor_size() {
        let file_id = FileId::random();
        assert_eq!(FileDescriptor::Data { file_id, size: 42 }.size(), 42);
        assert_eq!(
            FileDescriptor::Index {
                file_id,
                version: 1,
                size: 7,
            }
            .size(),
            7
        );
    }
}