1use common_telemetry::{debug, error, info};
16use snafu::OptionExt;
17use store_api::region_engine::MitoCopyRegionFromResponse;
18use store_api::storage::{FileId, RegionId};
19
20use crate::error::{InvalidRequestSnafu, MissingManifestSnafu, Result};
21use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
22use crate::region::{FileDescriptor, MitoRegionRef, RegionFileCopier, RegionMetadataLoader};
23use crate::request::{
24 BackgroundNotify, CopyRegionFromFinished, CopyRegionFromRequest, WorkerRequest,
25};
26use crate::sst::location::region_dir_from_table_dir;
27use crate::worker::{RegionWorkerLoop, WorkerRequestWithTime};
28
29impl<S> RegionWorkerLoop<S> {
30 pub(crate) fn handle_copy_region_from_request(&mut self, request: CopyRegionFromRequest) {
31 let region_id = request.region_id;
32 let source_region_id = request.source_region_id;
33 let sender = request.sender;
34 let region = match self.regions.writable_region(region_id) {
35 Ok(region) => region,
36 Err(e) => {
37 let _ = sender.send(Err(e));
38 return;
39 }
40 };
41
42 let same_table = source_region_id.table_id() == region_id.table_id();
43 if !same_table {
44 let _ = sender.send(
45 InvalidRequestSnafu {
46 region_id,
47 reason: format!("Source and target regions must be from the same table, source_region_id: {source_region_id}, target_region_id: {region_id}"),
48 }
49 .fail(),
50 );
51 return;
52 }
53 if source_region_id == region_id {
54 let _ = sender.send(
55 InvalidRequestSnafu {
56 region_id,
57 reason: format!("Source and target regions must be different, source_region_id: {source_region_id}, target_region_id: {region_id}"),
58 }
59 .fail(),
60 );
61 return;
62 }
63
64 let region_metadata_loader =
65 RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
66 let worker_sender = self.sender.clone();
67
68 common_runtime::spawn_global(async move {
69 let (region_edit, file_ids) = match Self::copy_region_from(
70 ®ion,
71 region_metadata_loader,
72 source_region_id,
73 region_id,
74 request.parallelism.max(1),
75 )
76 .await
77 {
78 Ok(region_files) => region_files,
79 Err(e) => {
80 let _ = sender.send(Err(e));
81 return;
82 }
83 };
84
85 match region_edit {
86 Some(region_edit) => {
87 if let Err(e) = worker_sender
88 .send(WorkerRequestWithTime::new(WorkerRequest::Background {
89 region_id,
90 notify: BackgroundNotify::CopyRegionFromFinished(
91 CopyRegionFromFinished {
92 region_id,
93 edit: region_edit,
94 sender,
95 },
96 ),
97 }))
98 .await
99 {
100 error!(e; "Failed to send copy region from finished notification to worker, region_id: {}", region_id);
101 }
102 }
103 None => {
104 let _ = sender.send(Ok(MitoCopyRegionFromResponse {
105 copied_file_ids: file_ids,
106 }));
107 }
108 }
109 });
110 }
111
112 pub(crate) fn handle_copy_region_from_finished(&mut self, request: CopyRegionFromFinished) {
113 let region_id = request.region_id;
114 let sender = request.sender;
115 let region = match self.regions.writable_region(region_id) {
116 Ok(region) => region,
117 Err(e) => {
118 let _ = sender.send(Err(e));
119 return;
120 }
121 };
122
123 let copied_file_ids = request
124 .edit
125 .files_to_add
126 .iter()
127 .map(|file_meta| file_meta.file_id)
128 .collect();
129
130 region
131 .version_control
132 .apply_edit(Some(request.edit), &[], region.file_purger.clone());
133
134 let _ = sender.send(Ok(MitoCopyRegionFromResponse { copied_file_ids }));
135 }
136
137 async fn copy_region_from(
141 region: &MitoRegionRef,
142 region_metadata_loader: RegionMetadataLoader,
143 source_region_id: RegionId,
144 target_region_id: RegionId,
145 parallelism: usize,
146 ) -> Result<(Option<RegionEdit>, Vec<FileId>)> {
147 let table_dir = region.table_dir();
148 let path_type = region.path_type();
149 let region_dir = region_dir_from_table_dir(table_dir, source_region_id, path_type);
150 info!(
151 "Loading source region manifest from region dir: {region_dir}, target region: {target_region_id}"
152 );
153 let source_region_manifest = region_metadata_loader
154 .load_manifest(®ion_dir, ®ion.version().options.storage)
155 .await?
156 .context(MissingManifestSnafu {
157 region_id: source_region_id,
158 })?;
159 let mut files_to_copy = vec![];
160 let target_region_manifest = region.manifest_ctx.manifest().await;
161 let file_ids = source_region_manifest
162 .files
163 .keys()
164 .cloned()
165 .collect::<Vec<_>>();
166 debug!(
167 "source region files: {:?}, source region id: {}",
168 source_region_manifest.files, source_region_id
169 );
170 for (file_id, file_meta) in &source_region_manifest.files {
171 if !target_region_manifest.files.contains_key(file_id) {
172 let mut new_file_meta = file_meta.clone();
173 new_file_meta.region_id = target_region_id;
174 files_to_copy.push(new_file_meta);
175 }
176 }
177 if files_to_copy.is_empty() {
178 return Ok((None, file_ids));
179 }
180
181 let file_descriptors = files_to_copy
182 .iter()
183 .flat_map(|file_meta| {
184 if file_meta.exists_index() {
185 let region_index_id = file_meta.index_id();
186 let file_id = region_index_id.file_id.file_id();
187 let version = region_index_id.version;
188 let file_size = file_meta.file_size;
189 let index_file_size = file_meta.index_file_size();
190 vec![
191 FileDescriptor::Data {
192 file_id: file_meta.file_id,
193 size: file_size,
194 },
195 FileDescriptor::Index {
196 file_id,
197 version,
198 size: index_file_size,
199 },
200 ]
201 } else {
202 let file_size = file_meta.file_size;
203 vec![FileDescriptor::Data {
204 file_id: file_meta.file_id,
205 size: file_size,
206 }]
207 }
208 })
209 .collect();
210 debug!("File descriptors to copy: {:?}", file_descriptors);
211 let copier = RegionFileCopier::new(region.access_layer());
212 copier
214 .copy_files(
215 source_region_id,
216 target_region_id,
217 file_descriptors,
218 parallelism,
219 )
220 .await?;
221 let edit = RegionEdit {
222 files_to_add: files_to_copy,
223 files_to_remove: vec![],
224 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
225 compaction_time_window: None,
226 flushed_entry_id: None,
227 flushed_sequence: None,
228 committed_sequence: None,
229 };
230 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
231 info!("Applying {edit:?} to region {target_region_id}, reason: CopyRegionFrom");
232 let version = region
233 .manifest_ctx
234 .manifest_manager
235 .write()
236 .await
237 .update(action_list, false)
238 .await?;
239 info!(
240 "Successfully update manifest version to {version}, region: {target_region_id}, reason: CopyRegionFrom"
241 );
242
243 Ok((Some(edit), file_ids))
244 }
245}