mito2/region/
version.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Version control of mito engine.
//!
//! Version is an immutable snapshot of a region's metadata.
//!
//! To read the latest data from `VersionControl`, we should
//! 1. Acquire `Version` from `VersionControl`.
//! 2. Then acquire the last sequence.
//!
//! Reason: data may be flushed/compacted, and some data with an old sequence may be removed
//! and become invisible between steps 1 and 2, so we need to acquire the version first.
25
26use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use common_telemetry::info;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::SequenceNumber;
32
33use crate::error::Result;
34use crate::manifest::action::RegionEdit;
35use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
36use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
37use crate::memtable::{MemtableBuilderRef, MemtableId};
38use crate::region::options::RegionOptions;
39use crate::sst::file::FileMeta;
40use crate::sst::file_purger::FilePurgerRef;
41use crate::sst::version::{SstVersion, SstVersionRef};
42use crate::wal::EntryId;
43
44/// Controls metadata and sequence numbers for a region.
45///
46/// It manages metadata in a copy-on-write fashion. Any modification to a region's metadata
47/// will generate a new [Version].
48#[derive(Debug)]
49pub(crate) struct VersionControl {
50    data: RwLock<VersionControlData>,
51}
52
53impl VersionControl {
54    /// Returns a new [VersionControl] with specific `version`.
55    pub(crate) fn new(version: Version) -> VersionControl {
56        // Initialize sequence and entry id from flushed sequence and entry id.
57        let (flushed_sequence, flushed_entry_id) =
58            (version.flushed_sequence, version.flushed_entry_id);
59        VersionControl {
60            data: RwLock::new(VersionControlData {
61                version: Arc::new(version),
62                committed_sequence: flushed_sequence,
63                last_entry_id: flushed_entry_id,
64                is_dropped: false,
65            }),
66        }
67    }
68
69    /// Returns current copy of data.
70    pub(crate) fn current(&self) -> VersionControlData {
71        self.data.read().unwrap().clone()
72    }
73
74    /// Updates committed sequence and entry id.
75    pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
76        let mut data = self.data.write().unwrap();
77        data.committed_sequence = seq;
78        data.last_entry_id = entry_id;
79    }
80
81    /// Sequence number of last committed data.
82    pub(crate) fn committed_sequence(&self) -> SequenceNumber {
83        self.data.read().unwrap().committed_sequence
84    }
85
86    /// Freezes the mutable memtable if it is not empty.
87    pub(crate) fn freeze_mutable(&self) -> Result<()> {
88        let version = self.current().version;
89        let time_window = version.compaction_time_window;
90
91        let Some(new_memtables) = version
92            .memtables
93            .freeze_mutable(&version.metadata, time_window)?
94        else {
95            return Ok(());
96        };
97
98        // Create a new version with memtable switched.
99        let new_version = Arc::new(
100            VersionBuilder::from_version(version)
101                .memtables(new_memtables)
102                .build(),
103        );
104
105        let mut version_data = self.data.write().unwrap();
106        version_data.version = new_version;
107
108        Ok(())
109    }
110
111    /// Applies region option changes and generates a new version.
112    pub(crate) fn alter_options(&self, options: RegionOptions) {
113        let version = self.current().version;
114        let new_version = Arc::new(
115            VersionBuilder::from_version(version)
116                .options(options)
117                .build(),
118        );
119        let mut version_data = self.data.write().unwrap();
120        version_data.version = new_version;
121    }
122
123    /// Apply edit to current version.
124    pub(crate) fn apply_edit(
125        &self,
126        edit: RegionEdit,
127        memtables_to_remove: &[MemtableId],
128        purger: FilePurgerRef,
129    ) {
130        let version = self.current().version;
131        let new_version = Arc::new(
132            VersionBuilder::from_version(version)
133                .apply_edit(edit, purger)
134                .remove_memtables(memtables_to_remove)
135                .build(),
136        );
137
138        let mut version_data = self.data.write().unwrap();
139        version_data.version = new_version;
140    }
141
142    /// Mark all opened files as deleted and set the delete marker in [VersionControlData]
143    pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
144        let version = self.current().version;
145        let part_duration = version.memtables.mutable.part_duration();
146        let next_memtable_id = version.memtables.mutable.next_memtable_id();
147        let new_mutable = Arc::new(TimePartitions::new(
148            version.metadata.clone(),
149            memtable_builder.clone(),
150            next_memtable_id,
151            part_duration,
152        ));
153
154        let mut data = self.data.write().unwrap();
155        data.is_dropped = true;
156        data.version.ssts.mark_all_deleted();
157        // Reset version so we can release the reference to memtables and SSTs.
158        let new_version =
159            Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
160        data.version = new_version;
161    }
162
163    /// Alter schema of the region.
164    ///
165    /// It replaces existing mutable memtable with a memtable that uses the
166    /// new schema. Memtables of the version must be empty.
167    pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
168        let version = self.current().version;
169        let part_duration = version.memtables.mutable.part_duration();
170        let next_memtable_id = version.memtables.mutable.next_memtable_id();
171        let new_mutable = Arc::new(TimePartitions::new(
172            metadata.clone(),
173            builder.clone(),
174            next_memtable_id,
175            part_duration,
176        ));
177        debug_assert!(version.memtables.mutable.is_empty());
178        debug_assert!(version.memtables.immutables().is_empty());
179        let new_version = Arc::new(
180            VersionBuilder::from_version(version)
181                .metadata(metadata)
182                .memtables(MemtableVersion::new(new_mutable))
183                .build(),
184        );
185
186        let mut version_data = self.data.write().unwrap();
187        version_data.version = new_version;
188    }
189
190    /// Truncate current version.
191    pub(crate) fn truncate(
192        &self,
193        truncated_entry_id: EntryId,
194        truncated_sequence: SequenceNumber,
195        memtable_builder: &MemtableBuilderRef,
196    ) {
197        let version = self.current().version;
198
199        let part_duration = version.memtables.mutable.part_duration();
200        let next_memtable_id = version.memtables.mutable.next_memtable_id();
201        let new_mutable = Arc::new(TimePartitions::new(
202            version.metadata.clone(),
203            memtable_builder.clone(),
204            next_memtable_id,
205            part_duration,
206        ));
207        let new_version = Arc::new(
208            VersionBuilder::new(version.metadata.clone(), new_mutable)
209                .flushed_entry_id(truncated_entry_id)
210                .flushed_sequence(truncated_sequence)
211                .truncated_entry_id(Some(truncated_entry_id))
212                .build(),
213        );
214
215        let mut version_data = self.data.write().unwrap();
216        version_data.version.ssts.mark_all_deleted();
217        version_data.version = new_version;
218    }
219
220    /// Overwrites the current version with a new version.
221    pub(crate) fn overwrite_current(&self, version: VersionRef) {
222        let mut version_data = self.data.write().unwrap();
223        version_data.version = version;
224    }
225}
226
/// Shared reference to a [VersionControl].
pub(crate) type VersionControlRef = Arc<VersionControl>;

