mito2/worker/
handle_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handles manifest.
16//!
17//! It updates the manifest and applies the changes to the region in background.
18
19use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{info, warn};
24use store_api::logstore::LogStore;
25use store_api::storage::RegionId;
26
27use crate::cache::CacheManagerRef;
28use crate::cache::file_cache::{FileType, IndexKey};
29use crate::config::IndexBuildMode;
30use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
31use crate::manifest::action::{
32    RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
33};
34use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
35use crate::region::version::VersionBuilder;
36use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
37use crate::request::{
38    BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
39    RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
40};
41use crate::sst::index::IndexBuildType;
42use crate::sst::location;
43use crate::worker::{RegionWorkerLoop, WorkerListener};
44
/// Region edit queues, keyed by the id of the region each queue belongs to.
pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
46
/// A queue that temporarily stores region edit requests while the region is in the "Editing"
/// state.
///
/// When the current region edit request is completed, the next request in the queue (if any)
/// is processed.
/// Everything is done in the region worker loop.
pub(crate) struct RegionEditQueue {
    /// Id of the region whose edit requests are queued here.
    region_id: RegionId,
    /// Pending edit requests in arrival order (oldest at the front).
    requests: VecDeque<RegionEditRequest>,
}
55
56impl RegionEditQueue {
57    const QUEUE_MAX_LEN: usize = 128;
58
59    fn new(region_id: RegionId) -> Self {
60        Self {
61            region_id,
62            requests: VecDeque::new(),
63        }
64    }
65
66    fn enqueue(&mut self, request: RegionEditRequest) {
67        if self.requests.len() > Self::QUEUE_MAX_LEN {
68            let _ = request.tx.send(
69                RegionBusySnafu {
70                    region_id: self.region_id,
71                }
72                .fail(),
73            );
74            return;
75        };
76        self.requests.push_back(request);
77    }
78
79    fn dequeue(&mut self) -> Option<RegionEditRequest> {
80        self.requests.pop_front()
81    }
82}
83
84impl<S: LogStore> RegionWorkerLoop<S> {
85    /// Handles region change result.
86    pub(crate) async fn handle_manifest_region_change_result(
87        &mut self,
88        change_result: RegionChangeResult,
89    ) {
90        let region = match self.regions.get_region(change_result.region_id) {
91            Some(region) => region,
92            None => {
93                self.reject_region_stalled_requests(&change_result.region_id);
94                change_result.sender.send(
95                    RegionNotFoundSnafu {
96                        region_id: change_result.region_id,
97                    }
98                    .fail(),
99                );
100                return;
101            }
102        };
103
104        if change_result.result.is_ok() {
105            // Apply the metadata to region's version.
106            region
107                .version_control
108                .alter_schema(change_result.new_meta, &region.memtable_builder);
109
110            let version = region.version();
111            info!(
112                "Region {} is altered, metadata is {:?}, options: {:?}",
113                region.region_id, version.metadata, version.options,
114            );
115        }
116
117        // Sets the region as writable.
118        region.switch_state_to_writable(RegionLeaderState::Altering);
119        // Sends the result.
120        change_result.sender.send(change_result.result.map(|_| 0));
121
122        // In async mode, rebuild index after index metadata changed.
123        if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index {
124            self.handle_rebuild_index(
125                BuildIndexRequest {
126                    region_id: region.region_id,
127                    build_type: IndexBuildType::SchemaChange,
128                    file_metas: Vec::new(),
129                },
130                OptionOutputTx::new(None),
131            )
132            .await;
133        }
134        // Handles the stalled requests.
135        self.handle_region_stalled_requests(&change_result.region_id)
136            .await;
137    }
138
139    /// Handles region sync request.
140    ///
141    /// Updates the manifest to at least the given version.
142    /// **Note**: The installed version may be greater than the given version.
143    pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
144        let region_id = request.region_id;
145        let sender = request.sender;
146        let region = match self.regions.follower_region(region_id) {
147            Ok(region) => region,
148            Err(e) => {
149                let _ = sender.send(Err(e));
150                return;
151            }
152        };
153
154        let original_manifest_version = region.manifest_ctx.manifest_version().await;
155        let manifest = match region
156            .manifest_ctx
157            .install_manifest_to(request.manifest_version)
158            .await
159        {
160            Ok(manifest) => manifest,
161            Err(e) => {
162                let _ = sender.send(Err(e));
163                return;
164            }
165        };
166        let version = region.version();
167        if !version.memtables.is_empty() {
168            let current = region.version_control.current();
169            warn!(
170                "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
171                region.region_id, manifest.manifest_version, current.last_entry_id
172            );
173        }
174        let region_options = version.options.clone();
175        let new_mutable = Arc::new(
176            region
177                .version()
178                .memtables
179                .mutable
180                .new_with_part_duration(version.compaction_time_window),
181        );
182        let metadata = manifest.metadata.clone();
183        let version = VersionBuilder::new(metadata, new_mutable)
184            .add_files(region.file_purger.clone(), manifest.files.values().cloned())
185            .flushed_entry_id(manifest.flushed_entry_id)
186            .flushed_sequence(manifest.flushed_sequence)
187            .truncated_entry_id(manifest.truncated_entry_id)
188            .compaction_time_window(manifest.compaction_time_window)
189            .options(region_options)
190            .build();
191        region.version_control.overwrite_current(Arc::new(version));
192
193        let updated = manifest.manifest_version > original_manifest_version;
194        let _ = sender.send(Ok((manifest.manifest_version, updated)));
195    }
196}
197
198impl<S> RegionWorkerLoop<S> {
199    /// Handles region edit request.
200    pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
201        let region_id = request.region_id;
202        let Some(region) = self.regions.get_region(region_id) else {
203            let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
204            return;
205        };
206
207        if !region.is_writable() {
208            if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
209                self.region_edit_queues
210                    .entry(region_id)
211                    .or_insert_with(|| RegionEditQueue::new(region_id))
212                    .enqueue(request);
213            } else {
214                let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
215            }
216            return;
217        }
218
219        let RegionEditRequest {
220            region_id: _,
221            mut edit,
222            tx: sender,
223        } = request;
224        let file_sequence = region.version_control.committed_sequence() + 1;
225        edit.committed_sequence = Some(file_sequence);
226
227        // For every file added through region edit, we should fill the file sequence
228        for file in &mut edit.files_to_add {
229            file.sequence = NonZeroU64::new(file_sequence);
230        }
231
232        // Marks the region as editing.
233        if let Err(e) = region.set_editing() {
234            let _ = sender.send(Err(e));
235            return;
236        }
237
238        let request_sender = self.sender.clone();
239        let cache_manager = self.cache_manager.clone();
240        let listener = self.listener.clone();
241        // Now the region is in editing state.
242        // Updates manifest in background.
243        common_runtime::spawn_global(async move {
244            let result = edit_region(&region, edit.clone(), cache_manager, listener).await;
245            let notify = WorkerRequest::Background {
246                region_id,
247                notify: BackgroundNotify::RegionEdit(RegionEditResult {
248                    region_id,
249                    sender,
250                    edit,
251                    result,
252                }),
253            };
254
255            // We don't set state back as the worker loop is already exited.
256            if let Err(res) = request_sender
257                .send(WorkerRequestWithTime::new(notify))
258                .await
259            {
260                warn!(
261                    "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
262                    region_id, res
263                );
264            }
265        });
266    }
267
268    /// Handles region edit result.
269    pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
270        let region = match self.regions.get_region(edit_result.region_id) {
271            Some(region) => region,
272            None => {
273                let _ = edit_result.sender.send(
274                    RegionNotFoundSnafu {
275                        region_id: edit_result.region_id,
276                    }
277                    .fail(),
278                );
279                return;
280            }
281        };
282
283        let need_compaction =
284            edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
285
286        if edit_result.result.is_ok() {
287            // Applies the edit to the region.
288            region.version_control.apply_edit(
289                Some(edit_result.edit),
290                &[],
291                region.file_purger.clone(),
292            );
293        }
294
295        // Sets the region as writable.
296        region.switch_state_to_writable(RegionLeaderState::Editing);
297
298        let _ = edit_result.sender.send(edit_result.result);
299
300        if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
301            && let Some(request) = edit_queue.dequeue()
302        {
303            self.handle_region_edit(request).await;
304        }
305
306        if need_compaction {
307            self.schedule_compaction(&region).await;
308        }
309    }
310
311    /// Writes truncate action to the manifest and then applies it to the region in background.
312    pub(crate) fn handle_manifest_truncate_action(
313        &self,
314        region: MitoRegionRef,
315        truncate: RegionTruncate,
316        sender: OptionOutputTx,
317    ) {
318        // Marks the region as truncating.
319        // This prevents the region from being accessed by other write requests.
320        if let Err(e) = region.set_truncating() {
321            sender.send(Err(e));
322            return;
323        }
324        // Now the region is in truncating state.
325
326        let request_sender = self.sender.clone();
327        let manifest_ctx = region.manifest_ctx.clone();
328
329        // Updates manifest in background.
330        common_runtime::spawn_global(async move {
331            // Write region truncated to manifest.
332            let action_list =
333                RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
334
335            let result = manifest_ctx
336                .update_manifest(RegionLeaderState::Truncating, action_list)
337                .await
338                .map(|_| ());
339
340            // Sends the result back to the request sender.
341            let truncate_result = TruncateResult {
342                region_id: truncate.region_id,
343                sender,
344                result,
345                kind: truncate.kind,
346            };
347            let _ = request_sender
348                .send(WorkerRequestWithTime::new(WorkerRequest::Background {
349                    region_id: truncate.region_id,
350                    notify: BackgroundNotify::Truncate(truncate_result),
351                }))
352                .await
353                .inspect_err(|_| warn!("failed to send truncate result"));
354        });
355    }
356
357    /// Writes region change action to the manifest and then applies it to the region in background.
358    pub(crate) fn handle_manifest_region_change(
359        &self,
360        region: MitoRegionRef,
361        change: RegionChange,
362        need_index: bool,
363        sender: OptionOutputTx,
364    ) {
365        // Marks the region as altering.
366        if let Err(e) = region.set_altering() {
367            sender.send(Err(e));
368            return;
369        }
370        let listener = self.listener.clone();
371        let request_sender = self.sender.clone();
372        // Now the region is in altering state.
373        common_runtime::spawn_global(async move {
374            let new_meta = change.metadata.clone();
375            let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
376
377            let result = region
378                .manifest_ctx
379                .update_manifest(RegionLeaderState::Altering, action_list)
380                .await
381                .map(|_| ());
382            let notify = WorkerRequest::Background {
383                region_id: region.region_id,
384                notify: BackgroundNotify::RegionChange(RegionChangeResult {
385                    region_id: region.region_id,
386                    sender,
387                    result,
388                    new_meta,
389                    need_index,
390                }),
391            };
392            listener
393                .on_notify_region_change_result_begin(region.region_id)
394                .await;
395
396            if let Err(res) = request_sender
397                .send(WorkerRequestWithTime::new(notify))
398                .await
399            {
400                warn!(
401                    "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
402                    region.region_id, res
403                );
404            }
405        });
406    }
407}
408
409/// Checks the edit, writes and applies it.
410async fn edit_region(
411    region: &MitoRegionRef,
412    edit: RegionEdit,
413    cache_manager: CacheManagerRef,
414    listener: WorkerListener,
415) -> Result<()> {
416    let region_id = region.region_id;
417    if let Some(write_cache) = cache_manager.write_cache() {
418        for file_meta in &edit.files_to_add {
419            let write_cache = write_cache.clone();
420            let layer = region.access_layer.clone();
421            let listener = listener.clone();
422
423            let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
424            let remote_path =
425                location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
426
427            let is_index_exist = file_meta.exists_index();
428            let index_file_size = file_meta.index_file_size();
429
430            let index_file_index_key =
431                IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
432            let index_remote_path = location::index_file_path(
433                layer.table_dir(),
434                file_meta.file_id(),
435                layer.path_type(),
436            );
437
438            let file_size = file_meta.file_size;
439            common_runtime::spawn_global(async move {
440                WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
441
442                if write_cache
443                    .download(index_key, &remote_path, layer.object_store(), file_size)
444                    .await
445                    .is_ok()
446                {
447                    // Triggers the filling of the parquet metadata cache.
448                    // The parquet file is already downloaded.
449                    let _ = write_cache
450                        .file_cache()
451                        .get_parquet_meta_data(index_key)
452                        .await;
453
454                    listener.on_file_cache_filled(index_key.file_id);
455                }
456                if is_index_exist {
457                    // also download puffin file
458                    if let Err(err) = write_cache
459                        .download(
460                            index_file_index_key,
461                            &index_remote_path,
462                            layer.object_store(),
463                            index_file_size,
464                        )
465                        .await
466                    {
467                        common_telemetry::error!(
468                            err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
469                        );
470                    }
471                }
472
473                WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
474            });
475        }
476    }
477
478    info!("Applying {edit:?} to region {}", region_id);
479
480    let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
481    region
482        .manifest_ctx
483        .update_manifest(RegionLeaderState::Editing, action_list)
484        .await
485        .map(|_| ())
486}