// mito2/worker/handle_manifest.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handles manifest.
//!
//! It updates the manifest and applies the changes to the region in background.
18
19use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{info, warn};
24use parquet::file::metadata::PageIndexPolicy;
25use store_api::logstore::LogStore;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::RegionId;
28
29use crate::cache::CacheManagerRef;
30use crate::cache::file_cache::{FileType, IndexKey};
31use crate::config::IndexBuildMode;
32use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
33use crate::manifest::action::{
34    RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
35};
36use crate::memtable::MemtableBuilderProvider;
37use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
38use crate::region::opener::{sanitize_region_options, version_builder_from_manifest};
39use crate::region::options::RegionOptions;
40use crate::region::version::VersionControlRef;
41use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
42use crate::request::{
43    BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
44    RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
45};
46use crate::sst::index::IndexBuildType;
47use crate::sst::location;
48use crate::worker::{RegionWorkerLoop, WorkerListener};
49
/// Per-region queues of pending edit requests, keyed by [`RegionId`].
pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
51
/// A queue for region edit requests received while the region is already `Editing`.
///
/// Normal writes and bulk inserts that arrive during `Editing` use the stalled-write queue instead.
/// When an edit completes, those writes are handled before the next queued edit starts, preserving
/// sequence ordering between direct SST edits and WAL/memtable writes unless global reject
/// backpressure rejects them first.
/// Everything is done in the region worker loop.
pub(crate) struct RegionEditQueue {
    // Region this queue belongs to; used when building error responses.
    region_id: RegionId,
    // Pending edit requests in FIFO order (front is the oldest).
    requests: VecDeque<RegionEditRequest>,
}
63
64impl RegionEditQueue {
65    const QUEUE_MAX_LEN: usize = 128;
66
67    fn new(region_id: RegionId) -> Self {
68        Self {
69            region_id,
70            requests: VecDeque::new(),
71        }
72    }
73
74    fn enqueue(&mut self, request: RegionEditRequest) {
75        if self.requests.len() > Self::QUEUE_MAX_LEN {
76            let _ = request.tx.send(
77                RegionBusySnafu {
78                    region_id: self.region_id,
79                }
80                .fail(),
81            );
82            return;
83        };
84        self.requests.push_back(request);
85    }
86
87    fn dequeue(&mut self) -> Option<RegionEditRequest> {
88        self.requests.pop_front()
89    }
90
91    fn is_empty(&self) -> bool {
92        self.requests.is_empty()
93    }
94
95    fn reject_all_as_not_found(mut self) {
96        while let Some(request) = self.requests.pop_front() {
97            let _ = request.tx.send(
98                RegionNotFoundSnafu {
99                    region_id: self.region_id,
100                }
101                .fail(),
102            );
103        }
104    }
105}
106
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Rejects queued region edit requests as region not found.
    ///
    /// Removes the region's edit queue (if any) and fails every pending
    /// request in it with a `RegionNotFound` error.
    pub(crate) fn reject_region_edit_queue_as_not_found(&mut self, region_id: RegionId) {
        if let Some(edit_queue) = self.region_edit_queues.remove(&region_id) {
            edit_queue.reject_all_as_not_found();
        }
    }

    /// Handles region change result.
    ///
    /// On success, applies the new metadata/options to the region version,
    /// then (regardless of outcome) switches the region back to writable,
    /// notifies the caller, optionally schedules an async index rebuild,
    /// and finally replays the stalled requests.
    pub(crate) async fn handle_manifest_region_change_result(
        &mut self,
        change_result: RegionChangeResult,
    ) {
        let region = match self.regions.get_region(change_result.region_id) {
            Some(region) => region,
            None => {
                // The region was removed while the change was in flight:
                // fail both the stalled writes and the original caller.
                self.reject_region_stalled_requests(&change_result.region_id);
                change_result.sender.send(
                    RegionNotFoundSnafu {
                        region_id: change_result.region_id,
                    }
                    .fail(),
                );
                return;
            }
        };

        if change_result.result.is_ok() {
            // Updates the region metadata and format.
            Self::update_region_version(
                &region.version_control,
                change_result.new_meta,
                change_result.new_options,
                &self.memtable_builder_provider,
            );
        }

        // Sets the region as writable.
        region.switch_state_to_writable(RegionLeaderState::Altering);
        // Sends the result.
        change_result.sender.send(change_result.result.map(|_| 0));

        // In async mode, rebuild index after index metadata changed.
        if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index {
            self.handle_rebuild_index(
                BuildIndexRequest {
                    region_id: region.region_id,
                    build_type: IndexBuildType::SchemaChange,
                    // NOTE(review): empty `file_metas` — presumably means
                    // "rebuild for the whole region"; confirm against
                    // `handle_rebuild_index`.
                    file_metas: Vec::new(),
                },
                OptionOutputTx::new(None),
            )
            .await;
        }
        // Handles the stalled requests.
        self.handle_region_stalled_requests(&change_result.region_id, true)
            .await;
    }

    /// Handles region sync request.
    ///
    /// Updates the manifest to at least the given version.
    /// **Note**: The installed version may be greater than the given version.
    pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
        let region_id = request.region_id;
        let sender = request.sender;
        // Sync is only valid for follower regions.
        let region = match self.regions.follower_region(region_id) {
            Ok(region) => region,
            Err(e) => {
                let _ = sender.send(Err(e));
                return;
            }
        };

        // Remember the version before installation so we can report whether
        // this sync actually advanced the manifest.
        let original_manifest_version = region.manifest_ctx.manifest_version().await;
        let manifest = match region
            .manifest_ctx
            .install_manifest_to(request.manifest_version)
            .await
        {
            Ok(manifest) => manifest,
            Err(e) => {
                let _ = sender.send(Err(e));
                return;
            }
        };
        let version = region.version();
        let mut region_options = version.options.clone();
        let old_format = region_options.sst_format.unwrap_or_default();
        // Updates the region options with the manifest.
        sanitize_region_options(&manifest, &mut region_options);
        if !version.memtables.is_empty() {
            // A follower's memtables are expected to be empty; log loudly
            // but continue if they are not.
            let current = region.version_control.current();
            warn!(
                "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
                region.region_id, manifest.manifest_version, current.last_entry_id
            );
        }

        // We should sanitize the region options before creating a new memtable.
        let memtable_builder = if old_format != region_options.sst_format.unwrap_or_default() {
            // Format changed, also needs to replace the memtable builder.
            Some(
                self.memtable_builder_provider
                    .builder_for_options(&region_options),
            )
        } else {
            None
        };
        // Fresh mutable memtable for the rebuilt version; `None` builder
        // keeps the existing memtable type.
        let new_mutable = Arc::new(
            region
                .version()
                .memtables
                .mutable
                .new_with_part_duration(version.compaction_time_window, memtable_builder),
        );
        // Here it assumes the leader has backfilled the partition_expr of the metadata.
        let metadata = manifest.metadata.clone();

        let version_builder = version_builder_from_manifest(
            &manifest,
            metadata,
            region.file_purger.clone(),
            new_mutable,
            region_options,
        );
        let version = version_builder.build();
        // Replaces the whole current version with the one rebuilt from the manifest.
        region.version_control.overwrite_current(Arc::new(version));

        // `updated` tells the caller whether the manifest version advanced.
        let updated = manifest.manifest_version > original_manifest_version;
        let _ = sender.send(Ok((manifest.manifest_version, updated)));
    }
}
240
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Handles region edit request.
    ///
    /// If the region is writable (or staging), marks it `Editing`, reserves a
    /// committed sequence for the added files, and updates the manifest in a
    /// background task. If another edit is already in flight the request is
    /// queued; otherwise a busy/not-found error is returned immediately.
    pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
        let region_id = request.region_id;
        let Some(region) = self.regions.get_region(region_id) else {
            let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
            return;
        };

        if !region.is_writable() {
            if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
                // Another edit is in flight: queue this one instead of failing it.
                self.region_edit_queues
                    .entry(region_id)
                    .or_insert_with(|| RegionEditQueue::new(region_id))
                    .enqueue(request);
            } else {
                let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
            }
            return;
        }

        let RegionEditRequest {
            region_id: _,
            mut edit,
            tx: sender,
        } = request;
        // Reserves the next sequence so files added by this edit order after
        // all previously committed writes.
        let file_sequence = region.version_control.committed_sequence() + 1;
        edit.committed_sequence = Some(file_sequence);

        // For every file added through region edit, we should fill the file sequence
        for file in &mut edit.files_to_add {
            file.sequence = NonZeroU64::new(file_sequence);
        }

        // Allow retrieving `is_staging` before spawn the edit region task.
        let is_staging = region.is_staging();
        let expect_state = if is_staging {
            RegionLeaderState::Staging
        } else {
            RegionLeaderState::Writable
        };
        // Marks the region as editing.
        if let Err(e) = region.set_editing(expect_state) {
            let _ = sender.send(Err(e));
            return;
        }

        let request_sender = self.sender.clone();
        let cache_manager = self.cache_manager.clone();
        let listener = self.listener.clone();
        // Now the region is in editing state.
        // Updates manifest in background.
        common_runtime::spawn_global(async move {
            let result =
                edit_region(&region, edit.clone(), cache_manager, listener, is_staging).await;
            let notify = WorkerRequest::Background {
                region_id,
                notify: BackgroundNotify::RegionEdit(RegionEditResult {
                    region_id,
                    sender,
                    edit,
                    result,
                    // we always need to restore region state after region edit
                    update_region_state: true,
                    is_staging,
                }),
            };

            // We don't set state back as the worker loop is already exited.
            if let Err(res) = request_sender
                .send(WorkerRequestWithTime::new(notify))
                .await
            {
                warn!(
                    "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
                    region_id, res
                );
            }
        });
    }

    /// Handles region edit result.
    ///
    /// Applies the edit to the version (non-staging regions only), restores
    /// the region state, answers the caller, replays stalled writes, starts
    /// the next queued edit, and finally schedules a compaction if new files
    /// were added.
    pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
        let region = match self.regions.get_region(edit_result.region_id) {
            Some(region) => region,
            None => {
                // Fail writes stalled behind this edit if the region was removed before the
                // edit-completion notification reached the worker.
                self.fail_region_stalled_requests_as_not_found(&edit_result.region_id);
                self.reject_region_edit_queue_as_not_found(edit_result.region_id);
                let _ = edit_result.sender.send(
                    RegionNotFoundSnafu {
                        region_id: edit_result.region_id,
                    }
                    .fail(),
                );
                return;
            }
        };

        let need_compaction = if edit_result.is_staging {
            if edit_result.update_region_state {
                // For staging regions, edits are not applied immediately,
                // as they remain invisible until the region exits the staging state.
                region.switch_state_to_staging(RegionLeaderState::Editing);
            }

            false
        } else {
            // New files may make compaction worthwhile, but only when the
            // edit actually succeeded.
            let need_compaction =
                edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
            // Only apply the edit if the result is ok and region is not in staging state.
            if edit_result.result.is_ok() {
                // Applies the edit to the region.
                region.version_control.apply_edit(
                    Some(edit_result.edit),
                    &[],
                    region.file_purger.clone(),
                );
            }
            if edit_result.update_region_state {
                region.switch_state_to_writable(RegionLeaderState::Editing);
            }

            need_compaction
        };

        let _ = edit_result.sender.send(edit_result.result);

        if edit_result.update_region_state {
            // Writes stalled specifically by this edit are handled before the next queued edit.
            // Otherwise the next edit could reserve a committed sequence before those writes.
            self.handle_region_stalled_requests(&edit_result.region_id, false)
                .await;
        }

        // Dequeue the next pending edit (dropping the queue when it empties)
        // and process it on this same worker loop.
        let next_request =
            if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
                let request = edit_queue.dequeue();
                if edit_queue.is_empty() {
                    self.region_edit_queues.remove(&edit_result.region_id);
                }
                request
            } else {
                None
            };
        if let Some(request) = next_request {
            self.handle_region_edit(request);
        }

        if need_compaction {
            self.schedule_compaction(&region).await;
        }
    }

    /// Writes truncate action to the manifest and then applies it to the region in background.
    pub(crate) fn handle_manifest_truncate_action(
        &self,
        region: MitoRegionRef,
        truncate: RegionTruncate,
        sender: OptionOutputTx,
    ) {
        // Marks the region as truncating.
        // This prevents the region from being accessed by other write requests.
        if let Err(e) = region.set_truncating() {
            sender.send(Err(e));
            return;
        }
        // Now the region is in truncating state.

        let request_sender = self.sender.clone();
        let manifest_ctx = region.manifest_ctx.clone();
        let is_staging = region.is_staging();

        // Updates manifest in background.
        common_runtime::spawn_global(async move {
            // Write region truncated to manifest.
            let action_list =
                RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));

            let result = manifest_ctx
                .update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
                .await
                .map(|_| ());

            // Sends the result back to the request sender.
            let truncate_result = TruncateResult {
                region_id: truncate.region_id,
                sender,
                result,
                kind: truncate.kind,
            };
            let _ = request_sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Background {
                    region_id: truncate.region_id,
                    notify: BackgroundNotify::Truncate(truncate_result),
                }))
                .await
                .inspect_err(|_| warn!("failed to send truncate result"));
        });
    }

    /// Writes region change action to the manifest and then applies it to the region in background.
    ///
    /// The result is routed back to the worker loop as a `RegionChange`
    /// background notification and finished by
    /// [`Self::handle_manifest_region_change_result`].
    pub(crate) fn handle_manifest_region_change(
        &self,
        region: MitoRegionRef,
        change: RegionChange,
        need_index: bool,
        new_options: Option<RegionOptions>,
        sender: OptionOutputTx,
    ) {
        // Marks the region as altering.
        if let Err(e) = region.set_altering() {
            sender.send(Err(e));
            return;
        }
        let listener = self.listener.clone();
        let request_sender = self.sender.clone();
        let is_staging = region.is_staging();
        // Now the region is in altering state.
        common_runtime::spawn_global(async move {
            // Keep the new metadata so the worker can apply it once the
            // manifest write succeeds.
            let new_meta = change.metadata.clone();
            let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));

            let result = region
                .manifest_ctx
                .update_manifest(RegionLeaderState::Altering, action_list, is_staging)
                .await
                .map(|_| ());
            let notify = WorkerRequest::Background {
                region_id: region.region_id,
                notify: BackgroundNotify::RegionChange(RegionChangeResult {
                    region_id: region.region_id,
                    sender,
                    result,
                    new_meta,
                    need_index,
                    new_options,
                }),
            };
            // Test/observability hook before the result is sent back.
            listener
                .on_notify_region_change_result_begin(region.region_id)
                .await;

            if let Err(res) = request_sender
                .send(WorkerRequestWithTime::new(notify))
                .await
            {
                warn!(
                    "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
                    region.region_id, res
                );
            }
        });
    }

    /// Applies new metadata (and optionally new options) to the region version.
    ///
    /// When `new_options` is `Some`, the memtable builder is also replaced
    /// because options may change the memtable type; otherwise only the
    /// schema is altered.
    fn update_region_version(
        version_control: &VersionControlRef,
        new_meta: RegionMetadataRef,
        new_options: Option<RegionOptions>,
        memtable_builder_provider: &MemtableBuilderProvider,
    ) {
        let options_changed = new_options.is_some();
        let region_id = new_meta.region_id;
        if let Some(new_options) = new_options {
            // Needs to update the region with new format and memtables.
            // Creates a new memtable builder for the new options as it may change the memtable type.
            let new_memtable_builder = memtable_builder_provider.builder_for_options(&new_options);
            version_control.alter_schema_and_format(new_meta, new_options, new_memtable_builder);
        } else {
            // Only changes the schema.
            version_control.alter_schema(new_meta);
        }

        let version_data = version_control.current();
        let version = version_data.version;
        info!(
            "Region {} is altered, metadata is {:?}, options: {:?}, options_changed: {}",
            region_id, version.metadata, version.options, options_changed,
        );
    }
}
523
/// Checks the edit, writes and applies it.
///
/// For every file the edit adds, a background task is spawned to download the
/// parquet (and puffin index, when present) into the write cache; these
/// downloads are best-effort and do not gate the manifest update. The edit
/// itself is then written to the manifest under the `Editing` state.
async fn edit_region(
    region: &MitoRegionRef,
    edit: RegionEdit,
    cache_manager: CacheManagerRef,
    listener: WorkerListener,
    is_staging: bool,
) -> Result<()> {
    let region_id = region.region_id;
    if let Some(write_cache) = cache_manager.write_cache() {
        for file_meta in &edit.files_to_add {
            // Per-file clones so each spawned task owns its handles.
            let write_cache = write_cache.clone();
            let layer = region.access_layer.clone();
            let listener = listener.clone();

            let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
            let remote_path =
                location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());

            let is_index_exist = file_meta.exists_index();
            let index_file_size = file_meta.index_file_size();

            let index_file_index_key = IndexKey::new(
                region_id,
                file_meta.index_id().file_id.file_id(),
                FileType::Puffin(file_meta.index_version),
            );
            let index_remote_path = location::index_file_path(
                layer.table_dir(),
                file_meta.index_id(),
                layer.path_type(),
            );

            let file_size = file_meta.file_size;
            common_runtime::spawn_global(async move {
                // Gauge of in-flight downloads; decremented at the end of the task.
                WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);

                let parquet_cached = write_cache
                    .download_if_absent(index_key, &remote_path, layer.object_store(), file_size)
                    .await;

                if parquet_cached.is_ok() {
                    // Triggers the filling of the parquet metadata cache.
                    // The parquet file is already downloaded.
                    let mut cache_metrics = Default::default();
                    let _ = write_cache
                        .file_cache()
                        .get_parquet_meta_data(
                            index_key,
                            &mut cache_metrics,
                            PageIndexPolicy::Optional,
                        )
                        .await;

                    // NOTE(review): `Ok(true)` presumably means the file was
                    // actually downloaded (not already cached) — confirm with
                    // `download_if_absent`.
                    if matches!(parquet_cached, Ok(true)) {
                        listener.on_file_cache_filled(index_key.file_id);
                    }
                }
                if is_index_exist {
                    // also download puffin file
                    if let Err(err) = write_cache
                        .download(
                            index_file_index_key,
                            &index_remote_path,
                            layer.object_store(),
                            index_file_size,
                        )
                        .await
                    {
                        // Best-effort: a failed index download is logged, not propagated.
                        common_telemetry::error!(
                            err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
                        );
                    }
                }

                WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
            });
        }
    }

    info!(
        "Applying {edit:?} to region {}, is_staging: {}",
        region_id, is_staging
    );

    let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
    region
        .manifest_ctx
        .update_manifest(RegionLeaderState::Editing, action_list, is_staging)
        .await
        .map(|_| ())
}