/// Data of [VersionControl].
///
/// Cloning it is shallow for the version itself, which is behind an `Arc`.
#[derive(Debug, Clone)]
pub(crate) struct VersionControlData {
    /// Latest version.
    pub(crate) version: VersionRef,
    /// Sequence number of last committed data.
    ///
    /// Starts from 1 (zero means no data).
    pub(crate) committed_sequence: SequenceNumber,
    /// Last WAL entry Id.
    ///
    /// Starts from 1 (zero means no data).
    pub(crate) last_entry_id: EntryId,
    /// Marker of whether this region is dropped/dropping.
    pub(crate) is_dropped: bool,
}
245
/// Immutable snapshot of a region: its metadata, memtables, SSTs, flush progress and options.
///
/// A new `Version` is created (usually through [VersionBuilder]) whenever any of these change.
#[derive(Clone, Debug)]
pub(crate) struct Version {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
    /// metadata and reuse metadata when creating a new `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Mutable and immutable memtables.
    ///
    /// Wrapped in Arc to make clone of `Version` much cheaper.
    pub(crate) memtables: MemtableVersionRef,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inclusive max WAL entry id of flushed data.
    pub(crate) flushed_entry_id: EntryId,
    /// Inclusive max sequence of flushed data.
    pub(crate) flushed_sequence: SequenceNumber,
    /// WAL entry id recorded when the region was last truncated, if it has been truncated.
    ///
    /// Used to check whether a flush task ran while the region was being truncated.
    pub(crate) truncated_entry_id: Option<EntryId>,
    /// Inferred compaction time window from flush.
    ///
    /// If compaction options contain a time window, it will overwrite this value
    /// when creating a new version from the [VersionBuilder].
    pub(crate) compaction_time_window: Option<Duration>,
    /// Options of the region.
    pub(crate) options: RegionOptions,
}

