mito2/worker/
handle_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handles manifest.
16//!
17//! It updates the manifest and applies the changes to the region in background.
18
19use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{info, warn};
24use store_api::logstore::LogStore;
25use store_api::storage::RegionId;
26
27use crate::cache::CacheManagerRef;
28use crate::cache::file_cache::{FileType, IndexKey};
29use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
30use crate::manifest::action::{
31    RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
32};
33use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
34use crate::region::version::VersionBuilder;
35use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
36use crate::request::{
37    BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
38    RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
39};
40use crate::sst::location;
41use crate::worker::{RegionWorkerLoop, WorkerListener};
42
/// Per-region queues of pending region edit requests, keyed by region id.
pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
44
/// A queue that temporarily stores region edit requests while the region is in the "Editing"
/// state.
/// When the current region edit request is completed, the next request in the queue (if any)
/// will be processed.
/// Everything is done in the region worker loop.
pub(crate) struct RegionEditQueue {
    /// Id of the region the queued requests belong to. It is reported in the "region busy"
    /// error sent to a request that is rejected because the queue is full.
    region_id: RegionId,
    /// Pending edit requests, appended at the back and taken from the front (FIFO).
    requests: VecDeque<RegionEditRequest>,
}
53
54impl RegionEditQueue {
55    const QUEUE_MAX_LEN: usize = 128;
56
57    fn new(region_id: RegionId) -> Self {
58        Self {
59            region_id,
60            requests: VecDeque::new(),
61        }
62    }
63
64    fn enqueue(&mut self, request: RegionEditRequest) {
65        if self.requests.len() > Self::QUEUE_MAX_LEN {
66            let _ = request.tx.send(
67                RegionBusySnafu {
68                    region_id: self.region_id,
69                }
70                .fail(),
71            );
72            return;
73        };
74        self.requests.push_back(request);
75    }
76
77    fn dequeue(&mut self) -> Option<RegionEditRequest> {
78        self.requests.pop_front()
79    }
80}
81
82impl<S: LogStore> RegionWorkerLoop<S> {
83    /// Handles region change result.
84    pub(crate) async fn handle_manifest_region_change_result(
85        &mut self,
86        change_result: RegionChangeResult,
87    ) {
88        let region = match self.regions.get_region(change_result.region_id) {
89            Some(region) => region,
90            None => {
91                self.reject_region_stalled_requests(&change_result.region_id);
92                change_result.sender.send(
93                    RegionNotFoundSnafu {
94                        region_id: change_result.region_id,
95                    }
96                    .fail(),
97                );
98                return;
99            }
100        };
101
102        if change_result.result.is_ok() {
103            // Apply the metadata to region's version.
104            region
105                .version_control
106                .alter_schema(change_result.new_meta, &region.memtable_builder);
107
108            let version = region.version();
109            info!(
110                "Region {} is altered, metadata is {:?}, options: {:?}",
111                region.region_id, version.metadata, version.options,
112            );
113        }
114
115        // Sets the region as writable.
116        region.switch_state_to_writable(RegionLeaderState::Altering);
117        // Sends the result.
118        change_result.sender.send(change_result.result.map(|_| 0));
119
120        // Handles the stalled requests.
121        self.handle_region_stalled_requests(&change_result.region_id)
122            .await;
123    }
124
125    /// Handles region sync request.
126    ///
127    /// Updates the manifest to at least the given version.
128    /// **Note**: The installed version may be greater than the given version.
129    pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
130        let region_id = request.region_id;
131        let sender = request.sender;
132        let region = match self.regions.follower_region(region_id) {
133            Ok(region) => region,
134            Err(e) => {
135                let _ = sender.send(Err(e));
136                return;
137            }
138        };
139
140        let original_manifest_version = region.manifest_ctx.manifest_version().await;
141        let manifest = match region
142            .manifest_ctx
143            .install_manifest_to(request.manifest_version)
144            .await
145        {
146            Ok(manifest) => manifest,
147            Err(e) => {
148                let _ = sender.send(Err(e));
149                return;
150            }
151        };
152        let version = region.version();
153        if !version.memtables.is_empty() {
154            let current = region.version_control.current();
155            warn!(
156                "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
157                region.region_id, manifest.manifest_version, current.last_entry_id
158            );
159        }
160        let region_options = version.options.clone();
161        let new_mutable = Arc::new(
162            region
163                .version()
164                .memtables
165                .mutable
166                .new_with_part_duration(version.compaction_time_window),
167        );
168        let metadata = manifest.metadata.clone();
169        let version = VersionBuilder::new(metadata, new_mutable)
170            .add_files(region.file_purger.clone(), manifest.files.values().cloned())
171            .flushed_entry_id(manifest.flushed_entry_id)
172            .flushed_sequence(manifest.flushed_sequence)
173            .truncated_entry_id(manifest.truncated_entry_id)
174            .compaction_time_window(manifest.compaction_time_window)
175            .options(region_options)
176            .build();
177        region.version_control.overwrite_current(Arc::new(version));
178
179        let updated = manifest.manifest_version > original_manifest_version;
180        let _ = sender.send(Ok((manifest.manifest_version, updated)));
181    }
182}
183
184impl<S> RegionWorkerLoop<S> {
185    /// Handles region edit request.
186    pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
187        let region_id = request.region_id;
188        let Some(region) = self.regions.get_region(region_id) else {
189            let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
190            return;
191        };
192
193        if !region.is_writable() {
194            if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
195                self.region_edit_queues
196                    .entry(region_id)
197                    .or_insert_with(|| RegionEditQueue::new(region_id))
198                    .enqueue(request);
199            } else {
200                let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
201            }
202            return;
203        }
204
205        let RegionEditRequest {
206            region_id: _,
207            mut edit,
208            tx: sender,
209        } = request;
210        let file_sequence = region.version_control.committed_sequence() + 1;
211        edit.committed_sequence = Some(file_sequence);
212
213        // For every file added through region edit, we should fill the file sequence
214        for file in &mut edit.files_to_add {
215            file.sequence = NonZeroU64::new(file_sequence);
216        }
217
218        // Marks the region as editing.
219        if let Err(e) = region.set_editing() {
220            let _ = sender.send(Err(e));
221            return;
222        }
223
224        let request_sender = self.sender.clone();
225        let cache_manager = self.cache_manager.clone();
226        let listener = self.listener.clone();
227        // Now the region is in editing state.
228        // Updates manifest in background.
229        common_runtime::spawn_global(async move {
230            let result = edit_region(&region, edit.clone(), cache_manager, listener).await;
231            let notify = WorkerRequest::Background {
232                region_id,
233                notify: BackgroundNotify::RegionEdit(RegionEditResult {
234                    region_id,
235                    sender,
236                    edit,
237                    result,
238                }),
239            };
240
241            // We don't set state back as the worker loop is already exited.
242            if let Err(res) = request_sender
243                .send(WorkerRequestWithTime::new(notify))
244                .await
245            {
246                warn!(
247                    "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
248                    region_id, res
249                );
250            }
251        });
252    }
253
254    /// Handles region edit result.
255    pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
256        let region = match self.regions.get_region(edit_result.region_id) {
257            Some(region) => region,
258            None => {
259                let _ = edit_result.sender.send(
260                    RegionNotFoundSnafu {
261                        region_id: edit_result.region_id,
262                    }
263                    .fail(),
264                );
265                return;
266            }
267        };
268
269        let need_compaction =
270            edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
271
272        if edit_result.result.is_ok() {
273            // Applies the edit to the region.
274            region.version_control.apply_edit(
275                Some(edit_result.edit),
276                &[],
277                region.file_purger.clone(),
278            );
279        }
280
281        // Sets the region as writable.
282        region.switch_state_to_writable(RegionLeaderState::Editing);
283
284        let _ = edit_result.sender.send(edit_result.result);
285
286        if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
287            && let Some(request) = edit_queue.dequeue()
288        {
289            self.handle_region_edit(request).await;
290        }
291
292        if need_compaction {
293            self.schedule_compaction(&region).await;
294        }
295    }
296
297    /// Writes truncate action to the manifest and then applies it to the region in background.
298    pub(crate) fn handle_manifest_truncate_action(
299        &self,
300        region: MitoRegionRef,
301        truncate: RegionTruncate,
302        sender: OptionOutputTx,
303    ) {
304        // Marks the region as truncating.
305        // This prevents the region from being accessed by other write requests.
306        if let Err(e) = region.set_truncating() {
307            sender.send(Err(e));
308            return;
309        }
310        // Now the region is in truncating state.
311
312        let request_sender = self.sender.clone();
313        let manifest_ctx = region.manifest_ctx.clone();
314
315        // Updates manifest in background.
316        common_runtime::spawn_global(async move {
317            // Write region truncated to manifest.
318            let action_list =
319                RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
320
321            let result = manifest_ctx
322                .update_manifest(RegionLeaderState::Truncating, action_list)
323                .await
324                .map(|_| ());
325
326            // Sends the result back to the request sender.
327            let truncate_result = TruncateResult {
328                region_id: truncate.region_id,
329                sender,
330                result,
331                kind: truncate.kind,
332            };
333            let _ = request_sender
334                .send(WorkerRequestWithTime::new(WorkerRequest::Background {
335                    region_id: truncate.region_id,
336                    notify: BackgroundNotify::Truncate(truncate_result),
337                }))
338                .await
339                .inspect_err(|_| warn!("failed to send truncate result"));
340        });
341    }
342
343    /// Writes region change action to the manifest and then applies it to the region in background.
344    pub(crate) fn handle_manifest_region_change(
345        &self,
346        region: MitoRegionRef,
347        change: RegionChange,
348        sender: OptionOutputTx,
349    ) {
350        // Marks the region as altering.
351        if let Err(e) = region.set_altering() {
352            sender.send(Err(e));
353            return;
354        }
355        let listener = self.listener.clone();
356        let request_sender = self.sender.clone();
357        // Now the region is in altering state.
358        common_runtime::spawn_global(async move {
359            let new_meta = change.metadata.clone();
360            let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
361
362            let result = region
363                .manifest_ctx
364                .update_manifest(RegionLeaderState::Altering, action_list)
365                .await
366                .map(|_| ());
367            let notify = WorkerRequest::Background {
368                region_id: region.region_id,
369                notify: BackgroundNotify::RegionChange(RegionChangeResult {
370                    region_id: region.region_id,
371                    sender,
372                    result,
373                    new_meta,
374                }),
375            };
376            listener
377                .on_notify_region_change_result_begin(region.region_id)
378                .await;
379
380            if let Err(res) = request_sender
381                .send(WorkerRequestWithTime::new(notify))
382                .await
383            {
384                warn!(
385                    "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
386                    region.region_id, res
387                );
388            }
389        });
390    }
391}
392
393/// Checks the edit, writes and applies it.
394async fn edit_region(
395    region: &MitoRegionRef,
396    edit: RegionEdit,
397    cache_manager: CacheManagerRef,
398    listener: WorkerListener,
399) -> Result<()> {
400    let region_id = region.region_id;
401    if let Some(write_cache) = cache_manager.write_cache() {
402        for file_meta in &edit.files_to_add {
403            let write_cache = write_cache.clone();
404            let layer = region.access_layer.clone();
405            let listener = listener.clone();
406
407            let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
408            let remote_path =
409                location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
410
411            let is_index_exist = file_meta.exists_index();
412            let index_file_size = file_meta.index_file_size();
413
414            let index_file_index_key =
415                IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
416            let index_remote_path = location::index_file_path(
417                layer.table_dir(),
418                file_meta.file_id(),
419                layer.path_type(),
420            );
421
422            let file_size = file_meta.file_size;
423            common_runtime::spawn_global(async move {
424                WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
425
426                if write_cache
427                    .download(index_key, &remote_path, layer.object_store(), file_size)
428                    .await
429                    .is_ok()
430                {
431                    // Triggers the filling of the parquet metadata cache.
432                    // The parquet file is already downloaded.
433                    let _ = write_cache
434                        .file_cache()
435                        .get_parquet_meta_data(index_key)
436                        .await;
437
438                    listener.on_file_cache_filled(index_key.file_id);
439                }
440                if is_index_exist {
441                    // also download puffin file
442                    if let Err(err) = write_cache
443                        .download(
444                            index_file_index_key,
445                            &index_remote_path,
446                            layer.object_store(),
447                            index_file_size,
448                        )
449                        .await
450                    {
451                        common_telemetry::error!(
452                            err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
453                        );
454                    }
455                }
456
457                WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
458            });
459        }
460    }
461
462    info!("Applying {edit:?} to region {}", region_id);
463
464    let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
465    region
466        .manifest_ctx
467        .update_manifest(RegionLeaderState::Editing, action_list)
468        .await
469        .map(|_| ())
470}