1use std::sync::Arc;
16use std::time::Instant;
17
18use common_base::readable_size::ReadableSize;
19use common_telemetry::{debug, error, info};
20use futures::future::try_join_all;
21use object_store::manager::ObjectStoreManagerRef;
22use snafu::{ResultExt, ensure};
23use store_api::metadata::RegionMetadataRef;
24use store_api::region_request::PathType;
25use store_api::storage::{FileId, IndexVersion, RegionId};
26
27use crate::access_layer::AccessLayerRef;
28use crate::config::MitoConfig;
29use crate::error::{self, InvalidSourceAndTargetRegionSnafu, Result};
30use crate::manifest::action::RegionManifest;
31use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
32use crate::region::opener::get_object_store;
33use crate::region::options::RegionOptions;
34use crate::sst::file::{RegionFileId, RegionIndexId};
35use crate::sst::location;
36
37#[derive(Debug, Clone)]
39pub struct RegionMetadataLoader {
40 config: Arc<MitoConfig>,
41 object_store_manager: ObjectStoreManagerRef,
42}
43
44impl RegionMetadataLoader {
45 pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
47 Self {
48 config,
49 object_store_manager,
50 }
51 }
52
53 pub async fn load(
55 &self,
56 region_dir: &str,
57 region_options: &RegionOptions,
58 ) -> Result<Option<RegionMetadataRef>> {
59 let manifest = self
60 .load_manifest(region_dir, ®ion_options.storage)
61 .await?;
62 Ok(manifest.map(|m| m.metadata.clone()))
63 }
64
65 pub async fn load_manifest(
67 &self,
68 region_dir: &str,
69 storage: &Option<String>,
70 ) -> Result<Option<Arc<RegionManifest>>> {
71 let object_store = get_object_store(storage, &self.object_store_manager)?;
72 let region_manifest_options =
73 RegionManifestOptions::new(&self.config, region_dir, &object_store);
74 let Some(manifest_manager) =
75 RegionManifestManager::open(region_manifest_options, &Default::default()).await?
76 else {
77 return Ok(None);
78 };
79
80 let manifest = manifest_manager.manifest();
81 Ok(Some(manifest))
82 }
83}
84
85#[derive(Debug, Clone)]
87pub struct RegionFileCopier {
88 access_layer: AccessLayerRef,
89}
90
91#[derive(Debug, Clone, Copy)]
93pub enum FileDescriptor {
94 Index {
96 file_id: FileId,
97 version: IndexVersion,
98 size: u64,
99 },
100 Data { file_id: FileId, size: u64 },
102}
103
104impl FileDescriptor {
105 pub fn size(&self) -> u64 {
106 match self {
107 FileDescriptor::Index { size, .. } => *size,
108 FileDescriptor::Data { size, .. } => *size,
109 }
110 }
111
112 fn file_path(&self, region_id: RegionId, table_dir: &str, path_type: PathType) -> String {
113 match *self {
114 FileDescriptor::Index {
115 file_id, version, ..
116 } => location::index_file_path(
117 table_dir,
118 RegionIndexId::new(RegionFileId::new(region_id, file_id), version),
119 path_type,
120 ),
121 FileDescriptor::Data { file_id, .. } => {
122 location::sst_file_path(table_dir, RegionFileId::new(region_id, file_id), path_type)
123 }
124 }
125 }
126}
127
128fn build_copy_file_paths(
140 source_region_id: RegionId,
141 target_region_id: RegionId,
142 file_descriptor: FileDescriptor,
143 table_dir: &str,
144 path_type: PathType,
145) -> (String, String) {
146 (
147 file_descriptor.file_path(source_region_id, table_dir, path_type),
148 file_descriptor.file_path(target_region_id, table_dir, path_type),
149 )
150}
151
152fn build_delete_file_path(
153 target_region_id: RegionId,
154 file_descriptor: FileDescriptor,
155 table_dir: &str,
156 path_type: PathType,
157) -> String {
158 file_descriptor.file_path(target_region_id, table_dir, path_type)
159}
160
161impl RegionFileCopier {
162 pub fn new(access_layer: AccessLayerRef) -> Self {
163 Self { access_layer }
164 }
165
166 pub async fn copy_files(
174 &self,
175 source_region_id: RegionId,
176 target_region_id: RegionId,
177 file_ids: Vec<FileDescriptor>,
178 parallelism: usize,
179 ) -> Result<()> {
180 ensure!(
181 source_region_id.table_id() == target_region_id.table_id(),
182 InvalidSourceAndTargetRegionSnafu {
183 source_region_id,
184 target_region_id,
185 },
186 );
187 let table_dir = self.access_layer.table_dir();
188 let path_type = self.access_layer.path_type();
189 let object_store = self.access_layer.object_store();
190
191 info!(
192 "Copying {} files from region {} to region {}",
193 file_ids.len(),
194 source_region_id,
195 target_region_id
196 );
197 debug!(
198 "Copying files: {:?} from region {} to region {}",
199 file_ids, source_region_id, target_region_id
200 );
201 let mut tasks = Vec::with_capacity(parallelism);
202 for skip in 0..parallelism {
203 let target_file_ids = file_ids.iter().skip(skip).step_by(parallelism).copied();
204 let object_store = object_store.clone();
205 tasks.push(async move {
206 for file_desc in target_file_ids {
207 let (source_path, target_path) = build_copy_file_paths(
208 source_region_id,
209 target_region_id,
210 file_desc,
211 table_dir,
212 path_type,
213 );
214 let now = Instant::now();
215 object_store
216 .copy(&source_path, &target_path)
217 .await
218 .inspect_err(
219 |e| error!(e; "Failed to copy file {} to {}", source_path, target_path),
220 )
221 .context(error::OpenDalSnafu)?;
222 let file_size = ReadableSize(file_desc.size());
223 info!(
224 "Copied file {} to {}, file size: {}, elapsed: {:?}",
225 source_path,
226 target_path,
227 file_size,
228 now.elapsed(),
229 );
230 }
231
232 Ok(())
233 });
234 }
235
236 if let Err(err) = try_join_all(tasks).await {
237 error!(err; "Failed to copy files from region {} to region {}", source_region_id, target_region_id);
238 self.clean_target_region(target_region_id, file_ids).await;
239 return Err(err);
240 }
241
242 Ok(())
243 }
244
245 async fn clean_target_region(&self, target_region_id: RegionId, file_ids: Vec<FileDescriptor>) {
247 let table_dir = self.access_layer.table_dir();
248 let path_type = self.access_layer.path_type();
249 let object_store = self.access_layer.object_store();
250 let delete_file_path = file_ids
251 .into_iter()
252 .map(|file_descriptor| {
253 build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type)
254 })
255 .collect::<Vec<_>>();
256 debug!(
257 "Deleting files: {:?} after failed to copy files to target region {}",
258 delete_file_path, target_region_id
259 );
260 if let Err(err) = object_store.delete_iter(delete_file_path).await {
261 error!(err; "Failed to delete files from region {}", target_region_id);
262 }
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Source and target paths must differ only in the region directory, for both
    /// data files and index files.
    #[test]
    fn test_build_copy_file_paths() {
        common_telemetry::init_default_ut_logging();

        // Fixtures shared by the data-file and index-file cases.
        let file_id = FileId::random();
        let src_region = RegionId::new(1, 1);
        let dst_region = RegionId::new(1, 2);
        let table_dir = "/table_dir";
        let path_type = PathType::Bare;

        // Data file.
        let data_file = FileDescriptor::Data { file_id, size: 100 };
        let (source_path, target_path) =
            build_copy_file_paths(src_region, dst_region, data_file, table_dir, path_type);
        assert_eq!(
            source_path,
            format!("/table_dir/1_0000000001/{}.parquet", file_id)
        );
        assert_eq!(
            target_path,
            format!("/table_dir/1_0000000002/{}.parquet", file_id)
        );

        // Index file.
        let version = 1;
        let index_file = FileDescriptor::Index {
            file_id,
            version,
            size: 100,
        };
        let (source_path, target_path) =
            build_copy_file_paths(src_region, dst_region, index_file, table_dir, path_type);
        assert_eq!(
            source_path,
            format!(
                "/table_dir/1_0000000001/index/{}.{}.puffin",
                file_id, version
            )
        );
        assert_eq!(
            target_path,
            format!(
                "/table_dir/1_0000000002/index/{}.{}.puffin",
                file_id, version
            )
        );
    }

    /// The delete path must point into the target region, for both data files and
    /// index files.
    #[test]
    fn test_build_delete_file_path() {
        common_telemetry::init_default_ut_logging();

        let file_id = FileId::random();
        let dst_region = RegionId::new(1, 2);
        let table_dir = "/table_dir";
        let path_type = PathType::Bare;

        // Data file.
        let data_file = FileDescriptor::Data { file_id, size: 100 };
        let data_path = build_delete_file_path(dst_region, data_file, table_dir, path_type);
        assert_eq!(
            data_path,
            format!("/table_dir/1_0000000002/{}.parquet", file_id)
        );

        // Index file.
        let index_file = FileDescriptor::Index {
            file_id,
            version: 1,
            size: 100,
        };
        let index_path = build_delete_file_path(dst_region, index_file, table_dir, path_type);
        assert_eq!(
            index_path,
            format!("/table_dir/1_0000000002/index/{}.1.puffin", file_id)
        );
    }
}