mito2/worker/
handle_copy_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, error, info};
16use snafu::OptionExt;
17use store_api::region_engine::MitoCopyRegionFromResponse;
18use store_api::storage::{FileId, RegionId};
19
20use crate::error::{InvalidRequestSnafu, MissingManifestSnafu, Result};
21use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
22use crate::region::{FileDescriptor, MitoRegionRef, RegionFileCopier, RegionMetadataLoader};
23use crate::request::{
24    BackgroundNotify, CopyRegionFromFinished, CopyRegionFromRequest, WorkerRequest,
25};
26use crate::sst::location::region_dir_from_table_dir;
27use crate::worker::{RegionWorkerLoop, WorkerRequestWithTime};
28
29impl<S> RegionWorkerLoop<S> {
30    pub(crate) fn handle_copy_region_from_request(&mut self, request: CopyRegionFromRequest) {
31        let region_id = request.region_id;
32        let source_region_id = request.source_region_id;
33        let sender = request.sender;
34        let region = match self.regions.writable_region(region_id) {
35            Ok(region) => region,
36            Err(e) => {
37                let _ = sender.send(Err(e));
38                return;
39            }
40        };
41
42        let same_table = source_region_id.table_id() == region_id.table_id();
43        if !same_table {
44            let _ = sender.send(
45                InvalidRequestSnafu {
46                    region_id,
47                    reason: format!("Source and target regions must be from the same table, source_region_id: {source_region_id}, target_region_id: {region_id}"),
48                }
49                .fail(),
50            );
51            return;
52        }
53        if source_region_id == region_id {
54            let _ = sender.send(
55                InvalidRequestSnafu {
56                    region_id,
57                    reason: format!("Source and target regions must be different, source_region_id: {source_region_id}, target_region_id: {region_id}"),
58                }
59                .fail(),
60            );
61            return;
62        }
63
64        let region_metadata_loader =
65            RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
66        let worker_sender = self.sender.clone();
67
68        common_runtime::spawn_global(async move {
69            let (region_edit, file_ids) = match Self::copy_region_from(
70                &region,
71                region_metadata_loader,
72                source_region_id,
73                region_id,
74                request.parallelism.max(1),
75            )
76            .await
77            {
78                Ok(region_files) => region_files,
79                Err(e) => {
80                    let _ = sender.send(Err(e));
81                    return;
82                }
83            };
84
85            match region_edit {
86                Some(region_edit) => {
87                    if let Err(e) = worker_sender
88                        .send(WorkerRequestWithTime::new(WorkerRequest::Background {
89                            region_id,
90                            notify: BackgroundNotify::CopyRegionFromFinished(
91                                CopyRegionFromFinished {
92                                    region_id,
93                                    edit: region_edit,
94                                    sender,
95                                },
96                            ),
97                        }))
98                        .await
99                    {
100                        error!(e; "Failed to send copy region from finished notification to worker, region_id: {}", region_id);
101                    }
102                }
103                None => {
104                    let _ = sender.send(Ok(MitoCopyRegionFromResponse {
105                        copied_file_ids: file_ids,
106                    }));
107                }
108            }
109        });
110    }
111
112    pub(crate) fn handle_copy_region_from_finished(&mut self, request: CopyRegionFromFinished) {
113        let region_id = request.region_id;
114        let sender = request.sender;
115        let region = match self.regions.writable_region(region_id) {
116            Ok(region) => region,
117            Err(e) => {
118                let _ = sender.send(Err(e));
119                return;
120            }
121        };
122
123        let copied_file_ids = request
124            .edit
125            .files_to_add
126            .iter()
127            .map(|file_meta| file_meta.file_id)
128            .collect();
129
130        region
131            .version_control
132            .apply_edit(Some(request.edit), &[], region.file_purger.clone());
133
134        let _ = sender.send(Ok(MitoCopyRegionFromResponse { copied_file_ids }));
135    }
136
137    /// Returns the region edit and the file ids that were copied from the source region to the target region.
138    ///
139    /// If no need to copy files, returns (None, file_ids).
140    async fn copy_region_from(
141        region: &MitoRegionRef,
142        region_metadata_loader: RegionMetadataLoader,
143        source_region_id: RegionId,
144        target_region_id: RegionId,
145        parallelism: usize,
146    ) -> Result<(Option<RegionEdit>, Vec<FileId>)> {
147        let table_dir = region.table_dir();
148        let path_type = region.path_type();
149        let region_dir = region_dir_from_table_dir(table_dir, source_region_id, path_type);
150        info!(
151            "Loading source region manifest from region dir: {region_dir}, target region: {target_region_id}"
152        );
153        let source_region_manifest = region_metadata_loader
154            .load_manifest(&region_dir, &region.version().options.storage)
155            .await?
156            .context(MissingManifestSnafu {
157                region_id: source_region_id,
158            })?;
159        let mut files_to_copy = vec![];
160        let target_region_manifest = region.manifest_ctx.manifest().await;
161        let file_ids = source_region_manifest
162            .files
163            .keys()
164            .cloned()
165            .collect::<Vec<_>>();
166        debug!(
167            "source region files: {:?}, source region id: {}",
168            source_region_manifest.files, source_region_id
169        );
170        for (file_id, file_meta) in &source_region_manifest.files {
171            if !target_region_manifest.files.contains_key(file_id) {
172                let mut new_file_meta = file_meta.clone();
173                new_file_meta.region_id = target_region_id;
174                files_to_copy.push(new_file_meta);
175            }
176        }
177        if files_to_copy.is_empty() {
178            return Ok((None, file_ids));
179        }
180
181        let file_descriptors = files_to_copy
182            .iter()
183            .flat_map(|file_meta| {
184                if file_meta.exists_index() {
185                    let region_index_id = file_meta.index_id();
186                    let file_id = region_index_id.file_id.file_id();
187                    let version = region_index_id.version;
188                    let file_size = file_meta.file_size;
189                    let index_file_size = file_meta.index_file_size();
190                    vec![
191                        FileDescriptor::Data {
192                            file_id: file_meta.file_id,
193                            size: file_size,
194                        },
195                        FileDescriptor::Index {
196                            file_id,
197                            version,
198                            size: index_file_size,
199                        },
200                    ]
201                } else {
202                    let file_size = file_meta.file_size;
203                    vec![FileDescriptor::Data {
204                        file_id: file_meta.file_id,
205                        size: file_size,
206                    }]
207                }
208            })
209            .collect();
210        debug!("File descriptors to copy: {:?}", file_descriptors);
211        let copier = RegionFileCopier::new(region.access_layer());
212        // TODO(weny): ensure the target region is empty.
213        copier
214            .copy_files(
215                source_region_id,
216                target_region_id,
217                file_descriptors,
218                parallelism,
219            )
220            .await?;
221        let edit = RegionEdit {
222            files_to_add: files_to_copy,
223            files_to_remove: vec![],
224            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
225            compaction_time_window: None,
226            flushed_entry_id: None,
227            flushed_sequence: None,
228            committed_sequence: None,
229        };
230        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
231        info!("Applying {edit:?} to region {target_region_id}, reason: CopyRegionFrom");
232        let version = region
233            .manifest_ctx
234            .manifest_manager
235            .write()
236            .await
237            .update(action_list, false)
238            .await?;
239        info!(
240            "Successfully update manifest version to {version}, region: {target_region_id}, reason: CopyRegionFrom"
241        );
242
243        Ok((Some(edit), file_ids))
244    }
245}