mito2/region/
version.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Version control of mito engine.
16//!
17//! Version is an immutable snapshot of region's metadata.
18//!
19//! To read latest data from `VersionControl`, we should
20//! 1. Acquire `Version` from `VersionControl`.
21//! 2. Then acquire last sequence.
22//!
//! Reason: data may be flushed/compacted and some data with an old sequence may be removed
//! and become invisible between steps 1 and 2, so we need to acquire the version first.
25
26use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use common_telemetry::info;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::SequenceNumber;
32
33use crate::error::Result;
34use crate::manifest::action::{RegionEdit, TruncateKind};
35use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
36use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
37use crate::memtable::{MemtableBuilderRef, MemtableId};
38use crate::region::options::RegionOptions;
39use crate::sst::file::FileMeta;
40use crate::sst::file_purger::FilePurgerRef;
41use crate::sst::version::{SstVersion, SstVersionRef};
42use crate::wal::EntryId;
43
/// Controls metadata and sequence numbers for a region.
///
/// It manages metadata in a copy-on-write fashion. Any modification to a region's metadata
/// will generate a new [Version].
#[derive(Debug)]
pub(crate) struct VersionControl {
    /// Latest version, committed sequence, last entry id and dropped marker of
    /// the region, guarded by a read-write lock.
    data: RwLock<VersionControlData>,
}
52
53impl VersionControl {
54    /// Returns a new [VersionControl] with specific `version`.
55    pub(crate) fn new(version: Version) -> VersionControl {
56        // Initialize sequence and entry id from flushed sequence and entry id.
57        let (flushed_sequence, flushed_entry_id) =
58            (version.flushed_sequence, version.flushed_entry_id);
59        VersionControl {
60            data: RwLock::new(VersionControlData {
61                version: Arc::new(version),
62                committed_sequence: flushed_sequence,
63                last_entry_id: flushed_entry_id,
64                is_dropped: false,
65            }),
66        }
67    }
68
69    /// Returns current copy of data.
70    pub(crate) fn current(&self) -> VersionControlData {
71        self.data.read().unwrap().clone()
72    }
73
74    /// Updates the `committed_sequence` of version.
75    pub(crate) fn set_committed_sequence(&self, seq: SequenceNumber) {
76        let mut data = self.data.write().unwrap();
77        data.committed_sequence = seq;
78    }
79
80    /// Updates committed sequence and entry id.
81    pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
82        let mut data = self.data.write().unwrap();
83        data.committed_sequence = seq;
84        data.last_entry_id = entry_id;
85    }
86
87    /// Updates last entry id.
88    pub(crate) fn set_entry_id(&self, entry_id: EntryId) {
89        let mut data = self.data.write().unwrap();
90        data.last_entry_id = entry_id;
91    }
92
93    /// Sequence number of last committed data.
94    pub(crate) fn committed_sequence(&self) -> SequenceNumber {
95        self.data.read().unwrap().committed_sequence
96    }
97
98    /// Freezes the mutable memtable if it is not empty.
99    pub(crate) fn freeze_mutable(&self) -> Result<()> {
100        let version = self.current().version;
101        let time_window = version.compaction_time_window;
102
103        let Some(new_memtables) = version
104            .memtables
105            .freeze_mutable(&version.metadata, time_window)?
106        else {
107            return Ok(());
108        };
109
110        // Create a new version with memtable switched.
111        let new_version = Arc::new(
112            VersionBuilder::from_version(version)
113                .memtables(new_memtables)
114                .build(),
115        );
116
117        let mut version_data = self.data.write().unwrap();
118        version_data.version = new_version;
119
120        Ok(())
121    }
122
123    /// Applies region option changes and generates a new version.
124    pub(crate) fn alter_options(&self, options: RegionOptions) {
125        let version = self.current().version;
126        let new_version = Arc::new(
127            VersionBuilder::from_version(version)
128                .options(options)
129                .build(),
130        );
131        let mut version_data = self.data.write().unwrap();
132        version_data.version = new_version;
133    }
134
135    /// Apply edit to current version.
136    ///
137    /// If `edit` is None, only removes the specified memtables.
138    pub(crate) fn apply_edit(
139        &self,
140        edit: Option<RegionEdit>,
141        memtables_to_remove: &[MemtableId],
142        purger: FilePurgerRef,
143    ) {
144        let version = self.current().version;
145        let builder = VersionBuilder::from_version(version);
146        let committed_sequence = edit.as_ref().and_then(|e| e.committed_sequence);
147        let builder = if let Some(edit) = edit {
148            builder.apply_edit(edit, purger)
149        } else {
150            builder
151        };
152        let new_version = Arc::new(builder.remove_memtables(memtables_to_remove).build());
153
154        let mut version_data = self.data.write().unwrap();
155        version_data.committed_sequence = if let Some(committed_in_edit) = committed_sequence {
156            version_data.committed_sequence.max(committed_in_edit)
157        } else {
158            version_data.committed_sequence
159        };
160        version_data.version = new_version;
161    }
162
163    /// Mark all opened files as deleted and set the delete marker in [VersionControlData]
164    pub(crate) fn mark_dropped(&self) {
165        let version = self.current().version;
166        let part_duration = Some(version.memtables.mutable.part_duration());
167        let next_memtable_id = version.memtables.mutable.next_memtable_id();
168        let memtable_builder = version.memtables.mutable.memtable_builder().clone();
169        let new_mutable = Arc::new(TimePartitions::new(
170            version.metadata.clone(),
171            memtable_builder,
172            next_memtable_id,
173            part_duration,
174        ));
175
176        let mut data = self.data.write().unwrap();
177        data.is_dropped = true;
178        data.version.ssts.mark_all_deleted();
179        // Reset version so we can release the reference to memtables and SSTs.
180        let new_version =
181            Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
182        data.version = new_version;
183    }
184
185    /// Alter schema of the region.
186    ///
187    /// It replaces existing mutable memtable with a memtable that uses the
188    /// new schema. Memtables of the version must be empty.
189    pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef) {
190        let version = self.current().version;
191        let part_duration = Some(version.memtables.mutable.part_duration());
192        let next_memtable_id = version.memtables.mutable.next_memtable_id();
193        let memtable_builder = version.memtables.mutable.memtable_builder().clone();
194        let new_mutable = Arc::new(TimePartitions::new(
195            metadata.clone(),
196            memtable_builder,
197            next_memtable_id,
198            part_duration,
199        ));
200        debug_assert!(version.memtables.mutable.is_empty());
201        debug_assert!(version.memtables.immutables().is_empty());
202        let new_version = Arc::new(
203            VersionBuilder::from_version(version)
204                .metadata(metadata)
205                .memtables(MemtableVersion::new(new_mutable))
206                .build(),
207        );
208
209        let mut version_data = self.data.write().unwrap();
210        version_data.version = new_version;
211    }
212
213    /// Alter schema and format of the region.
214    ///
215    /// It replaces existing mutable memtable with a memtable that uses the
216    /// new format. Memtables of the version must be empty.
217    pub(crate) fn alter_schema_and_format(
218        &self,
219        metadata: RegionMetadataRef,
220        options: RegionOptions,
221        memtable_builder: MemtableBuilderRef,
222    ) {
223        let version = self.current().version;
224        let part_duration = Some(version.memtables.mutable.part_duration());
225        let next_memtable_id = version.memtables.mutable.next_memtable_id();
226        // Use the new metadata to build `TimePartitions`.
227        let new_mutable = Arc::new(TimePartitions::new(
228            metadata.clone(),
229            memtable_builder,
230            next_memtable_id,
231            part_duration,
232        ));
233        debug_assert!(version.memtables.mutable.is_empty());
234        debug_assert!(version.memtables.immutables().is_empty());
235        let new_version = Arc::new(
236            VersionBuilder::from_version(version)
237                .metadata(metadata)
238                .options(options)
239                .memtables(MemtableVersion::new(new_mutable))
240                .build(),
241        );
242
243        let mut version_data = self.data.write().unwrap();
244        version_data.version = new_version;
245    }
246
247    /// Truncate current version.
248    pub(crate) fn truncate(&self, truncate_kind: TruncateKind) {
249        let version = self.current().version;
250
251        let part_duration = version.memtables.mutable.part_duration();
252        let next_memtable_id = version.memtables.mutable.next_memtable_id();
253        let memtable_builder = version.memtables.mutable.memtable_builder().clone();
254        let new_mutable = Arc::new(TimePartitions::new(
255            version.metadata.clone(),
256            memtable_builder,
257            next_memtable_id,
258            Some(part_duration),
259        ));
260        match truncate_kind {
261            TruncateKind::All {
262                truncated_entry_id,
263                truncated_sequence,
264            } => {
265                let new_version = Arc::new(
266                    VersionBuilder::from_version(version)
267                        .memtables(MemtableVersion::new(new_mutable))
268                        .clear_files()
269                        .flushed_entry_id(truncated_entry_id)
270                        .flushed_sequence(truncated_sequence)
271                        .truncated_entry_id(Some(truncated_entry_id))
272                        .build(),
273                );
274
275                let mut version_data = self.data.write().unwrap();
276                version_data.version.ssts.mark_all_deleted();
277                version_data.version = new_version;
278            }
279            TruncateKind::Partial { files_to_remove } => {
280                let new_version = Arc::new(
281                    VersionBuilder::from_version(version)
282                        .remove_files(files_to_remove.into_iter())
283                        .build(),
284                );
285
286                let mut version_data = self.data.write().unwrap();
287                // notice since it's partial, no need to mark all files as deleted
288                version_data.version = new_version;
289            }
290        };
291    }
292
293    /// Overwrites the current version with a new version.
294    pub(crate) fn overwrite_current(&self, version: VersionRef) {
295        let mut version_data = self.data.write().unwrap();
296        version_data.version = version;
297    }
298}
299
/// Reference-counted pointer to a [VersionControl].
pub(crate) type VersionControlRef = Arc<VersionControl>;
301
/// Data of [VersionControl].
///
/// Cloning it yields a copy of the state: the version is shared through its
/// reference-counted pointer and the remaining fields are copied.
#[derive(Debug, Clone)]
pub(crate) struct VersionControlData {
    /// Latest version.
    pub(crate) version: VersionRef,
    /// Sequence number of last committed data.
    ///
    /// Starts from 1 (zero means no data).
    pub(crate) committed_sequence: SequenceNumber,
    /// Last WAL entry Id.
    ///
    /// Starts from 1 (zero means no data).
    pub(crate) last_entry_id: EntryId,
    /// Marker of whether this region is dropped or being dropped.
    pub(crate) is_dropped: bool,
}
318
319impl VersionControlData {
320    /// Approximate timeseries count in current version.
321    pub(crate) fn series_count(&self) -> usize {
322        self.version.memtables.mutable.series_count()
323    }
324}
325
/// Static metadata of a region.
///
/// A `Version` is an immutable snapshot: changes are made by building a new
/// `Version` through [VersionBuilder] rather than by mutating an existing one.
#[derive(Clone, Debug)]
pub(crate) struct Version {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
    /// metadata and reuse metadata when creating a new `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Mutable and immutable memtables.
    ///
    /// Wrapped in Arc to make clone of `Version` much cheaper.
    pub(crate) memtables: MemtableVersionRef,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inclusive max WAL entry id of flushed data.
    pub(crate) flushed_entry_id: EntryId,
    /// Inclusive max sequence of flushed data.
    pub(crate) flushed_sequence: SequenceNumber,
    /// Latest entry id recorded while truncating the table.
    ///
    /// Used to check if it is a flush task during the truncating table.
    pub(crate) truncated_entry_id: Option<EntryId>,
    /// Inferred compaction time window from flush.
    ///
    /// If compaction options contain a time window, it will overwrite this value
    /// when creating a new version from the [VersionBuilder].
    pub(crate) compaction_time_window: Option<Duration>,
    /// Options of the region.
    pub(crate) options: RegionOptions,
}
356
/// Reference-counted pointer to a [Version].
pub(crate) type VersionRef = Arc<Version>;
358
/// Version builder.
///
/// Holds one value per field of [Version]; `build` turns the builder into a
/// new [Version].
pub(crate) struct VersionBuilder {
    /// Metadata of the region for the version being built.
    metadata: RegionMetadataRef,
    /// Memtables for the version being built.
    memtables: MemtableVersionRef,
    /// SSTs for the version being built.
    ssts: SstVersionRef,
    /// Flushed WAL entry id for the version being built.
    flushed_entry_id: EntryId,
    /// Flushed sequence for the version being built.
    flushed_sequence: SequenceNumber,
    /// Truncated entry id for the version being built, if any.
    truncated_entry_id: Option<EntryId>,
    /// Compaction time window; `build` replaces it with the window from the
    /// compaction options when those provide one.
    compaction_time_window: Option<Duration>,
    /// Region options for the version being built.
    options: RegionOptions,
}
370
371impl VersionBuilder {
372    /// Returns a new builder.
373    pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
374        VersionBuilder {
375            metadata,
376            memtables: Arc::new(MemtableVersion::new(mutable)),
377            ssts: Arc::new(SstVersion::new()),
378            flushed_entry_id: 0,
379            flushed_sequence: 0,
380            truncated_entry_id: None,
381            compaction_time_window: None,
382            options: RegionOptions::default(),
383        }
384    }
385
386    /// Returns a new builder from an existing version.
387    pub(crate) fn from_version(version: VersionRef) -> Self {
388        VersionBuilder {
389            metadata: version.metadata.clone(),
390            memtables: version.memtables.clone(),
391            ssts: version.ssts.clone(),
392            flushed_entry_id: version.flushed_entry_id,
393            flushed_sequence: version.flushed_sequence,
394            truncated_entry_id: version.truncated_entry_id,
395            compaction_time_window: version.compaction_time_window,
396            options: version.options.clone(),
397        }
398    }
399
400    /// Sets memtables.
401    pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
402        self.memtables = Arc::new(memtables);
403        self
404    }
405
406    /// Sets metadata.
407    pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
408        self.metadata = metadata;
409        self
410    }
411
412    /// Sets flushed entry id.
413    pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
414        self.flushed_entry_id = entry_id;
415        self
416    }
417
418    /// Sets flushed sequence.
419    pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
420        self.flushed_sequence = sequence;
421        self
422    }
423
424    /// Sets truncated entry id.
425    pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
426        self.truncated_entry_id = entry_id;
427        self
428    }
429
430    /// Sets compaction time window.
431    pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
432        self.compaction_time_window = window;
433        self
434    }
435
436    /// Sets options.
437    pub(crate) fn options(mut self, options: RegionOptions) -> Self {
438        self.options = options;
439        self
440    }
441
442    /// Apply edit to the builder.
443    pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
444        if let Some(entry_id) = edit.flushed_entry_id {
445            self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
446        }
447        if let Some(sequence) = edit.flushed_sequence {
448            self.flushed_sequence = self.flushed_sequence.max(sequence);
449        }
450        if let Some(window) = edit.compaction_time_window {
451            self.compaction_time_window = Some(window);
452        }
453        if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
454            let mut ssts = (*self.ssts).clone();
455            ssts.add_files(file_purger, edit.files_to_add.into_iter());
456            ssts.remove_files(edit.files_to_remove.into_iter());
457            self.ssts = Arc::new(ssts);
458        }
459
460        self
461    }
462
463    /// Remove memtables from the builder.
464    pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
465        if !ids.is_empty() {
466            let mut memtables = (*self.memtables).clone();
467            memtables.remove_memtables(ids);
468            self.memtables = Arc::new(memtables);
469        }
470        self
471    }
472
473    /// Add files to the builder.
474    pub(crate) fn add_files(
475        mut self,
476        file_purger: FilePurgerRef,
477        files: impl Iterator<Item = FileMeta>,
478    ) -> Self {
479        let mut ssts = (*self.ssts).clone();
480        ssts.add_files(file_purger, files);
481        self.ssts = Arc::new(ssts);
482
483        self
484    }
485
486    pub(crate) fn remove_files(mut self, files: impl Iterator<Item = FileMeta>) -> Self {
487        let mut ssts = (*self.ssts).clone();
488        ssts.remove_files(files);
489        self.ssts = Arc::new(ssts);
490
491        self
492    }
493
494    /// Clear all files in the builder.
495    pub(crate) fn clear_files(mut self) -> Self {
496        self.ssts = Arc::new(SstVersion::new());
497        self
498    }
499
500    /// Builds a new [Version] from the builder.
501    /// It overwrites the window size by compaction option.
502    pub(crate) fn build(self) -> Version {
503        let compaction_time_window = self
504            .options
505            .compaction
506            .time_window()
507            .or(self.compaction_time_window);
508        if self.compaction_time_window.is_some()
509            && compaction_time_window != self.compaction_time_window
510        {
511            info!(
512                "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
513                self.compaction_time_window, compaction_time_window, self.metadata.region_id
514            );
515        }
516
517        Version {
518            metadata: self.metadata,
519            memtables: self.memtables,
520            ssts: self.ssts,
521            flushed_entry_id: self.flushed_entry_id,
522            flushed_sequence: self.flushed_sequence,
523            truncated_entry_id: self.truncated_entry_id,
524            compaction_time_window,
525            options: self.options,
526        }
527    }
528}