Skip to main content

mito2/worker/
handle_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handles manifest.
16//!
17//! It updates the manifest and applies the changes to the region in background.
18
19use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{debug, info, warn};
24use parquet::file::metadata::PageIndexPolicy;
25use snafu::ResultExt;
26use store_api::logstore::LogStore;
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::RegionId;
29
30use crate::cache::CacheManagerRef;
31use crate::cache::file_cache::{FileType, IndexKey};
32use crate::config::IndexBuildMode;
33use crate::error::{EditRegionSnafu, RegionBusySnafu, RegionNotFoundSnafu, Result};
34use crate::manifest::action::{
35    RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
36};
37use crate::memtable::MemtableBuilderProvider;
38use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
39use crate::region::opener::{sanitize_region_options, version_builder_from_manifest};
40use crate::region::options::RegionOptions;
41use crate::region::version::VersionControlRef;
42use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
43use crate::request::{
44    BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
45    RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
46};
47use crate::sst::index::IndexBuildType;
48use crate::sst::location;
49use crate::worker::{RegionWorkerLoop, WorkerListener};
50
51pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
52
53/// A queue for region edit requests received while the region is already `Editing`.
54///
55/// Normal writes and bulk inserts that arrive during `Editing` use the stalled-write queue instead.
56/// When an edit completes, those writes are handled before the next queued edit starts, preserving
57/// sequence ordering between direct SST edits and WAL/memtable writes unless global reject
58/// backpressure rejects them first.
59/// Everything is done in the region worker loop.
60pub(crate) struct RegionEditQueue {
61    region_id: RegionId,
62    requests: VecDeque<RegionEditRequest>,
63}
64
65impl RegionEditQueue {
66    const QUEUE_MAX_LEN: usize = 128;
67
68    fn new(region_id: RegionId) -> Self {
69        Self {
70            region_id,
71            requests: VecDeque::new(),
72        }
73    }
74
75    fn enqueue(&mut self, request: RegionEditRequest) {
76        if self.requests.len() > Self::QUEUE_MAX_LEN {
77            request.waiters.reply_with(|| {
78                RegionBusySnafu {
79                    region_id: self.region_id,
80                }
81                .fail()
82            });
83            return;
84        };
85        self.requests.push_back(request);
86    }
87
88    fn dequeue(&mut self) -> Option<RegionEditRequest> {
89        fn can_merge(edit: &RegionEdit) -> bool {
90            // Only the `RegionEdit`:
91            // 1. contains the "raw" (file without a sequence) files to add,
92            // 2. and no `committed_sequence`,
93            // 3. and all other fields are empty,
94            // can it be merged.
95            //
96            // However, merging them means they will all share a same sequence, and if there are
97            // overlapping data in the files, the dedup is uncertain. This is a caution that must
98            // be noticed for editing region.
99            edit.files_to_add.iter().all(|f| f.sequence.is_none())
100                && edit.files_to_remove.is_empty()
101                && edit.timestamp_ms.is_none()
102                && edit.compaction_time_window.is_none()
103                && edit.flushed_entry_id.is_none()
104                && edit.flushed_sequence.is_none()
105                && edit.committed_sequence.is_none()
106        }
107
108        let mut merged = self.requests.pop_front()?;
109        if !can_merge(&merged.edit) {
110            return Some(merged);
111        }
112
113        while let Some(request) = self
114            .requests
115            .pop_front_if(|request| can_merge(&request.edit))
116        {
117            merged.edit.files_to_add.extend(request.edit.files_to_add);
118            merged.waiters.merge(request.waiters);
119        }
120        debug!(
121            "the files to add: [{}] are merged in one edit",
122            merged
123                .edit
124                .files_to_add
125                .iter()
126                .map(|x| x.file_id.to_string())
127                .collect::<Vec<_>>()
128                .join(", ")
129        );
130        Some(merged)
131    }
132
133    fn is_empty(&self) -> bool {
134        self.requests.is_empty()
135    }
136
137    fn reject_all_as_not_found(mut self) {
138        while let Some(request) = self.requests.pop_front() {
139            request.waiters.reply_with(|| {
140                RegionNotFoundSnafu {
141                    region_id: self.region_id,
142                }
143                .fail()
144            });
145        }
146    }
147}
148
149impl<S: LogStore> RegionWorkerLoop<S> {
150    /// Rejects queued region edit requests as region not found.
151    pub(crate) fn reject_region_edit_queue_as_not_found(&mut self, region_id: RegionId) {
152        if let Some(edit_queue) = self.region_edit_queues.remove(&region_id) {
153            edit_queue.reject_all_as_not_found();
154        }
155    }
156
157    /// Handles region change result.
158    pub(crate) async fn handle_manifest_region_change_result(
159        &mut self,
160        change_result: RegionChangeResult,
161    ) {
162        let region = match self.regions.get_region(change_result.region_id) {
163            Some(region) => region,
164            None => {
165                self.reject_region_stalled_requests(&change_result.region_id);
166                change_result.sender.send(
167                    RegionNotFoundSnafu {
168                        region_id: change_result.region_id,
169                    }
170                    .fail(),
171                );
172                return;
173            }
174        };
175
176        if change_result.result.is_ok() {
177            // Updates the region metadata and format.
178            Self::update_region_version(
179                &region.version_control,
180                change_result.new_meta,
181                change_result.new_options,
182                &self.memtable_builder_provider,
183            );
184        }
185
186        // Sets the region as writable.
187        region.switch_state_to_writable(RegionLeaderState::Altering);
188        // Sends the result.
189        change_result.sender.send(change_result.result.map(|_| 0));
190
191        // In async mode, rebuild index after index metadata changed.
192        if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index {
193            self.handle_rebuild_index(
194                BuildIndexRequest {
195                    region_id: region.region_id,
196                    build_type: IndexBuildType::SchemaChange,
197                    file_metas: Vec::new(),
198                },
199                OptionOutputTx::new(None),
200            )
201            .await;
202        }
203        // Handles the stalled requests.
204        self.handle_region_stalled_requests(&change_result.region_id, true)
205            .await;
206    }
207
208    /// Handles region sync request.
209    ///
210    /// Updates the manifest to at least the given version.
211    /// **Note**: The installed version may be greater than the given version.
212    pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
213        let region_id = request.region_id;
214        let sender = request.sender;
215        let region = match self.regions.follower_region(region_id) {
216            Ok(region) => region,
217            Err(e) => {
218                let _ = sender.send(Err(e));
219                return;
220            }
221        };
222
223        let original_manifest_version = region.manifest_ctx.manifest_version().await;
224        let manifest = match region
225            .manifest_ctx
226            .install_manifest_to(request.manifest_version)
227            .await
228        {
229            Ok(manifest) => manifest,
230            Err(e) => {
231                let _ = sender.send(Err(e));
232                return;
233            }
234        };
235        let version = region.version();
236        let mut region_options = version.options.clone();
237        let old_format = region_options.sst_format.unwrap_or_default();
238        // Updates the region options with the manifest.
239        sanitize_region_options(&manifest, &mut region_options);
240        if !version.memtables.is_empty() {
241            let current = region.version_control.current();
242            warn!(
243                "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
244                region.region_id, manifest.manifest_version, current.last_entry_id
245            );
246        }
247
248        // We should sanitize the region options before creating a new memtable.
249        let memtable_builder = if old_format != region_options.sst_format.unwrap_or_default() {
250            // Format changed, also needs to replace the memtable builder.
251            Some(
252                self.memtable_builder_provider
253                    .builder_for_options(&region_options),
254            )
255        } else {
256            None
257        };
258        let new_mutable = Arc::new(
259            region
260                .version()
261                .memtables
262                .mutable
263                .new_with_part_duration(version.compaction_time_window, memtable_builder),
264        );
265        // Here it assumes the leader has backfilled the partition_expr of the metadata.
266        let metadata = manifest.metadata.clone();
267
268        let version_builder = version_builder_from_manifest(
269            &manifest,
270            metadata,
271            region.file_purger.clone(),
272            new_mutable,
273            region_options,
274        );
275        let version = version_builder.build();
276        region.version_control.overwrite_current(Arc::new(version));
277
278        let updated = manifest.manifest_version > original_manifest_version;
279        let _ = sender.send(Ok((manifest.manifest_version, updated)));
280    }
281}
282
283impl<S: LogStore> RegionWorkerLoop<S> {
284    /// Handles region edit request.
285    pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
286        let region_id = request.region_id;
287        let Some(region) = self.regions.get_region(region_id) else {
288            request
289                .waiters
290                .reply_with(|| RegionNotFoundSnafu { region_id }.fail());
291            return;
292        };
293
294        if !region.is_writable() {
295            if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
296                self.region_edit_queues
297                    .entry(region_id)
298                    .or_insert_with(|| RegionEditQueue::new(region_id))
299                    .enqueue(request);
300            } else {
301                request
302                    .waiters
303                    .reply_with(|| RegionBusySnafu { region_id }.fail());
304            }
305            return;
306        }
307
308        let RegionEditRequest {
309            region_id: _,
310            mut edit,
311            waiters,
312            preload_sst_cache,
313        } = request;
314        let file_sequence = region.version_control.committed_sequence() + 1;
315        edit.committed_sequence = Some(file_sequence);
316
317        // For every file added through region edit, we should fill the file sequence
318        for file in &mut edit.files_to_add {
319            file.sequence = NonZeroU64::new(file_sequence);
320        }
321
322        // Allow retrieving `is_staging` before spawn the edit region task.
323        let is_staging = region.is_staging();
324        let expect_state = if is_staging {
325            RegionLeaderState::Staging
326        } else {
327            RegionLeaderState::Writable
328        };
329        // Marks the region as editing.
330        if let Err(e) = region.set_editing(expect_state) {
331            let e = Arc::new(e);
332            waiters.reply_with(|| Err(e.clone()).context(EditRegionSnafu { region_id }));
333            return;
334        }
335
336        let request_sender = self.sender.clone();
337        let cache_manager = self.cache_manager.clone();
338        let listener = self.listener.clone();
339        // Now the region is in editing state.
340        // Updates manifest in background.
341        common_runtime::spawn_global(async move {
342            let result = edit_region(
343                &region,
344                edit.clone(),
345                cache_manager,
346                listener,
347                is_staging,
348                preload_sst_cache,
349            )
350            .await
351            .map_err(Arc::new);
352            let notify = WorkerRequest::Background {
353                region_id,
354                notify: BackgroundNotify::RegionEdit(RegionEditResult {
355                    region_id,
356                    waiters,
357                    edit,
358                    result,
359                    // we always need to restore region state after region edit
360                    update_region_state: true,
361                    is_staging,
362                }),
363            };
364
365            // We don't set state back as the worker loop is already exited.
366            if let Err(res) = request_sender
367                .send(WorkerRequestWithTime::new(notify))
368                .await
369            {
370                warn!(
371                    "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
372                    region_id, res
373                );
374            }
375        });
376    }
377
378    /// Handles region edit result.
379    pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
380        let region = match self.regions.get_region(edit_result.region_id) {
381            Some(region) => region,
382            None => {
383                // Fail writes stalled behind this edit if the region was removed before the
384                // edit-completion notification reached the worker.
385                self.fail_region_stalled_requests_as_not_found(&edit_result.region_id);
386                self.reject_region_edit_queue_as_not_found(edit_result.region_id);
387
388                edit_result.waiters.reply_with(|| {
389                    RegionNotFoundSnafu {
390                        region_id: edit_result.region_id,
391                    }
392                    .fail()
393                });
394                return;
395            }
396        };
397
398        let need_compaction = if edit_result.is_staging {
399            if edit_result.update_region_state {
400                // For staging regions, edits are not applied immediately,
401                // as they remain invisible until the region exits the staging state.
402                region.switch_state_to_staging(RegionLeaderState::Editing);
403            }
404
405            false
406        } else {
407            let need_compaction = self.config.schedule_compaction_after_edit
408                && edit_result.result.is_ok()
409                && !edit_result.edit.files_to_add.is_empty();
410
411            // Only apply the edit if the result is ok and region is not in staging state.
412            if edit_result.result.is_ok() {
413                // Applies the edit to the region.
414                region.version_control.apply_edit(
415                    Some(edit_result.edit),
416                    &[],
417                    region.file_purger.clone(),
418                );
419            }
420            if edit_result.update_region_state {
421                region.switch_state_to_writable(RegionLeaderState::Editing);
422            }
423
424            need_compaction
425        };
426
427        edit_result
428            .waiters
429            .reply_with(|| match &edit_result.result {
430                Ok(()) => Ok(()),
431                Err(e) => Err(e.clone()).context(EditRegionSnafu {
432                    region_id: edit_result.region_id,
433                }),
434            });
435
436        if edit_result.update_region_state {
437            // Writes stalled specifically by this edit are handled before the next queued edit.
438            // Otherwise the next edit could reserve a committed sequence before those writes.
439            self.handle_region_stalled_requests(&edit_result.region_id, false)
440                .await;
441        }
442
443        let next_request =
444            if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
445                let request = edit_queue.dequeue();
446                if edit_queue.is_empty() {
447                    self.region_edit_queues.remove(&edit_result.region_id);
448                }
449                request
450            } else {
451                None
452            };
453        if let Some(request) = next_request {
454            self.handle_region_edit(request);
455        }
456
457        if need_compaction {
458            self.schedule_compaction(&region).await;
459        }
460    }
461
462    /// Writes truncate action to the manifest and then applies it to the region in background.
463    pub(crate) fn handle_manifest_truncate_action(
464        &self,
465        region: MitoRegionRef,
466        truncate: RegionTruncate,
467        sender: OptionOutputTx,
468    ) {
469        // Marks the region as truncating.
470        // This prevents the region from being accessed by other write requests.
471        if let Err(e) = region.set_truncating() {
472            sender.send(Err(e));
473            return;
474        }
475        // Now the region is in truncating state.
476
477        let request_sender = self.sender.clone();
478        let manifest_ctx = region.manifest_ctx.clone();
479        let is_staging = region.is_staging();
480
481        // Updates manifest in background.
482        common_runtime::spawn_global(async move {
483            // Write region truncated to manifest.
484            let action_list =
485                RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
486
487            let result = manifest_ctx
488                .update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
489                .await
490                .map(|_| ());
491
492            // Sends the result back to the request sender.
493            let truncate_result = TruncateResult {
494                region_id: truncate.region_id,
495                sender,
496                result,
497                kind: truncate.kind,
498            };
499            let _ = request_sender
500                .send(WorkerRequestWithTime::new(WorkerRequest::Background {
501                    region_id: truncate.region_id,
502                    notify: BackgroundNotify::Truncate(truncate_result),
503                }))
504                .await
505                .inspect_err(|_| warn!("failed to send truncate result"));
506        });
507    }
508
509    /// Writes region change action to the manifest and then applies it to the region in background.
510    pub(crate) fn handle_manifest_region_change(
511        &self,
512        region: MitoRegionRef,
513        change: RegionChange,
514        need_index: bool,
515        new_options: Option<RegionOptions>,
516        sender: OptionOutputTx,
517    ) {
518        // Marks the region as altering.
519        if let Err(e) = region.set_altering() {
520            sender.send(Err(e));
521            return;
522        }
523        let listener = self.listener.clone();
524        let request_sender = self.sender.clone();
525        let is_staging = region.is_staging();
526        // Now the region is in altering state.
527        common_runtime::spawn_global(async move {
528            let new_meta = change.metadata.clone();
529            let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
530
531            let result = region
532                .manifest_ctx
533                .update_manifest(RegionLeaderState::Altering, action_list, is_staging)
534                .await
535                .map(|_| ());
536            let notify = WorkerRequest::Background {
537                region_id: region.region_id,
538                notify: BackgroundNotify::RegionChange(RegionChangeResult {
539                    region_id: region.region_id,
540                    sender,
541                    result,
542                    new_meta,
543                    need_index,
544                    new_options,
545                }),
546            };
547            listener
548                .on_notify_region_change_result_begin(region.region_id)
549                .await;
550
551            if let Err(res) = request_sender
552                .send(WorkerRequestWithTime::new(notify))
553                .await
554            {
555                warn!(
556                    "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
557                    region.region_id, res
558                );
559            }
560        });
561    }
562
563    fn update_region_version(
564        version_control: &VersionControlRef,
565        new_meta: RegionMetadataRef,
566        new_options: Option<RegionOptions>,
567        memtable_builder_provider: &MemtableBuilderProvider,
568    ) {
569        let options_changed = new_options.is_some();
570        let region_id = new_meta.region_id;
571        if let Some(new_options) = new_options {
572            // Needs to update the region with new format and memtables.
573            // Creates a new memtable builder for the new options as it may change the memtable type.
574            let new_memtable_builder = memtable_builder_provider.builder_for_options(&new_options);
575            version_control.alter_schema_and_format(new_meta, new_options, new_memtable_builder);
576        } else {
577            // Only changes the schema.
578            version_control.alter_schema(new_meta);
579        }
580
581        let version_data = version_control.current();
582        let version = version_data.version;
583        info!(
584            "Region {} is altered, metadata is {:?}, options: {:?}, options_changed: {}",
585            region_id, version.metadata, version.options, options_changed,
586        );
587    }
588}
589
590/// Checks the edit, writes and applies it.
591async fn edit_region(
592    region: &MitoRegionRef,
593    edit: RegionEdit,
594    cache_manager: CacheManagerRef,
595    listener: WorkerListener,
596    is_staging: bool,
597    preload_sst_cache: bool,
598) -> Result<()> {
599    let region_id = region.region_id;
600    if let Some(write_cache) = cache_manager.write_cache()
601        && preload_sst_cache
602    {
603        for file_meta in &edit.files_to_add {
604            let write_cache = write_cache.clone();
605            let layer = region.access_layer.clone();
606            let listener = listener.clone();
607
608            let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
609            let remote_path =
610                location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
611
612            let is_index_exist = file_meta.exists_index();
613            let index_file_size = file_meta.index_file_size();
614
615            let index_file_index_key = IndexKey::new(
616                region_id,
617                file_meta.index_id().file_id.file_id(),
618                FileType::Puffin(file_meta.index_version),
619            );
620            let index_remote_path = location::index_file_path(
621                layer.table_dir(),
622                file_meta.index_id(),
623                layer.path_type(),
624            );
625
626            let file_size = file_meta.file_size;
627            common_runtime::spawn_global(async move {
628                WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
629
630                let parquet_cached = write_cache
631                    .download_if_absent(index_key, &remote_path, layer.object_store(), file_size)
632                    .await;
633
634                if parquet_cached.is_ok() {
635                    // Triggers the filling of the parquet metadata cache.
636                    // The parquet file is already downloaded.
637                    let mut cache_metrics = Default::default();
638                    let _ = write_cache
639                        .file_cache()
640                        .get_parquet_meta_data(
641                            index_key,
642                            &mut cache_metrics,
643                            PageIndexPolicy::Optional,
644                        )
645                        .await;
646
647                    if matches!(parquet_cached, Ok(true)) {
648                        listener.on_file_cache_filled(index_key.file_id);
649                    }
650                }
651                if is_index_exist {
652                    // also download puffin file
653                    if let Err(err) = write_cache
654                        .download(
655                            index_file_index_key,
656                            &index_remote_path,
657                            layer.object_store(),
658                            index_file_size,
659                        )
660                        .await
661                    {
662                        common_telemetry::error!(
663                            err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
664                        );
665                    }
666                }
667
668                WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
669            });
670        }
671    }
672
673    info!(
674        "Applying {edit:?} to region {}, is_staging: {}",
675        region_id, is_staging
676    );
677
678    let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
679    region
680        .manifest_ctx
681        .update_manifest(RegionLeaderState::Editing, action_list, is_staging)
682        .await
683        .map(|_| ())
684}