mito2/worker/
handle_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handles manifest.
16//!
17//! It updates the manifest and applies the changes to the region in background.
18
19use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{info, warn};
24use parquet::file::metadata::PageIndexPolicy;
25use store_api::logstore::LogStore;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::RegionId;
28
29use crate::cache::CacheManagerRef;
30use crate::cache::file_cache::{FileType, IndexKey};
31use crate::config::IndexBuildMode;
32use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
33use crate::manifest::action::{
34    RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
35};
36use crate::memtable::MemtableBuilderProvider;
37use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
38use crate::region::opener::{sanitize_region_options, version_builder_from_manifest};
39use crate::region::options::RegionOptions;
40use crate::region::version::VersionControlRef;
41use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
42use crate::request::{
43    BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
44    RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
45};
46use crate::sst::index::IndexBuildType;
47use crate::sst::location;
48use crate::worker::{RegionWorkerLoop, WorkerListener};
49
50pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
51
52/// A queue for temporary store region edit requests, if the region is in the "Editing" state.
53/// When the current region edit request is completed, the next (if there exists) request in the
54/// queue will be processed.
55/// Everything is done in the region worker loop.
56pub(crate) struct RegionEditQueue {
57    region_id: RegionId,
58    requests: VecDeque<RegionEditRequest>,
59}
60
61impl RegionEditQueue {
62    const QUEUE_MAX_LEN: usize = 128;
63
64    fn new(region_id: RegionId) -> Self {
65        Self {
66            region_id,
67            requests: VecDeque::new(),
68        }
69    }
70
71    fn enqueue(&mut self, request: RegionEditRequest) {
72        if self.requests.len() > Self::QUEUE_MAX_LEN {
73            let _ = request.tx.send(
74                RegionBusySnafu {
75                    region_id: self.region_id,
76                }
77                .fail(),
78            );
79            return;
80        };
81        self.requests.push_back(request);
82    }
83
84    fn dequeue(&mut self) -> Option<RegionEditRequest> {
85        self.requests.pop_front()
86    }
87}
88
89impl<S: LogStore> RegionWorkerLoop<S> {
90    /// Handles region change result.
91    pub(crate) async fn handle_manifest_region_change_result(
92        &mut self,
93        change_result: RegionChangeResult,
94    ) {
95        let region = match self.regions.get_region(change_result.region_id) {
96            Some(region) => region,
97            None => {
98                self.reject_region_stalled_requests(&change_result.region_id);
99                change_result.sender.send(
100                    RegionNotFoundSnafu {
101                        region_id: change_result.region_id,
102                    }
103                    .fail(),
104                );
105                return;
106            }
107        };
108
109        if change_result.result.is_ok() {
110            // Updates the region metadata and format.
111            Self::update_region_version(
112                &region.version_control,
113                change_result.new_meta,
114                change_result.new_options,
115                &self.memtable_builder_provider,
116            );
117        }
118
119        // Sets the region as writable.
120        region.switch_state_to_writable(RegionLeaderState::Altering);
121        // Sends the result.
122        change_result.sender.send(change_result.result.map(|_| 0));
123
124        // In async mode, rebuild index after index metadata changed.
125        if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index {
126            self.handle_rebuild_index(
127                BuildIndexRequest {
128                    region_id: region.region_id,
129                    build_type: IndexBuildType::SchemaChange,
130                    file_metas: Vec::new(),
131                },
132                OptionOutputTx::new(None),
133            )
134            .await;
135        }
136        // Handles the stalled requests.
137        self.handle_region_stalled_requests(&change_result.region_id)
138            .await;
139    }
140
141    /// Handles region sync request.
142    ///
143    /// Updates the manifest to at least the given version.
144    /// **Note**: The installed version may be greater than the given version.
145    pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
146        let region_id = request.region_id;
147        let sender = request.sender;
148        let region = match self.regions.follower_region(region_id) {
149            Ok(region) => region,
150            Err(e) => {
151                let _ = sender.send(Err(e));
152                return;
153            }
154        };
155
156        let original_manifest_version = region.manifest_ctx.manifest_version().await;
157        let manifest = match region
158            .manifest_ctx
159            .install_manifest_to(request.manifest_version)
160            .await
161        {
162            Ok(manifest) => manifest,
163            Err(e) => {
164                let _ = sender.send(Err(e));
165                return;
166            }
167        };
168        let version = region.version();
169        let mut region_options = version.options.clone();
170        let old_format = region_options.sst_format.unwrap_or_default();
171        // Updates the region options with the manifest.
172        sanitize_region_options(&manifest, &mut region_options);
173        if !version.memtables.is_empty() {
174            let current = region.version_control.current();
175            warn!(
176                "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
177                region.region_id, manifest.manifest_version, current.last_entry_id
178            );
179        }
180
181        // We should sanitize the region options before creating a new memtable.
182        let memtable_builder = if old_format != region_options.sst_format.unwrap_or_default() {
183            // Format changed, also needs to replace the memtable builder.
184            Some(
185                self.memtable_builder_provider
186                    .builder_for_options(&region_options),
187            )
188        } else {
189            None
190        };
191        let new_mutable = Arc::new(
192            region
193                .version()
194                .memtables
195                .mutable
196                .new_with_part_duration(version.compaction_time_window, memtable_builder),
197        );
198        // Here it assumes the leader has backfilled the partition_expr of the metadata.
199        let metadata = manifest.metadata.clone();
200
201        let version_builder = version_builder_from_manifest(
202            &manifest,
203            metadata,
204            region.file_purger.clone(),
205            new_mutable,
206            region_options,
207        );
208        let version = version_builder.build();
209        region.version_control.overwrite_current(Arc::new(version));
210
211        let updated = manifest.manifest_version > original_manifest_version;
212        let _ = sender.send(Ok((manifest.manifest_version, updated)));
213    }
214}
215
216impl<S> RegionWorkerLoop<S> {
217    /// Handles region edit request.
218    pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
219        let region_id = request.region_id;
220        let Some(region) = self.regions.get_region(region_id) else {
221            let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
222            return;
223        };
224
225        if !region.is_writable() {
226            if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
227                self.region_edit_queues
228                    .entry(region_id)
229                    .or_insert_with(|| RegionEditQueue::new(region_id))
230                    .enqueue(request);
231            } else {
232                let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
233            }
234            return;
235        }
236
237        let RegionEditRequest {
238            region_id: _,
239            mut edit,
240            tx: sender,
241        } = request;
242        let file_sequence = region.version_control.committed_sequence() + 1;
243        edit.committed_sequence = Some(file_sequence);
244
245        // For every file added through region edit, we should fill the file sequence
246        for file in &mut edit.files_to_add {
247            file.sequence = NonZeroU64::new(file_sequence);
248        }
249
250        // Allow retrieving `is_staging` before spawn the edit region task.
251        let is_staging = region.is_staging();
252        let expect_state = if is_staging {
253            RegionLeaderState::Staging
254        } else {
255            RegionLeaderState::Writable
256        };
257        // Marks the region as editing.
258        if let Err(e) = region.set_editing(expect_state) {
259            let _ = sender.send(Err(e));
260            return;
261        }
262
263        let request_sender = self.sender.clone();
264        let cache_manager = self.cache_manager.clone();
265        let listener = self.listener.clone();
266        // Now the region is in editing state.
267        // Updates manifest in background.
268        common_runtime::spawn_global(async move {
269            let result =
270                edit_region(&region, edit.clone(), cache_manager, listener, is_staging).await;
271            let notify = WorkerRequest::Background {
272                region_id,
273                notify: BackgroundNotify::RegionEdit(RegionEditResult {
274                    region_id,
275                    sender,
276                    edit,
277                    result,
278                    // we always need to restore region state after region edit
279                    update_region_state: true,
280                    is_staging,
281                }),
282            };
283
284            // We don't set state back as the worker loop is already exited.
285            if let Err(res) = request_sender
286                .send(WorkerRequestWithTime::new(notify))
287                .await
288            {
289                warn!(
290                    "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
291                    region_id, res
292                );
293            }
294        });
295    }
296
297    /// Handles region edit result.
298    pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
299        let region = match self.regions.get_region(edit_result.region_id) {
300            Some(region) => region,
301            None => {
302                let _ = edit_result.sender.send(
303                    RegionNotFoundSnafu {
304                        region_id: edit_result.region_id,
305                    }
306                    .fail(),
307                );
308                return;
309            }
310        };
311
312        let need_compaction = if edit_result.is_staging {
313            if edit_result.update_region_state {
314                // For staging regions, edits are not applied immediately,
315                // as they remain invisible until the region exits the staging state.
316                region.switch_state_to_staging(RegionLeaderState::Editing);
317            }
318
319            false
320        } else {
321            let need_compaction =
322                edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
323            // Only apply the edit if the result is ok and region is not in staging state.
324            if edit_result.result.is_ok() {
325                // Applies the edit to the region.
326                region.version_control.apply_edit(
327                    Some(edit_result.edit),
328                    &[],
329                    region.file_purger.clone(),
330                );
331            }
332            if edit_result.update_region_state {
333                region.switch_state_to_writable(RegionLeaderState::Editing);
334            }
335
336            need_compaction
337        };
338
339        let _ = edit_result.sender.send(edit_result.result);
340
341        if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
342            && let Some(request) = edit_queue.dequeue()
343        {
344            self.handle_region_edit(request);
345        }
346
347        if need_compaction {
348            self.schedule_compaction(&region).await;
349        }
350    }
351
352    /// Writes truncate action to the manifest and then applies it to the region in background.
353    pub(crate) fn handle_manifest_truncate_action(
354        &self,
355        region: MitoRegionRef,
356        truncate: RegionTruncate,
357        sender: OptionOutputTx,
358    ) {
359        // Marks the region as truncating.
360        // This prevents the region from being accessed by other write requests.
361        if let Err(e) = region.set_truncating() {
362            sender.send(Err(e));
363            return;
364        }
365        // Now the region is in truncating state.
366
367        let request_sender = self.sender.clone();
368        let manifest_ctx = region.manifest_ctx.clone();
369        let is_staging = region.is_staging();
370
371        // Updates manifest in background.
372        common_runtime::spawn_global(async move {
373            // Write region truncated to manifest.
374            let action_list =
375                RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
376
377            let result = manifest_ctx
378                .update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
379                .await
380                .map(|_| ());
381
382            // Sends the result back to the request sender.
383            let truncate_result = TruncateResult {
384                region_id: truncate.region_id,
385                sender,
386                result,
387                kind: truncate.kind,
388            };
389            let _ = request_sender
390                .send(WorkerRequestWithTime::new(WorkerRequest::Background {
391                    region_id: truncate.region_id,
392                    notify: BackgroundNotify::Truncate(truncate_result),
393                }))
394                .await
395                .inspect_err(|_| warn!("failed to send truncate result"));
396        });
397    }
398
399    /// Writes region change action to the manifest and then applies it to the region in background.
400    pub(crate) fn handle_manifest_region_change(
401        &self,
402        region: MitoRegionRef,
403        change: RegionChange,
404        need_index: bool,
405        new_options: Option<RegionOptions>,
406        sender: OptionOutputTx,
407    ) {
408        // Marks the region as altering.
409        if let Err(e) = region.set_altering() {
410            sender.send(Err(e));
411            return;
412        }
413        let listener = self.listener.clone();
414        let request_sender = self.sender.clone();
415        let is_staging = region.is_staging();
416        // Now the region is in altering state.
417        common_runtime::spawn_global(async move {
418            let new_meta = change.metadata.clone();
419            let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
420
421            let result = region
422                .manifest_ctx
423                .update_manifest(RegionLeaderState::Altering, action_list, is_staging)
424                .await
425                .map(|_| ());
426            let notify = WorkerRequest::Background {
427                region_id: region.region_id,
428                notify: BackgroundNotify::RegionChange(RegionChangeResult {
429                    region_id: region.region_id,
430                    sender,
431                    result,
432                    new_meta,
433                    need_index,
434                    new_options,
435                }),
436            };
437            listener
438                .on_notify_region_change_result_begin(region.region_id)
439                .await;
440
441            if let Err(res) = request_sender
442                .send(WorkerRequestWithTime::new(notify))
443                .await
444            {
445                warn!(
446                    "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
447                    region.region_id, res
448                );
449            }
450        });
451    }
452
453    fn update_region_version(
454        version_control: &VersionControlRef,
455        new_meta: RegionMetadataRef,
456        new_options: Option<RegionOptions>,
457        memtable_builder_provider: &MemtableBuilderProvider,
458    ) {
459        let options_changed = new_options.is_some();
460        let region_id = new_meta.region_id;
461        if let Some(new_options) = new_options {
462            // Needs to update the region with new format and memtables.
463            // Creates a new memtable builder for the new options as it may change the memtable type.
464            let new_memtable_builder = memtable_builder_provider.builder_for_options(&new_options);
465            version_control.alter_schema_and_format(new_meta, new_options, new_memtable_builder);
466        } else {
467            // Only changes the schema.
468            version_control.alter_schema(new_meta);
469        }
470
471        let version_data = version_control.current();
472        let version = version_data.version;
473        info!(
474            "Region {} is altered, metadata is {:?}, options: {:?}, options_changed: {}",
475            region_id, version.metadata, version.options, options_changed,
476        );
477    }
478}
479
480/// Checks the edit, writes and applies it.
481async fn edit_region(
482    region: &MitoRegionRef,
483    edit: RegionEdit,
484    cache_manager: CacheManagerRef,
485    listener: WorkerListener,
486    is_staging: bool,
487) -> Result<()> {
488    let region_id = region.region_id;
489    if let Some(write_cache) = cache_manager.write_cache() {
490        for file_meta in &edit.files_to_add {
491            let write_cache = write_cache.clone();
492            let layer = region.access_layer.clone();
493            let listener = listener.clone();
494
495            let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
496            let remote_path =
497                location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
498
499            let is_index_exist = file_meta.exists_index();
500            let index_file_size = file_meta.index_file_size();
501
502            let index_file_index_key = IndexKey::new(
503                region_id,
504                file_meta.index_id().file_id.file_id(),
505                FileType::Puffin(file_meta.index_version),
506            );
507            let index_remote_path = location::index_file_path(
508                layer.table_dir(),
509                file_meta.index_id(),
510                layer.path_type(),
511            );
512
513            let file_size = file_meta.file_size;
514            common_runtime::spawn_global(async move {
515                WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
516
517                if write_cache
518                    .download(index_key, &remote_path, layer.object_store(), file_size)
519                    .await
520                    .is_ok()
521                {
522                    // Triggers the filling of the parquet metadata cache.
523                    // The parquet file is already downloaded.
524                    let mut cache_metrics = Default::default();
525                    let _ = write_cache
526                        .file_cache()
527                        .get_parquet_meta_data(
528                            index_key,
529                            &mut cache_metrics,
530                            PageIndexPolicy::Optional,
531                        )
532                        .await;
533
534                    listener.on_file_cache_filled(index_key.file_id);
535                }
536                if is_index_exist {
537                    // also download puffin file
538                    if let Err(err) = write_cache
539                        .download(
540                            index_file_index_key,
541                            &index_remote_path,
542                            layer.object_store(),
543                            index_file_size,
544                        )
545                        .await
546                    {
547                        common_telemetry::error!(
548                            err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
549                        );
550                    }
551                }
552
553                WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
554            });
555        }
556    }
557
558    info!(
559        "Applying {edit:?} to region {}, is_staging: {}",
560        region_id, is_staging
561    );
562
563    let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
564    region
565        .manifest_ctx
566        .update_manifest(RegionLeaderState::Editing, action_list, is_staging)
567        .await
568        .map(|_| ())
569}