Skip to main content

mito2/worker/
handle_copy_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::{debug, error, info};
16use snafu::OptionExt;
17use store_api::region_engine::MitoCopyRegionFromResponse;
18use store_api::storage::{FileId, RegionId};
19
20use crate::error::{InvalidRequestSnafu, MissingManifestSnafu, Result};
21use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
22use crate::region::{
23    FileDescriptor, MitoRegionRef, RegionFileCopier, RegionLeaderState, RegionMetadataLoader,
24};
25use crate::request::{
26    BackgroundNotify, CopyRegionFromFinished, CopyRegionFromRequest, WorkerRequest,
27};
28use crate::sst::location::region_dir_from_table_dir;
29use crate::worker::{RegionWorkerLoop, WorkerRequestWithTime};
30
31impl<S> RegionWorkerLoop<S> {
32    pub(crate) fn handle_copy_region_from_request(&mut self, request: CopyRegionFromRequest) {
33        let region_id = request.region_id;
34        let source_region_id = request.source_region_id;
35        let sender = request.sender;
36        let region = match self.regions.writable_non_staging_region(region_id) {
37            Ok(region) => region,
38            Err(e) => {
39                let _ = sender.send(Err(e));
40                return;
41            }
42        };
43
44        let same_table = source_region_id.table_id() == region_id.table_id();
45        if !same_table {
46            let _ = sender.send(
47                InvalidRequestSnafu {
48                    region_id,
49                    reason: format!("Source and target regions must be from the same table, source_region_id: {source_region_id}, target_region_id: {region_id}"),
50                }
51                .fail(),
52            );
53            return;
54        }
55        if source_region_id == region_id {
56            let _ = sender.send(
57                InvalidRequestSnafu {
58                    region_id,
59                    reason: format!("Source and target regions must be different, source_region_id: {source_region_id}, target_region_id: {region_id}"),
60                }
61                .fail(),
62            );
63            return;
64        }
65
66        let region_metadata_loader =
67            RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
68        let worker_sender = self.sender.clone();
69
70        common_runtime::spawn_global(async move {
71            let (region_edit, file_ids) = match Self::copy_region_from(
72                &region,
73                region_metadata_loader,
74                source_region_id,
75                region_id,
76                request.parallelism.max(1),
77            )
78            .await
79            {
80                Ok(region_files) => region_files,
81                Err(e) => {
82                    let _ = sender.send(Err(e));
83                    return;
84                }
85            };
86
87            match region_edit {
88                Some(region_edit) => {
89                    if let Err(e) = worker_sender
90                        .send(WorkerRequestWithTime::new(WorkerRequest::Background {
91                            region_id,
92                            notify: BackgroundNotify::CopyRegionFromFinished(
93                                CopyRegionFromFinished {
94                                    region_id,
95                                    edit: region_edit,
96                                    sender,
97                                },
98                            ),
99                        }))
100                        .await
101                    {
102                        error!(e; "Failed to send copy region from finished notification to worker, region_id: {}", region_id);
103                    }
104                }
105                None => {
106                    let _ = sender.send(Ok(MitoCopyRegionFromResponse {
107                        copied_file_ids: file_ids,
108                    }));
109                }
110            }
111        });
112    }
113
114    pub(crate) fn handle_copy_region_from_finished(&mut self, request: CopyRegionFromFinished) {
115        let region_id = request.region_id;
116        let sender = request.sender;
117        let region = match self.regions.writable_region(region_id) {
118            Ok(region) => region,
119            Err(e) => {
120                let _ = sender.send(Err(e));
121                return;
122            }
123        };
124
125        let copied_file_ids = request
126            .edit
127            .files_to_add
128            .iter()
129            .map(|file_meta| file_meta.file_id)
130            .collect();
131
132        region
133            .version_control
134            .apply_edit(Some(request.edit), &[], region.file_purger.clone());
135
136        let _ = sender.send(Ok(MitoCopyRegionFromResponse { copied_file_ids }));
137    }
138
139    /// Returns the region edit and the file ids that were copied from the source region to the target region.
140    ///
141    /// If no need to copy files, returns (None, file_ids).
142    async fn copy_region_from(
143        region: &MitoRegionRef,
144        region_metadata_loader: RegionMetadataLoader,
145        source_region_id: RegionId,
146        target_region_id: RegionId,
147        parallelism: usize,
148    ) -> Result<(Option<RegionEdit>, Vec<FileId>)> {
149        let table_dir = region.table_dir();
150        let path_type = region.path_type();
151        let region_dir = region_dir_from_table_dir(table_dir, source_region_id, path_type);
152        info!(
153            "Loading source region manifest from region dir: {region_dir}, target region: {target_region_id}"
154        );
155        let source_region_manifest = region_metadata_loader
156            .load_manifest(&region_dir, &region.version().options.storage)
157            .await?
158            .context(MissingManifestSnafu {
159                region_id: source_region_id,
160            })?;
161        let mut files_to_copy = vec![];
162        let target_region_manifest = region.manifest_ctx.manifest().await;
163        let file_ids = source_region_manifest
164            .files
165            .keys()
166            .cloned()
167            .collect::<Vec<_>>();
168        debug!(
169            "source region files: {:?}, source region id: {}",
170            source_region_manifest.files, source_region_id
171        );
172        for (file_id, file_meta) in &source_region_manifest.files {
173            if !target_region_manifest.files.contains_key(file_id) {
174                let mut new_file_meta = file_meta.clone();
175                new_file_meta.region_id = target_region_id;
176                files_to_copy.push(new_file_meta);
177            }
178        }
179        if files_to_copy.is_empty() {
180            return Ok((None, file_ids));
181        }
182
183        let file_descriptors = files_to_copy
184            .iter()
185            .flat_map(|file_meta| {
186                if file_meta.exists_index() {
187                    let region_index_id = file_meta.index_id();
188                    let file_id = region_index_id.file_id.file_id();
189                    let version = region_index_id.version;
190                    let file_size = file_meta.file_size;
191                    let index_file_size = file_meta.index_file_size();
192                    vec![
193                        FileDescriptor::Data {
194                            file_id: file_meta.file_id,
195                            size: file_size,
196                        },
197                        FileDescriptor::Index {
198                            file_id,
199                            version,
200                            size: index_file_size,
201                        },
202                    ]
203                } else {
204                    let file_size = file_meta.file_size;
205                    vec![FileDescriptor::Data {
206                        file_id: file_meta.file_id,
207                        size: file_size,
208                    }]
209                }
210            })
211            .collect();
212        debug!("File descriptors to copy: {:?}", file_descriptors);
213        let copier = RegionFileCopier::new(region.access_layer());
214        // TODO(weny): ensure the target region is empty.
215        copier
216            .copy_files(
217                source_region_id,
218                target_region_id,
219                file_descriptors,
220                parallelism,
221            )
222            .await?;
223        let edit = RegionEdit {
224            files_to_add: files_to_copy,
225            files_to_remove: vec![],
226            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
227            compaction_time_window: None,
228            flushed_entry_id: None,
229            flushed_sequence: None,
230            committed_sequence: None,
231        };
232        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
233        info!(
234            "Applying {:?} to region {target_region_id}, reason: CopyRegionFrom",
235            edit
236        );
237        let version = region
238            .manifest_ctx
239            .update_manifest(RegionLeaderState::Writable, action_list, false)
240            .await?;
241        info!(
242            "Successfully update manifest version to {version}, region: {target_region_id}, reason: CopyRegionFrom"
243        );
244
245        Ok((Some(edit), file_ids))
246    }
247}