mito2/worker/
handle_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handles manifest.
16//!
17//! It updates the manifest and applies the changes to the region in background.
18
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21
22use common_telemetry::{info, warn};
23use store_api::logstore::LogStore;
24use store_api::storage::RegionId;
25
26use crate::cache::file_cache::{FileType, IndexKey};
27use crate::cache::CacheManagerRef;
28use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
29use crate::manifest::action::{
30    RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
31};
32use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
33use crate::region::version::VersionBuilder;
34use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
35use crate::request::{
36    BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
37    RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
38};
39use crate::sst::location;
40use crate::worker::{RegionWorkerLoop, WorkerListener};
41
42pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
43
44/// A queue for temporary store region edit requests, if the region is in the "Editing" state.
45/// When the current region edit request is completed, the next (if there exists) request in the
46/// queue will be processed.
47/// Everything is done in the region worker loop.
48pub(crate) struct RegionEditQueue {
49    region_id: RegionId,
50    requests: VecDeque<RegionEditRequest>,
51}
52
53impl RegionEditQueue {
54    const QUEUE_MAX_LEN: usize = 128;
55
56    fn new(region_id: RegionId) -> Self {
57        Self {
58            region_id,
59            requests: VecDeque::new(),
60        }
61    }
62
63    fn enqueue(&mut self, request: RegionEditRequest) {
64        if self.requests.len() > Self::QUEUE_MAX_LEN {
65            let _ = request.tx.send(
66                RegionBusySnafu {
67                    region_id: self.region_id,
68                }
69                .fail(),
70            );
71            return;
72        };
73        self.requests.push_back(request);
74    }
75
76    fn dequeue(&mut self) -> Option<RegionEditRequest> {
77        self.requests.pop_front()
78    }
79}
80
81impl<S: LogStore> RegionWorkerLoop<S> {
82    /// Handles region change result.
83    pub(crate) async fn handle_manifest_region_change_result(
84        &mut self,
85        change_result: RegionChangeResult,
86    ) {
87        let region = match self.regions.get_region(change_result.region_id) {
88            Some(region) => region,
89            None => {
90                self.reject_region_stalled_requests(&change_result.region_id);
91                change_result.sender.send(
92                    RegionNotFoundSnafu {
93                        region_id: change_result.region_id,
94                    }
95                    .fail(),
96                );
97                return;
98            }
99        };
100
101        if change_result.result.is_ok() {
102            // Apply the metadata to region's version.
103            region
104                .version_control
105                .alter_schema(change_result.new_meta, &region.memtable_builder);
106
107            let version = region.version();
108            info!(
109                "Region {} is altered, metadata is {:?}, options: {:?}",
110                region.region_id, version.metadata, version.options,
111            );
112        }
113
114        // Sets the region as writable.
115        region.switch_state_to_writable(RegionLeaderState::Altering);
116        // Sends the result.
117        change_result.sender.send(change_result.result.map(|_| 0));
118
119        // Handles the stalled requests.
120        self.handle_region_stalled_requests(&change_result.region_id)
121            .await;
122    }
123
124    /// Handles region sync request.
125    ///
126    /// Updates the manifest to at least the given version.
127    /// **Note**: The installed version may be greater than the given version.
128    pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
129        let region_id = request.region_id;
130        let sender = request.sender;
131        let region = match self.regions.follower_region(region_id) {
132            Ok(region) => region,
133            Err(e) => {
134                let _ = sender.send(Err(e));
135                return;
136            }
137        };
138
139        let original_manifest_version = region.manifest_ctx.manifest_version().await;
140        let manifest = match region
141            .manifest_ctx
142            .install_manifest_to(request.manifest_version)
143            .await
144        {
145            Ok(manifest) => manifest,
146            Err(e) => {
147                let _ = sender.send(Err(e));
148                return;
149            }
150        };
151        let version = region.version();
152        if !version.memtables.is_empty() {
153            let current = region.version_control.current();
154            warn!(
155                "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
156                region.region_id, manifest.manifest_version, current.last_entry_id
157            );
158        }
159        let region_options = version.options.clone();
160        let new_mutable = Arc::new(
161            region
162                .version()
163                .memtables
164                .mutable
165                .new_with_part_duration(version.compaction_time_window),
166        );
167        let metadata = manifest.metadata.clone();
168        let version = VersionBuilder::new(metadata, new_mutable)
169            .add_files(region.file_purger.clone(), manifest.files.values().cloned())
170            .flushed_entry_id(manifest.flushed_entry_id)
171            .flushed_sequence(manifest.flushed_sequence)
172            .truncated_entry_id(manifest.truncated_entry_id)
173            .compaction_time_window(manifest.compaction_time_window)
174            .options(region_options)
175            .build();
176        region.version_control.overwrite_current(Arc::new(version));
177
178        let updated = manifest.manifest_version > original_manifest_version;
179        let _ = sender.send(Ok((manifest.manifest_version, updated)));
180    }
181}
182
183impl<S> RegionWorkerLoop<S> {
184    /// Handles region edit request.
185    pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
186        let region_id = request.region_id;
187        let Some(region) = self.regions.get_region(region_id) else {
188            let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
189            return;
190        };
191
192        if !region.is_writable() {
193            if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
194                self.region_edit_queues
195                    .entry(region_id)
196                    .or_insert_with(|| RegionEditQueue::new(region_id))
197                    .enqueue(request);
198            } else {
199                let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
200            }
201            return;
202        }
203
204        let RegionEditRequest {
205            region_id: _,
206            edit,
207            tx: sender,
208        } = request;
209
210        // Marks the region as editing.
211        if let Err(e) = region.set_editing() {
212            let _ = sender.send(Err(e));
213            return;
214        }
215
216        let request_sender = self.sender.clone();
217        let cache_manager = self.cache_manager.clone();
218        let listener = self.listener.clone();
219        // Now the region is in editing state.
220        // Updates manifest in background.
221        common_runtime::spawn_global(async move {
222            let result = edit_region(&region, edit.clone(), cache_manager, listener).await;
223            let notify = WorkerRequest::Background {
224                region_id,
225                notify: BackgroundNotify::RegionEdit(RegionEditResult {
226                    region_id,
227                    sender,
228                    edit,
229                    result,
230                }),
231            };
232            // We don't set state back as the worker loop is already exited.
233            if let Err(res) = request_sender
234                .send(WorkerRequestWithTime::new(notify))
235                .await
236            {
237                warn!(
238                    "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
239                    region_id, res
240                );
241            }
242        });
243    }
244
245    /// Handles region edit result.
246    pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
247        let region = match self.regions.get_region(edit_result.region_id) {
248            Some(region) => region,
249            None => {
250                let _ = edit_result.sender.send(
251                    RegionNotFoundSnafu {
252                        region_id: edit_result.region_id,
253                    }
254                    .fail(),
255                );
256                return;
257            }
258        };
259
260        let need_compaction =
261            edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
262
263        if edit_result.result.is_ok() {
264            // Applies the edit to the region.
265            region
266                .version_control
267                .apply_edit(edit_result.edit, &[], region.file_purger.clone());
268        }
269
270        // Sets the region as writable.
271        region.switch_state_to_writable(RegionLeaderState::Editing);
272
273        let _ = edit_result.sender.send(edit_result.result);
274
275        if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
276            if let Some(request) = edit_queue.dequeue() {
277                self.handle_region_edit(request).await;
278            }
279        }
280
281        if need_compaction {
282            self.schedule_compaction(&region).await;
283        }
284    }
285
286    /// Writes truncate action to the manifest and then applies it to the region in background.
287    pub(crate) fn handle_manifest_truncate_action(
288        &self,
289        region: MitoRegionRef,
290        truncate: RegionTruncate,
291        sender: OptionOutputTx,
292    ) {
293        // Marks the region as truncating.
294        // This prevents the region from being accessed by other write requests.
295        if let Err(e) = region.set_truncating() {
296            sender.send(Err(e));
297            return;
298        }
299        // Now the region is in truncating state.
300
301        let request_sender = self.sender.clone();
302        let manifest_ctx = region.manifest_ctx.clone();
303
304        // Updates manifest in background.
305        common_runtime::spawn_global(async move {
306            // Write region truncated to manifest.
307            let action_list =
308                RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
309
310            let result = manifest_ctx
311                .update_manifest(RegionLeaderState::Truncating, action_list)
312                .await
313                .map(|_| ());
314
315            // Sends the result back to the request sender.
316            let truncate_result = TruncateResult {
317                region_id: truncate.region_id,
318                sender,
319                result,
320                truncated_entry_id: truncate.truncated_entry_id,
321                truncated_sequence: truncate.truncated_sequence,
322            };
323            let _ = request_sender
324                .send(WorkerRequestWithTime::new(WorkerRequest::Background {
325                    region_id: truncate.region_id,
326                    notify: BackgroundNotify::Truncate(truncate_result),
327                }))
328                .await
329                .inspect_err(|_| warn!("failed to send truncate result"));
330        });
331    }
332
333    /// Writes region change action to the manifest and then applies it to the region in background.
334    pub(crate) fn handle_manifest_region_change(
335        &self,
336        region: MitoRegionRef,
337        change: RegionChange,
338        sender: OptionOutputTx,
339    ) {
340        // Marks the region as altering.
341        if let Err(e) = region.set_altering() {
342            sender.send(Err(e));
343            return;
344        }
345        let listener = self.listener.clone();
346        let request_sender = self.sender.clone();
347        // Now the region is in altering state.
348        common_runtime::spawn_global(async move {
349            let new_meta = change.metadata.clone();
350            let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
351
352            let result = region
353                .manifest_ctx
354                .update_manifest(RegionLeaderState::Altering, action_list)
355                .await
356                .map(|_| ());
357            let notify = WorkerRequest::Background {
358                region_id: region.region_id,
359                notify: BackgroundNotify::RegionChange(RegionChangeResult {
360                    region_id: region.region_id,
361                    sender,
362                    result,
363                    new_meta,
364                }),
365            };
366            listener
367                .on_notify_region_change_result_begin(region.region_id)
368                .await;
369
370            if let Err(res) = request_sender
371                .send(WorkerRequestWithTime::new(notify))
372                .await
373            {
374                warn!(
375                    "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
376                    region.region_id, res
377                );
378            }
379        });
380    }
381}
382
383/// Checks the edit, writes and applies it.
384async fn edit_region(
385    region: &MitoRegionRef,
386    edit: RegionEdit,
387    cache_manager: CacheManagerRef,
388    listener: WorkerListener,
389) -> Result<()> {
390    let region_id = region.region_id;
391    if let Some(write_cache) = cache_manager.write_cache() {
392        for file_meta in &edit.files_to_add {
393            let write_cache = write_cache.clone();
394            let layer = region.access_layer.clone();
395            let listener = listener.clone();
396
397            let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
398            let remote_path =
399                location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
400
401            let is_index_exist = file_meta.exists_index();
402            let index_file_size = file_meta.index_file_size();
403
404            let index_file_index_key =
405                IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
406            let index_remote_path = location::index_file_path(
407                layer.table_dir(),
408                file_meta.file_id(),
409                layer.path_type(),
410            );
411
412            let file_size = file_meta.file_size;
413            common_runtime::spawn_global(async move {
414                WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
415
416                if write_cache
417                    .download(index_key, &remote_path, layer.object_store(), file_size)
418                    .await
419                    .is_ok()
420                {
421                    // Triggers the filling of the parquet metadata cache.
422                    // The parquet file is already downloaded.
423                    let _ = write_cache
424                        .file_cache()
425                        .get_parquet_meta_data(index_key)
426                        .await;
427
428                    listener.on_file_cache_filled(index_key.file_id);
429                }
430                if is_index_exist {
431                    // also download puffin file
432                    if let Err(err) = write_cache
433                        .download(
434                            index_file_index_key,
435                            &index_remote_path,
436                            layer.object_store(),
437                            index_file_size,
438                        )
439                        .await
440                    {
441                        common_telemetry::error!(
442                            err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
443                        );
444                    }
445                }
446
447                WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
448            });
449        }
450    }
451
452    info!("Applying {edit:?} to region {}", region_id);
453
454    let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
455    region
456        .manifest_ctx
457        .update_manifest(RegionLeaderState::Editing, action_list)
458        .await
459        .map(|_| ())
460}