1use common_telemetry::{debug, error, info};
16use snafu::OptionExt;
17use store_api::region_engine::MitoCopyRegionFromResponse;
18use store_api::storage::{FileId, RegionId};
19
20use crate::error::{InvalidRequestSnafu, MissingManifestSnafu, Result};
21use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
22use crate::region::{
23 FileDescriptor, MitoRegionRef, RegionFileCopier, RegionLeaderState, RegionMetadataLoader,
24};
25use crate::request::{
26 BackgroundNotify, CopyRegionFromFinished, CopyRegionFromRequest, WorkerRequest,
27};
28use crate::sst::location::region_dir_from_table_dir;
29use crate::worker::{RegionWorkerLoop, WorkerRequestWithTime};
30
31impl<S> RegionWorkerLoop<S> {
32 pub(crate) fn handle_copy_region_from_request(&mut self, request: CopyRegionFromRequest) {
33 let region_id = request.region_id;
34 let source_region_id = request.source_region_id;
35 let sender = request.sender;
36 let region = match self.regions.writable_non_staging_region(region_id) {
37 Ok(region) => region,
38 Err(e) => {
39 let _ = sender.send(Err(e));
40 return;
41 }
42 };
43
44 let same_table = source_region_id.table_id() == region_id.table_id();
45 if !same_table {
46 let _ = sender.send(
47 InvalidRequestSnafu {
48 region_id,
49 reason: format!("Source and target regions must be from the same table, source_region_id: {source_region_id}, target_region_id: {region_id}"),
50 }
51 .fail(),
52 );
53 return;
54 }
55 if source_region_id == region_id {
56 let _ = sender.send(
57 InvalidRequestSnafu {
58 region_id,
59 reason: format!("Source and target regions must be different, source_region_id: {source_region_id}, target_region_id: {region_id}"),
60 }
61 .fail(),
62 );
63 return;
64 }
65
66 let region_metadata_loader =
67 RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
68 let worker_sender = self.sender.clone();
69
70 common_runtime::spawn_global(async move {
71 let (region_edit, file_ids) = match Self::copy_region_from(
72 ®ion,
73 region_metadata_loader,
74 source_region_id,
75 region_id,
76 request.parallelism.max(1),
77 )
78 .await
79 {
80 Ok(region_files) => region_files,
81 Err(e) => {
82 let _ = sender.send(Err(e));
83 return;
84 }
85 };
86
87 match region_edit {
88 Some(region_edit) => {
89 if let Err(e) = worker_sender
90 .send(WorkerRequestWithTime::new(WorkerRequest::Background {
91 region_id,
92 notify: BackgroundNotify::CopyRegionFromFinished(
93 CopyRegionFromFinished {
94 region_id,
95 edit: region_edit,
96 sender,
97 },
98 ),
99 }))
100 .await
101 {
102 error!(e; "Failed to send copy region from finished notification to worker, region_id: {}", region_id);
103 }
104 }
105 None => {
106 let _ = sender.send(Ok(MitoCopyRegionFromResponse {
107 copied_file_ids: file_ids,
108 }));
109 }
110 }
111 });
112 }
113
114 pub(crate) fn handle_copy_region_from_finished(&mut self, request: CopyRegionFromFinished) {
115 let region_id = request.region_id;
116 let sender = request.sender;
117 let region = match self.regions.writable_region(region_id) {
118 Ok(region) => region,
119 Err(e) => {
120 let _ = sender.send(Err(e));
121 return;
122 }
123 };
124
125 let copied_file_ids = request
126 .edit
127 .files_to_add
128 .iter()
129 .map(|file_meta| file_meta.file_id)
130 .collect();
131
132 region
133 .version_control
134 .apply_edit(Some(request.edit), &[], region.file_purger.clone());
135
136 let _ = sender.send(Ok(MitoCopyRegionFromResponse { copied_file_ids }));
137 }
138
139 async fn copy_region_from(
143 region: &MitoRegionRef,
144 region_metadata_loader: RegionMetadataLoader,
145 source_region_id: RegionId,
146 target_region_id: RegionId,
147 parallelism: usize,
148 ) -> Result<(Option<RegionEdit>, Vec<FileId>)> {
149 let table_dir = region.table_dir();
150 let path_type = region.path_type();
151 let region_dir = region_dir_from_table_dir(table_dir, source_region_id, path_type);
152 info!(
153 "Loading source region manifest from region dir: {region_dir}, target region: {target_region_id}"
154 );
155 let source_region_manifest = region_metadata_loader
156 .load_manifest(®ion_dir, ®ion.version().options.storage)
157 .await?
158 .context(MissingManifestSnafu {
159 region_id: source_region_id,
160 })?;
161 let mut files_to_copy = vec![];
162 let target_region_manifest = region.manifest_ctx.manifest().await;
163 let file_ids = source_region_manifest
164 .files
165 .keys()
166 .cloned()
167 .collect::<Vec<_>>();
168 debug!(
169 "source region files: {:?}, source region id: {}",
170 source_region_manifest.files, source_region_id
171 );
172 for (file_id, file_meta) in &source_region_manifest.files {
173 if !target_region_manifest.files.contains_key(file_id) {
174 let mut new_file_meta = file_meta.clone();
175 new_file_meta.region_id = target_region_id;
176 files_to_copy.push(new_file_meta);
177 }
178 }
179 if files_to_copy.is_empty() {
180 return Ok((None, file_ids));
181 }
182
183 let file_descriptors = files_to_copy
184 .iter()
185 .flat_map(|file_meta| {
186 if file_meta.exists_index() {
187 let region_index_id = file_meta.index_id();
188 let file_id = region_index_id.file_id.file_id();
189 let version = region_index_id.version;
190 let file_size = file_meta.file_size;
191 let index_file_size = file_meta.index_file_size();
192 vec![
193 FileDescriptor::Data {
194 file_id: file_meta.file_id,
195 size: file_size,
196 },
197 FileDescriptor::Index {
198 file_id,
199 version,
200 size: index_file_size,
201 },
202 ]
203 } else {
204 let file_size = file_meta.file_size;
205 vec![FileDescriptor::Data {
206 file_id: file_meta.file_id,
207 size: file_size,
208 }]
209 }
210 })
211 .collect();
212 debug!("File descriptors to copy: {:?}", file_descriptors);
213 let copier = RegionFileCopier::new(region.access_layer());
214 copier
216 .copy_files(
217 source_region_id,
218 target_region_id,
219 file_descriptors,
220 parallelism,
221 )
222 .await?;
223 let edit = RegionEdit {
224 files_to_add: files_to_copy,
225 files_to_remove: vec![],
226 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
227 compaction_time_window: None,
228 flushed_entry_id: None,
229 flushed_sequence: None,
230 committed_sequence: None,
231 };
232 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
233 info!(
234 "Applying {:?} to region {target_region_id}, reason: CopyRegionFrom",
235 edit
236 );
237 let version = region
238 .manifest_ctx
239 .update_manifest(RegionLeaderState::Writable, action_list, false)
240 .await?;
241 info!(
242 "Successfully update manifest version to {version}, region: {target_region_id}, reason: CopyRegionFrom"
243 );
244
245 Ok((Some(edit), file_ids))
246 }
247}