1use std::sync::Arc;
16use std::time::Instant;
17
18use common_base::readable_size::ReadableSize;
19use common_telemetry::{debug, error, info};
20use futures::future::try_join_all;
21use object_store::manager::ObjectStoreManagerRef;
22use snafu::{ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::region_request::PathType;
25use store_api::storage::{FileId, IndexVersion, RegionId};
26
27use crate::access_layer::AccessLayerRef;
28use crate::config::MitoConfig;
29use crate::error::{self, InvalidSourceAndTargetRegionSnafu, Result};
30use crate::manifest::action::RegionManifest;
31use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
32use crate::region::opener::get_object_store;
33use crate::region::options::RegionOptions;
34use crate::sst::file::{RegionFileId, RegionIndexId};
35use crate::sst::location;
36
37#[derive(Debug, Clone)]
39pub struct RegionMetadataLoader {
40 config: Arc<MitoConfig>,
41 object_store_manager: ObjectStoreManagerRef,
42}
43
44impl RegionMetadataLoader {
45 pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
47 Self {
48 config,
49 object_store_manager,
50 }
51 }
52
53 pub async fn load(
55 &self,
56 region_dir: &str,
57 region_options: &RegionOptions,
58 ) -> Result<Option<RegionMetadataRef>> {
59 let manifest = self
60 .load_manifest(region_dir, ®ion_options.storage)
61 .await?;
62 Ok(manifest.map(|m| m.metadata.clone()))
63 }
64
65 pub async fn load_manifest(
67 &self,
68 region_dir: &str,
69 storage: &Option<String>,
70 ) -> Result<Option<Arc<RegionManifest>>> {
71 let object_store = get_object_store(storage, &self.object_store_manager)?;
72 let region_manifest_options =
73 RegionManifestOptions::new(&self.config, region_dir, &object_store);
74 let Some(manifest_manager) =
75 RegionManifestManager::open(region_manifest_options, &Default::default()).await?
76 else {
77 return Ok(None);
78 };
79
80 let manifest = manifest_manager.manifest();
81 Ok(Some(manifest))
82 }
83}
84
85#[derive(Debug, Clone)]
87pub struct RegionFileCopier {
88 access_layer: AccessLayerRef,
89}
90
91#[derive(Debug, Clone, Copy)]
93pub enum FileDescriptor {
94 Index {
96 file_id: FileId,
97 version: IndexVersion,
98 size: u64,
99 },
100 Data { file_id: FileId, size: u64 },
102}
103
104impl FileDescriptor {
105 pub fn size(&self) -> u64 {
106 match self {
107 FileDescriptor::Index { size, .. } => *size,
108 FileDescriptor::Data { size, .. } => *size,
109 }
110 }
111}
112
113fn build_copy_file_paths(
125 source_region_id: RegionId,
126 target_region_id: RegionId,
127 file_descriptor: FileDescriptor,
128 table_dir: &str,
129 path_type: PathType,
130) -> (String, String) {
131 match file_descriptor {
132 FileDescriptor::Index {
133 file_id, version, ..
134 } => (
135 location::index_file_path(
136 table_dir,
137 RegionIndexId::new(RegionFileId::new(source_region_id, file_id), version),
138 path_type,
139 ),
140 location::index_file_path(
141 table_dir,
142 RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version),
143 path_type,
144 ),
145 ),
146 FileDescriptor::Data { file_id, .. } => (
147 location::sst_file_path(
148 table_dir,
149 RegionFileId::new(source_region_id, file_id),
150 path_type,
151 ),
152 location::sst_file_path(
153 table_dir,
154 RegionFileId::new(target_region_id, file_id),
155 path_type,
156 ),
157 ),
158 }
159}
160
161fn build_delete_file_path(
162 target_region_id: RegionId,
163 file_descriptor: FileDescriptor,
164 table_dir: &str,
165 path_type: PathType,
166) -> String {
167 match file_descriptor {
168 FileDescriptor::Index {
169 file_id, version, ..
170 } => location::index_file_path(
171 table_dir,
172 RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version),
173 path_type,
174 ),
175 FileDescriptor::Data { file_id, .. } => location::sst_file_path(
176 table_dir,
177 RegionFileId::new(target_region_id, file_id),
178 path_type,
179 ),
180 }
181}
182
183impl RegionFileCopier {
184 pub fn new(access_layer: AccessLayerRef) -> Self {
185 Self { access_layer }
186 }
187
188 pub async fn copy_files(
196 &self,
197 source_region_id: RegionId,
198 target_region_id: RegionId,
199 file_ids: Vec<FileDescriptor>,
200 parallelism: usize,
201 ) -> Result<()> {
202 ensure!(
203 source_region_id.table_id() == target_region_id.table_id(),
204 InvalidSourceAndTargetRegionSnafu {
205 source_region_id,
206 target_region_id,
207 },
208 );
209 let table_dir = self.access_layer.table_dir();
210 let path_type = self.access_layer.path_type();
211 let object_store = self.access_layer.object_store();
212
213 info!(
214 "Copying {} files from region {} to region {}",
215 file_ids.len(),
216 source_region_id,
217 target_region_id
218 );
219 debug!(
220 "Copying files: {:?} from region {} to region {}",
221 file_ids, source_region_id, target_region_id
222 );
223 let mut tasks = Vec::with_capacity(parallelism);
224 for skip in 0..parallelism {
225 let target_file_ids = file_ids.iter().skip(skip).step_by(parallelism).copied();
226 let object_store = object_store.clone();
227 tasks.push(async move {
228 for file_desc in target_file_ids {
229 let (source_path, target_path) = build_copy_file_paths(
230 source_region_id,
231 target_region_id,
232 file_desc,
233 table_dir,
234 path_type,
235 );
236 let now = Instant::now();
237 object_store
238 .copy(&source_path, &target_path)
239 .await
240 .inspect_err(
241 |e| error!(e; "Failed to copy file {} to {}", source_path, target_path),
242 )
243 .context(error::OpenDalSnafu)?;
244 let file_size = ReadableSize(file_desc.size());
245 info!(
246 "Copied file {} to {}, file size: {}, elapsed: {:?}",
247 source_path,
248 target_path,
249 file_size,
250 now.elapsed(),
251 );
252 }
253
254 Ok(())
255 });
256 }
257
258 if let Err(err) = try_join_all(tasks).await {
259 error!(err; "Failed to copy files from region {} to region {}", source_region_id, target_region_id);
260 self.clean_target_region(target_region_id, file_ids).await;
261 return Err(err);
262 }
263
264 Ok(())
265 }
266
267 async fn clean_target_region(&self, target_region_id: RegionId, file_ids: Vec<FileDescriptor>) {
269 let table_dir = self.access_layer.table_dir();
270 let path_type = self.access_layer.path_type();
271 let object_store = self.access_layer.object_store();
272 let delete_file_path = file_ids
273 .into_iter()
274 .map(|file_descriptor| {
275 build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type)
276 })
277 .collect::<Vec<_>>();
278 debug!(
279 "Deleting files: {:?} after failed to copy files to target region {}",
280 delete_file_path, target_region_id
281 );
282 if let Err(err) = object_store.delete_iter(delete_file_path).await {
283 error!(err; "Failed to delete files from region {}", target_region_id);
284 }
285 }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the source/target paths generated for both data and index
    /// files when copying between two regions of the same table.
    #[test]
    fn test_build_copy_file_paths() {
        common_telemetry::init_default_ut_logging();
        let file_id = FileId::random();
        let source_region_id = RegionId::new(1, 1);
        let target_region_id = RegionId::new(1, 2);
        let table_dir = "/table_dir";
        let path_type = PathType::Bare;
        // All cases share the regions, table dir and path type.
        let paths_for = |file_descriptor: FileDescriptor| {
            build_copy_file_paths(
                source_region_id,
                target_region_id,
                file_descriptor,
                table_dir,
                path_type,
            )
        };

        // Data (SST) file.
        let (source_path, target_path) = paths_for(FileDescriptor::Data { file_id, size: 100 });
        assert_eq!(
            source_path,
            format!("/table_dir/1_0000000001/{}.parquet", file_id)
        );
        assert_eq!(
            target_path,
            format!("/table_dir/1_0000000002/{}.parquet", file_id)
        );

        // Index file.
        let version = 1;
        let (source_path, target_path) = paths_for(FileDescriptor::Index {
            file_id,
            version,
            size: 100,
        });
        assert_eq!(
            source_path,
            format!(
                "/table_dir/1_0000000001/index/{}.{}.puffin",
                file_id, version
            )
        );
        assert_eq!(
            target_path,
            format!(
                "/table_dir/1_0000000002/index/{}.{}.puffin",
                file_id, version
            )
        );
    }
}