mito2/region/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Instant;
17
18use common_base::readable_size::ReadableSize;
19use common_telemetry::{debug, error, info};
20use futures::future::try_join_all;
21use object_store::manager::ObjectStoreManagerRef;
22use snafu::{ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::region_request::PathType;
25use store_api::storage::{FileId, IndexVersion, RegionId};
26
27use crate::access_layer::AccessLayerRef;
28use crate::config::MitoConfig;
29use crate::error::{self, InvalidSourceAndTargetRegionSnafu, Result};
30use crate::manifest::action::RegionManifest;
31use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
32use crate::region::opener::get_object_store;
33use crate::region::options::RegionOptions;
34use crate::sst::file::{RegionFileId, RegionIndexId};
35use crate::sst::location;
36
37/// A loader for loading metadata from a region dir.
38#[derive(Debug, Clone)]
39pub struct RegionMetadataLoader {
40    config: Arc<MitoConfig>,
41    object_store_manager: ObjectStoreManagerRef,
42}
43
44impl RegionMetadataLoader {
45    /// Creates a new `RegionMetadataLoader`.
46    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
47        Self {
48            config,
49            object_store_manager,
50        }
51    }
52
53    /// Loads the metadata of the region from the region dir.
54    pub async fn load(
55        &self,
56        region_dir: &str,
57        region_options: &RegionOptions,
58    ) -> Result<Option<RegionMetadataRef>> {
59        let manifest = self
60            .load_manifest(region_dir, &region_options.storage)
61            .await?;
62        Ok(manifest.map(|m| m.metadata.clone()))
63    }
64
65    /// Loads the manifest of the region from the region dir.
66    pub async fn load_manifest(
67        &self,
68        region_dir: &str,
69        storage: &Option<String>,
70    ) -> Result<Option<Arc<RegionManifest>>> {
71        let object_store = get_object_store(storage, &self.object_store_manager)?;
72        let region_manifest_options =
73            RegionManifestOptions::new(&self.config, region_dir, &object_store);
74        let Some(manifest_manager) =
75            RegionManifestManager::open(region_manifest_options, &Default::default()).await?
76        else {
77            return Ok(None);
78        };
79
80        let manifest = manifest_manager.manifest();
81        Ok(Some(manifest))
82    }
83}
84
85/// A copier for copying files from a region to another region.
86#[derive(Debug, Clone)]
87pub struct RegionFileCopier {
88    access_layer: AccessLayerRef,
89}
90
91/// A descriptor for a file.
92#[derive(Debug, Clone, Copy)]
93pub enum FileDescriptor {
94    /// An index file.
95    Index {
96        file_id: FileId,
97        version: IndexVersion,
98        size: u64,
99    },
100    /// A data file.
101    Data { file_id: FileId, size: u64 },
102}
103
104impl FileDescriptor {
105    pub fn size(&self) -> u64 {
106        match self {
107            FileDescriptor::Index { size, .. } => *size,
108            FileDescriptor::Data { size, .. } => *size,
109        }
110    }
111
112    fn file_path(&self, region_id: RegionId, table_dir: &str, path_type: PathType) -> String {
113        match *self {
114            FileDescriptor::Index {
115                file_id, version, ..
116            } => location::index_file_path(
117                table_dir,
118                RegionIndexId::new(RegionFileId::new(region_id, file_id), version),
119                path_type,
120            ),
121            FileDescriptor::Data { file_id, .. } => {
122                location::sst_file_path(table_dir, RegionFileId::new(region_id, file_id), path_type)
123            }
124        }
125    }
126}
127
128/// Builds the source and target file paths for a given file descriptor.
129///
130/// # Arguments
131///
132/// * `source_region_id`: The ID of the source region.
133/// * `target_region_id`: The ID of the target region.
134/// * `file_id`: The ID of the file.
135///
136/// # Returns
137///
138/// A tuple containing the source and target file paths.
139fn build_copy_file_paths(
140    source_region_id: RegionId,
141    target_region_id: RegionId,
142    file_descriptor: FileDescriptor,
143    table_dir: &str,
144    path_type: PathType,
145) -> (String, String) {
146    (
147        file_descriptor.file_path(source_region_id, table_dir, path_type),
148        file_descriptor.file_path(target_region_id, table_dir, path_type),
149    )
150}
151
152fn build_delete_file_path(
153    target_region_id: RegionId,
154    file_descriptor: FileDescriptor,
155    table_dir: &str,
156    path_type: PathType,
157) -> String {
158    file_descriptor.file_path(target_region_id, table_dir, path_type)
159}
160
161impl RegionFileCopier {
162    pub fn new(access_layer: AccessLayerRef) -> Self {
163        Self { access_layer }
164    }
165
166    /// Copies files from a source region to a target region.
167    ///
168    /// # Arguments
169    ///
170    /// * `source_region_id`: The ID of the source region.
171    /// * `target_region_id`: The ID of the target region.
172    /// * `file_ids`: The IDs of the files to copy.
173    pub async fn copy_files(
174        &self,
175        source_region_id: RegionId,
176        target_region_id: RegionId,
177        file_ids: Vec<FileDescriptor>,
178        parallelism: usize,
179    ) -> Result<()> {
180        ensure!(
181            source_region_id.table_id() == target_region_id.table_id(),
182            InvalidSourceAndTargetRegionSnafu {
183                source_region_id,
184                target_region_id,
185            },
186        );
187        let table_dir = self.access_layer.table_dir();
188        let path_type = self.access_layer.path_type();
189        let object_store = self.access_layer.object_store();
190
191        info!(
192            "Copying {} files from region {} to region {}",
193            file_ids.len(),
194            source_region_id,
195            target_region_id
196        );
197        debug!(
198            "Copying files: {:?} from region {} to region {}",
199            file_ids, source_region_id, target_region_id
200        );
201        let mut tasks = Vec::with_capacity(parallelism);
202        for skip in 0..parallelism {
203            let target_file_ids = file_ids.iter().skip(skip).step_by(parallelism).copied();
204            let object_store = object_store.clone();
205            tasks.push(async move {
206                for file_desc in target_file_ids {
207                    let (source_path, target_path) = build_copy_file_paths(
208                        source_region_id,
209                        target_region_id,
210                        file_desc,
211                        table_dir,
212                        path_type,
213                    );
214                    let now = Instant::now();
215                    object_store
216                        .copy(&source_path, &target_path)
217                        .await
218                        .inspect_err(
219                            |e| error!(e; "Failed to copy file {} to {}", source_path, target_path),
220                        )
221                        .context(error::OpenDalSnafu)?;
222                    let file_size = ReadableSize(file_desc.size());
223                    info!(
224                        "Copied file {} to {}, file size: {}, elapsed: {:?}",
225                        source_path,
226                        target_path,
227                        file_size,
228                        now.elapsed(),
229                    );
230                }
231
232                Ok(())
233            });
234        }
235
236        if let Err(err) = try_join_all(tasks).await {
237            error!(err; "Failed to copy files from region {} to region {}", source_region_id, target_region_id);
238            self.clean_target_region(target_region_id, file_ids).await;
239            return Err(err);
240        }
241
242        Ok(())
243    }
244
245    /// Cleans the copied files from the target region.
246    async fn clean_target_region(&self, target_region_id: RegionId, file_ids: Vec<FileDescriptor>) {
247        let table_dir = self.access_layer.table_dir();
248        let path_type = self.access_layer.path_type();
249        let object_store = self.access_layer.object_store();
250        let delete_file_path = file_ids
251            .into_iter()
252            .map(|file_descriptor| {
253                build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type)
254            })
255            .collect::<Vec<_>>();
256        debug!(
257            "Deleting files: {:?} after failed to copy files to target region {}",
258            delete_file_path, target_region_id
259        );
260        if let Err(err) = object_store.delete_iter(delete_file_path).await {
261            error!(err; "Failed to delete files from region {}", target_region_id);
262        }
263    }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Data (`.parquet`) and index (`.puffin`) descriptors must resolve to the same file
    /// name under the source and the target region directories.
    #[test]
    fn test_build_copy_file_paths() {
        common_telemetry::init_default_ut_logging();
        let file_id = FileId::random();
        let source_region_id = RegionId::new(1, 1);
        let target_region_id = RegionId::new(1, 2);
        let table_dir = "/table_dir";
        let path_type = PathType::Bare;

        let data = FileDescriptor::Data { file_id, size: 100 };
        let (src, dst) =
            build_copy_file_paths(source_region_id, target_region_id, data, table_dir, path_type);
        assert_eq!(src, format!("/table_dir/1_0000000001/{}.parquet", file_id));
        assert_eq!(dst, format!("/table_dir/1_0000000002/{}.parquet", file_id));

        let version = 1;
        let index = FileDescriptor::Index {
            file_id,
            version,
            size: 100,
        };
        let (src, dst) =
            build_copy_file_paths(source_region_id, target_region_id, index, table_dir, path_type);
        let expected_src = format!(
            "/table_dir/1_0000000001/index/{}.{}.puffin",
            file_id, version
        );
        let expected_dst = format!(
            "/table_dir/1_0000000002/index/{}.{}.puffin",
            file_id, version
        );
        assert_eq!(src, expected_src);
        assert_eq!(dst, expected_dst);
    }

    /// Cleanup paths must point at the target region for both data and index files.
    #[test]
    fn test_build_delete_file_path() {
        common_telemetry::init_default_ut_logging();
        let file_id = FileId::random();
        let target_region_id = RegionId::new(1, 2);
        let table_dir = "/table_dir";
        let path_type = PathType::Bare;

        let data = FileDescriptor::Data { file_id, size: 100 };
        let data_path = build_delete_file_path(target_region_id, data, table_dir, path_type);
        assert_eq!(
            data_path,
            format!("/table_dir/1_0000000002/{}.parquet", file_id)
        );

        let index = FileDescriptor::Index {
            file_id,
            version: 1,
            size: 100,
        };
        let index_path = build_delete_file_path(target_region_id, index, table_dir, path_type);
        assert_eq!(
            index_path,
            format!("/table_dir/1_0000000002/index/{}.1.puffin", file_id)
        );
    }
}