/// Shared reference to an immutable [Version].
pub(crate) type VersionRef = Arc<Version>;
278
279/// Version builder.
280pub(crate) struct VersionBuilder {
281    metadata: RegionMetadataRef,
282    memtables: MemtableVersionRef,
283    ssts: SstVersionRef,
284    flushed_entry_id: EntryId,
285    flushed_sequence: SequenceNumber,
286    truncated_entry_id: Option<EntryId>,
287    compaction_time_window: Option<Duration>,
288    options: RegionOptions,
289}
290
291impl VersionBuilder {
292    /// Returns a new builder.
293    pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
294        VersionBuilder {
295            metadata,
296            memtables: Arc::new(MemtableVersion::new(mutable)),
297            ssts: Arc::new(SstVersion::new()),
298            flushed_entry_id: 0,
299            flushed_sequence: 0,
300            truncated_entry_id: None,
301            compaction_time_window: None,
302            options: RegionOptions::default(),
303        }
304    }
305
306    /// Returns a new builder from an existing version.
307    pub(crate) fn from_version(version: VersionRef) -> Self {
308        VersionBuilder {
309            metadata: version.metadata.clone(),
310            memtables: version.memtables.clone(),
311            ssts: version.ssts.clone(),
312            flushed_entry_id: version.flushed_entry_id,
313            flushed_sequence: version.flushed_sequence,
314            truncated_entry_id: version.truncated_entry_id,
315            compaction_time_window: version.compaction_time_window,
316            options: version.options.clone(),
317        }
318    }
319
320    /// Sets memtables.
321    pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
322        self.memtables = Arc::new(memtables);
323        self
324    }
325
326    /// Sets metadata.
327    pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
328        self.metadata = metadata;
329        self
330    }
331
332    /// Sets flushed entry id.
333    pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
334        self.flushed_entry_id = entry_id;
335        self
336    }
337
338    /// Sets flushed sequence.
339    pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
340        self.flushed_sequence = sequence;
341        self
342    }
343
344    /// Sets truncated entty id.
345    pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
346        self.truncated_entry_id = entry_id;
347        self
348    }
349
350    /// Sets compaction time window.
351    pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
352        self.compaction_time_window = window;
353        self
354    }
355
356    /// Sets options.
357    pub(crate) fn options(mut self, options: RegionOptions) -> Self {
358        self.options = options;
359        self
360    }
361
362    /// Apply edit to the builder.
363    pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
364        if let Some(entry_id) = edit.flushed_entry_id {
365            self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
366        }
367        if let Some(sequence) = edit.flushed_sequence {
368            self.flushed_sequence = self.flushed_sequence.max(sequence);
369        }
370        if let Some(window) = edit.compaction_time_window {
371            self.compaction_time_window = Some(window);
372        }
373        if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
374            let mut ssts = (*self.ssts).clone();
375            ssts.add_files(file_purger, edit.files_to_add.into_iter());
376            ssts.remove_files(edit.files_to_remove.into_iter());
377            self.ssts = Arc::new(ssts);
378        }
379
380        self
381    }
382
383    /// Remove memtables from the builder.
384    pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
385        if !ids.is_empty() {
386            let mut memtables = (*self.memtables).clone();
387            memtables.remove_memtables(ids);
388            self.memtables = Arc::new(memtables);
389        }
390        self
391    }
392
393    /// Add files to the builder.
394    pub(crate) fn add_files(
395        mut self,
396        file_purger: FilePurgerRef,
397        files: impl Iterator<Item = FileMeta>,
398    ) -> Self {
399        let mut ssts = (*self.ssts).clone();
400        ssts.add_files(file_purger, files);
401        self.ssts = Arc::new(ssts);
402
403        self
404    }
405
406    /// Builds a new [Version] from the builder.
407    /// It overwrites the window size by compaction option.
408    pub(crate) fn build(self) -> Version {
409        let compaction_time_window = self
410            .options
411            .compaction
412            .time_window()
413            .or(self.compaction_time_window);
414        if self.compaction_time_window.is_some()
415            && compaction_time_window != self.compaction_time_window
416        {
417            info!(
418                "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
419                self.compaction_time_window,
420                compaction_time_window,
421                self.metadata.region_id
422            );
423        }
424
425        Version {
426            metadata: self.metadata,
427            memtables: self.memtables,
428            ssts: self.ssts,
429            flushed_entry_id: self.flushed_entry_id,
430            flushed_sequence: self.flushed_sequence,
431            truncated_entry_id: self.truncated_entry_id,
432            compaction_time_window,
433            options: self.options,
434        }
435    }
436}