mito2/worker/
handle_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handles manifest.
16//!
17//! It updates the manifest and applies the changes to the region in background.
18
19use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{info, warn};
24use store_api::logstore::LogStore;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::RegionId;
27
28use crate::cache::CacheManagerRef;
29use crate::cache::file_cache::{FileType, IndexKey};
30use crate::config::IndexBuildMode;
31use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
32use crate::manifest::action::{
33    RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
34};
35use crate::memtable::MemtableBuilderProvider;
36use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
37use crate::region::opener::{sanitize_region_options, version_builder_from_manifest};
38use crate::region::options::RegionOptions;
39use crate::region::version::VersionControlRef;
40use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
41use crate::request::{
42    BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
43    RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
44};
45use crate::sst::index::IndexBuildType;
46use crate::sst::location;
47use crate::worker::{RegionWorkerLoop, WorkerListener};
48
/// Maps a region id to the queue of edit requests waiting on that region.
pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
50
/// A queue that temporarily stores region edit requests while the region is in the "Editing"
/// state.
///
/// When the current region edit request is completed, the next request in the queue (if any)
/// is processed.
/// Everything is done in the region worker loop.
pub(crate) struct RegionEditQueue {
    /// Id of the region the queued requests target; reported in the busy error on rejection.
    region_id: RegionId,
    /// Pending requests, oldest at the front.
    requests: VecDeque<RegionEditRequest>,
}
59
60impl RegionEditQueue {
61    const QUEUE_MAX_LEN: usize = 128;
62
63    fn new(region_id: RegionId) -> Self {
64        Self {
65            region_id,
66            requests: VecDeque::new(),
67        }
68    }
69
70    fn enqueue(&mut self, request: RegionEditRequest) {
71        if self.requests.len() > Self::QUEUE_MAX_LEN {
72            let _ = request.tx.send(
73                RegionBusySnafu {
74                    region_id: self.region_id,
75                }
76                .fail(),
77            );
78            return;
79        };
80        self.requests.push_back(request);
81    }
82
83    fn dequeue(&mut self) -> Option<RegionEditRequest> {
84        self.requests.pop_front()
85    }
86}
87
88impl<S: LogStore> RegionWorkerLoop<S> {
89    /// Handles region change result.
90    pub(crate) async fn handle_manifest_region_change_result(
91        &mut self,
92        change_result: RegionChangeResult,
93    ) {
94        let region = match self.regions.get_region(change_result.region_id) {
95            Some(region) => region,
96            None => {
97                self.reject_region_stalled_requests(&change_result.region_id);
98                change_result.sender.send(
99                    RegionNotFoundSnafu {
100                        region_id: change_result.region_id,
101                    }
102                    .fail(),
103                );
104                return;
105            }
106        };
107
108        if change_result.result.is_ok() {
109            // Updates the region metadata and format.
110            Self::update_region_version(
111                &region.version_control,
112                change_result.new_meta,
113                change_result.new_options,
114                &self.memtable_builder_provider,
115            );
116        }
117
118        // Sets the region as writable.
119        region.switch_state_to_writable(RegionLeaderState::Altering);
120        // Sends the result.
121        change_result.sender.send(change_result.result.map(|_| 0));
122
123        // In async mode, rebuild index after index metadata changed.
124        if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index {
125            self.handle_rebuild_index(
126                BuildIndexRequest {
127                    region_id: region.region_id,
128                    build_type: IndexBuildType::SchemaChange,
129                    file_metas: Vec::new(),
130                },
131                OptionOutputTx::new(None),
132            )
133            .await;
134        }
135        // Handles the stalled requests.
136        self.handle_region_stalled_requests(&change_result.region_id)
137            .await;
138    }
139
140    /// Handles region sync request.
141    ///
142    /// Updates the manifest to at least the given version.
143    /// **Note**: The installed version may be greater than the given version.
144    pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
145        let region_id = request.region_id;
146        let sender = request.sender;
147        let region = match self.regions.follower_region(region_id) {
148            Ok(region) => region,
149            Err(e) => {
150                let _ = sender.send(Err(e));
151                return;
152            }
153        };
154
155        let original_manifest_version = region.manifest_ctx.manifest_version().await;
156        let manifest = match region
157            .manifest_ctx
158            .install_manifest_to(request.manifest_version)
159            .await
160        {
161            Ok(manifest) => manifest,
162            Err(e) => {
163                let _ = sender.send(Err(e));
164                return;
165            }
166        };
167        let version = region.version();
168        let mut region_options = version.options.clone();
169        let old_format = region_options.sst_format.unwrap_or_default();
170        // Updates the region options with the manifest.
171        sanitize_region_options(&manifest, &mut region_options);
172        if !version.memtables.is_empty() {
173            let current = region.version_control.current();
174            warn!(
175                "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
176                region.region_id, manifest.manifest_version, current.last_entry_id
177            );
178        }
179
180        // We should sanitize the region options before creating a new memtable.
181        let memtable_builder = if old_format != region_options.sst_format.unwrap_or_default() {
182            // Format changed, also needs to replace the memtable builder.
183            Some(
184                self.memtable_builder_provider
185                    .builder_for_options(&region_options),
186            )
187        } else {
188            None
189        };
190        let new_mutable = Arc::new(
191            region
192                .version()
193                .memtables
194                .mutable
195                .new_with_part_duration(version.compaction_time_window, memtable_builder),
196        );
197        // Here it assumes the leader has backfilled the partition_expr of the metadata.
198        let metadata = manifest.metadata.clone();
199
200        let version_builder = version_builder_from_manifest(
201            &manifest,
202            metadata,
203            region.file_purger.clone(),
204            new_mutable,
205            region_options,
206        );
207        let version = version_builder.build();
208        region.version_control.overwrite_current(Arc::new(version));
209
210        let updated = manifest.manifest_version > original_manifest_version;
211        let _ = sender.send(Ok((manifest.manifest_version, updated)));
212    }
213}
214
215impl<S> RegionWorkerLoop<S> {
216    /// Handles region edit request.
217    pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
218        let region_id = request.region_id;
219        let Some(region) = self.regions.get_region(region_id) else {
220            let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
221            return;
222        };
223
224        if !region.is_writable() {
225            if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
226                self.region_edit_queues
227                    .entry(region_id)
228                    .or_insert_with(|| RegionEditQueue::new(region_id))
229                    .enqueue(request);
230            } else {
231                let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
232            }
233            return;
234        }
235
236        let RegionEditRequest {
237            region_id: _,
238            mut edit,
239            tx: sender,
240        } = request;
241        let file_sequence = region.version_control.committed_sequence() + 1;
242        edit.committed_sequence = Some(file_sequence);
243
244        // For every file added through region edit, we should fill the file sequence
245        for file in &mut edit.files_to_add {
246            file.sequence = NonZeroU64::new(file_sequence);
247        }
248
249        // Marks the region as editing.
250        if let Err(e) = region.set_editing() {
251            let _ = sender.send(Err(e));
252            return;
253        }
254
255        let request_sender = self.sender.clone();
256        let cache_manager = self.cache_manager.clone();
257        let listener = self.listener.clone();
258        // Now the region is in editing state.
259        // Updates manifest in background.
260        common_runtime::spawn_global(async move {
261            let result = edit_region(&region, edit.clone(), cache_manager, listener).await;
262            let notify = WorkerRequest::Background {
263                region_id,
264                notify: BackgroundNotify::RegionEdit(RegionEditResult {
265                    region_id,
266                    sender,
267                    edit,
268                    result,
269                    // we always need to restore region state after region edit
270                    update_region_state: true,
271                }),
272            };
273
274            // We don't set state back as the worker loop is already exited.
275            if let Err(res) = request_sender
276                .send(WorkerRequestWithTime::new(notify))
277                .await
278            {
279                warn!(
280                    "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
281                    region_id, res
282                );
283            }
284        });
285    }
286
287    /// Handles region edit result.
288    pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
289        let region = match self.regions.get_region(edit_result.region_id) {
290            Some(region) => region,
291            None => {
292                let _ = edit_result.sender.send(
293                    RegionNotFoundSnafu {
294                        region_id: edit_result.region_id,
295                    }
296                    .fail(),
297                );
298                return;
299            }
300        };
301
302        let need_compaction =
303            edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
304
305        if edit_result.result.is_ok() {
306            // Applies the edit to the region.
307            region.version_control.apply_edit(
308                Some(edit_result.edit),
309                &[],
310                region.file_purger.clone(),
311            );
312        }
313
314        if edit_result.update_region_state {
315            // Sets the region as writable.
316            region.switch_state_to_writable(RegionLeaderState::Editing);
317        }
318
319        let _ = edit_result.sender.send(edit_result.result);
320
321        if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
322            && let Some(request) = edit_queue.dequeue()
323        {
324            self.handle_region_edit(request).await;
325        }
326
327        if need_compaction {
328            self.schedule_compaction(&region).await;
329        }
330    }
331
332    /// Writes truncate action to the manifest and then applies it to the region in background.
333    pub(crate) fn handle_manifest_truncate_action(
334        &self,
335        region: MitoRegionRef,
336        truncate: RegionTruncate,
337        sender: OptionOutputTx,
338    ) {
339        // Marks the region as truncating.
340        // This prevents the region from being accessed by other write requests.
341        if let Err(e) = region.set_truncating() {
342            sender.send(Err(e));
343            return;
344        }
345        // Now the region is in truncating state.
346
347        let request_sender = self.sender.clone();
348        let manifest_ctx = region.manifest_ctx.clone();
349
350        // Updates manifest in background.
351        common_runtime::spawn_global(async move {
352            // Write region truncated to manifest.
353            let action_list =
354                RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
355
356            let result = manifest_ctx
357                .update_manifest(RegionLeaderState::Truncating, action_list)
358                .await
359                .map(|_| ());
360
361            // Sends the result back to the request sender.
362            let truncate_result = TruncateResult {
363                region_id: truncate.region_id,
364                sender,
365                result,
366                kind: truncate.kind,
367            };
368            let _ = request_sender
369                .send(WorkerRequestWithTime::new(WorkerRequest::Background {
370                    region_id: truncate.region_id,
371                    notify: BackgroundNotify::Truncate(truncate_result),
372                }))
373                .await
374                .inspect_err(|_| warn!("failed to send truncate result"));
375        });
376    }
377
378    /// Writes region change action to the manifest and then applies it to the region in background.
379    pub(crate) fn handle_manifest_region_change(
380        &self,
381        region: MitoRegionRef,
382        change: RegionChange,
383        need_index: bool,
384        new_options: Option<RegionOptions>,
385        sender: OptionOutputTx,
386    ) {
387        // Marks the region as altering.
388        if let Err(e) = region.set_altering() {
389            sender.send(Err(e));
390            return;
391        }
392        let listener = self.listener.clone();
393        let request_sender = self.sender.clone();
394        // Now the region is in altering state.
395        common_runtime::spawn_global(async move {
396            let new_meta = change.metadata.clone();
397            let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
398
399            let result = region
400                .manifest_ctx
401                .update_manifest(RegionLeaderState::Altering, action_list)
402                .await
403                .map(|_| ());
404            let notify = WorkerRequest::Background {
405                region_id: region.region_id,
406                notify: BackgroundNotify::RegionChange(RegionChangeResult {
407                    region_id: region.region_id,
408                    sender,
409                    result,
410                    new_meta,
411                    need_index,
412                    new_options,
413                }),
414            };
415            listener
416                .on_notify_region_change_result_begin(region.region_id)
417                .await;
418
419            if let Err(res) = request_sender
420                .send(WorkerRequestWithTime::new(notify))
421                .await
422            {
423                warn!(
424                    "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
425                    region.region_id, res
426                );
427            }
428        });
429    }
430
431    fn update_region_version(
432        version_control: &VersionControlRef,
433        new_meta: RegionMetadataRef,
434        new_options: Option<RegionOptions>,
435        memtable_builder_provider: &MemtableBuilderProvider,
436    ) {
437        let options_changed = new_options.is_some();
438        let region_id = new_meta.region_id;
439        if let Some(new_options) = new_options {
440            // Needs to update the region with new format and memtables.
441            // Creates a new memtable builder for the new options as it may change the memtable type.
442            let new_memtable_builder = memtable_builder_provider.builder_for_options(&new_options);
443            version_control.alter_schema_and_format(new_meta, new_options, new_memtable_builder);
444        } else {
445            // Only changes the schema.
446            version_control.alter_schema(new_meta);
447        }
448
449        let version_data = version_control.current();
450        let version = version_data.version;
451        info!(
452            "Region {} is altered, metadata is {:?}, options: {:?}, options_changed: {}",
453            region_id, version.metadata, version.options, options_changed,
454        );
455    }
456}
457
458/// Checks the edit, writes and applies it.
459async fn edit_region(
460    region: &MitoRegionRef,
461    edit: RegionEdit,
462    cache_manager: CacheManagerRef,
463    listener: WorkerListener,
464) -> Result<()> {
465    let region_id = region.region_id;
466    if let Some(write_cache) = cache_manager.write_cache() {
467        for file_meta in &edit.files_to_add {
468            let write_cache = write_cache.clone();
469            let layer = region.access_layer.clone();
470            let listener = listener.clone();
471
472            let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
473            let remote_path =
474                location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
475
476            let is_index_exist = file_meta.exists_index();
477            let index_file_size = file_meta.index_file_size();
478
479            let index_file_index_key = IndexKey::new(
480                region_id,
481                file_meta.index_file_id().file_id(),
482                FileType::Puffin,
483            );
484            let index_remote_path = location::index_file_path(
485                layer.table_dir(),
486                file_meta.file_id(),
487                layer.path_type(),
488            );
489
490            let file_size = file_meta.file_size;
491            common_runtime::spawn_global(async move {
492                WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
493
494                if write_cache
495                    .download(index_key, &remote_path, layer.object_store(), file_size)
496                    .await
497                    .is_ok()
498                {
499                    // Triggers the filling of the parquet metadata cache.
500                    // The parquet file is already downloaded.
501                    let _ = write_cache
502                        .file_cache()
503                        .get_parquet_meta_data(index_key)
504                        .await;
505
506                    listener.on_file_cache_filled(index_key.file_id);
507                }
508                if is_index_exist {
509                    // also download puffin file
510                    if let Err(err) = write_cache
511                        .download(
512                            index_file_index_key,
513                            &index_remote_path,
514                            layer.object_store(),
515                            index_file_size,
516                        )
517                        .await
518                    {
519                        common_telemetry::error!(
520                            err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
521                        );
522                    }
523                }
524
525                WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
526            });
527        }
528    }
529
530    info!("Applying {edit:?} to region {}", region_id);
531
532    let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
533    region
534        .manifest_ctx
535        .update_manifest(RegionLeaderState::Editing, action_list)
536        .await
537        .map(|_| ())
538}