mito2/region/
version.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Version control of mito engine.
16//!
17//! Version is an immutable snapshot of region's metadata.
18//!
19//! To read latest data from `VersionControl`, we should
20//! 1. Acquire `Version` from `VersionControl`.
21//! 2. Then acquire last sequence.
22//!
//! Reason: data may be flushed/compacted and some data with old sequence may be removed
//! and become invisible between step 1 and 2, so we need to acquire the version first.
25
26use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use common_telemetry::info;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::SequenceNumber;
32
33use crate::error::Result;
34use crate::manifest::action::{RegionEdit, TruncateKind};
35use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
36use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
37use crate::memtable::{MemtableBuilderRef, MemtableId};
38use crate::region::options::RegionOptions;
39use crate::sst::file::FileMeta;
40use crate::sst::file_purger::FilePurgerRef;
41use crate::sst::version::{SstVersion, SstVersionRef};
42use crate::wal::EntryId;
43
44/// Controls metadata and sequence numbers for a region.
45///
46/// It manages metadata in a copy-on-write fashion. Any modification to a region's metadata
47/// will generate a new [Version].
48#[derive(Debug)]
49pub(crate) struct VersionControl {
50    data: RwLock<VersionControlData>,
51}
52
53impl VersionControl {
54    /// Returns a new [VersionControl] with specific `version`.
55    pub(crate) fn new(version: Version) -> VersionControl {
56        // Initialize sequence and entry id from flushed sequence and entry id.
57        let (flushed_sequence, flushed_entry_id) =
58            (version.flushed_sequence, version.flushed_entry_id);
59        VersionControl {
60            data: RwLock::new(VersionControlData {
61                version: Arc::new(version),
62                committed_sequence: flushed_sequence,
63                last_entry_id: flushed_entry_id,
64                is_dropped: false,
65            }),
66        }
67    }
68
69    /// Returns current copy of data.
70    pub(crate) fn current(&self) -> VersionControlData {
71        self.data.read().unwrap().clone()
72    }
73
74    /// Updates committed sequence and entry id.
75    pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
76        let mut data = self.data.write().unwrap();
77        data.committed_sequence = seq;
78        data.last_entry_id = entry_id;
79    }
80
81    /// Updates last entry id.
82    pub(crate) fn set_entry_id(&self, entry_id: EntryId) {
83        let mut data = self.data.write().unwrap();
84        data.last_entry_id = entry_id;
85    }
86
87    /// Sequence number of last committed data.
88    pub(crate) fn committed_sequence(&self) -> SequenceNumber {
89        self.data.read().unwrap().committed_sequence
90    }
91
92    /// Freezes the mutable memtable if it is not empty.
93    pub(crate) fn freeze_mutable(&self) -> Result<()> {
94        let version = self.current().version;
95        let time_window = version.compaction_time_window;
96
97        let Some(new_memtables) = version
98            .memtables
99            .freeze_mutable(&version.metadata, time_window)?
100        else {
101            return Ok(());
102        };
103
104        // Create a new version with memtable switched.
105        let new_version = Arc::new(
106            VersionBuilder::from_version(version)
107                .memtables(new_memtables)
108                .build(),
109        );
110
111        let mut version_data = self.data.write().unwrap();
112        version_data.version = new_version;
113
114        Ok(())
115    }
116
117    /// Applies region option changes and generates a new version.
118    pub(crate) fn alter_options(&self, options: RegionOptions) {
119        let version = self.current().version;
120        let new_version = Arc::new(
121            VersionBuilder::from_version(version)
122                .options(options)
123                .build(),
124        );
125        let mut version_data = self.data.write().unwrap();
126        version_data.version = new_version;
127    }
128
129    /// Apply edit to current version.
130    pub(crate) fn apply_edit(
131        &self,
132        edit: RegionEdit,
133        memtables_to_remove: &[MemtableId],
134        purger: FilePurgerRef,
135    ) {
136        let version = self.current().version;
137        let new_version = Arc::new(
138            VersionBuilder::from_version(version)
139                .apply_edit(edit, purger)
140                .remove_memtables(memtables_to_remove)
141                .build(),
142        );
143
144        let mut version_data = self.data.write().unwrap();
145        version_data.version = new_version;
146    }
147
148    /// Mark all opened files as deleted and set the delete marker in [VersionControlData]
149    pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
150        let version = self.current().version;
151        let part_duration = Some(version.memtables.mutable.part_duration());
152        let next_memtable_id = version.memtables.mutable.next_memtable_id();
153        let new_mutable = Arc::new(TimePartitions::new(
154            version.metadata.clone(),
155            memtable_builder.clone(),
156            next_memtable_id,
157            part_duration,
158        ));
159
160        let mut data = self.data.write().unwrap();
161        data.is_dropped = true;
162        data.version.ssts.mark_all_deleted();
163        // Reset version so we can release the reference to memtables and SSTs.
164        let new_version =
165            Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
166        data.version = new_version;
167    }
168
169    /// Alter schema of the region.
170    ///
171    /// It replaces existing mutable memtable with a memtable that uses the
172    /// new schema. Memtables of the version must be empty.
173    pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
174        let version = self.current().version;
175        let part_duration = Some(version.memtables.mutable.part_duration());
176        let next_memtable_id = version.memtables.mutable.next_memtable_id();
177        let new_mutable = Arc::new(TimePartitions::new(
178            metadata.clone(),
179            builder.clone(),
180            next_memtable_id,
181            part_duration,
182        ));
183        debug_assert!(version.memtables.mutable.is_empty());
184        debug_assert!(version.memtables.immutables().is_empty());
185        let new_version = Arc::new(
186            VersionBuilder::from_version(version)
187                .metadata(metadata)
188                .memtables(MemtableVersion::new(new_mutable))
189                .build(),
190        );
191
192        let mut version_data = self.data.write().unwrap();
193        version_data.version = new_version;
194    }
195
196    /// Truncate current version.
197    pub(crate) fn truncate(
198        &self,
199        truncate_kind: TruncateKind,
200        memtable_builder: &MemtableBuilderRef,
201    ) {
202        let version = self.current().version;
203
204        let part_duration = version.memtables.mutable.part_duration();
205        let next_memtable_id = version.memtables.mutable.next_memtable_id();
206        let new_mutable = Arc::new(TimePartitions::new(
207            version.metadata.clone(),
208            memtable_builder.clone(),
209            next_memtable_id,
210            Some(part_duration),
211        ));
212        match truncate_kind {
213            TruncateKind::All {
214                truncated_entry_id,
215                truncated_sequence,
216            } => {
217                let new_version = Arc::new(
218                    VersionBuilder::new(version.metadata.clone(), new_mutable)
219                        .flushed_entry_id(truncated_entry_id)
220                        .flushed_sequence(truncated_sequence)
221                        .truncated_entry_id(Some(truncated_entry_id))
222                        .build(),
223                );
224
225                let mut version_data = self.data.write().unwrap();
226                version_data.version.ssts.mark_all_deleted();
227                version_data.version = new_version;
228            }
229            TruncateKind::Partial { files_to_remove } => {
230                let new_version = Arc::new(
231                    VersionBuilder::from_version(version)
232                        .remove_files(files_to_remove.into_iter())
233                        .build(),
234                );
235
236                let mut version_data = self.data.write().unwrap();
237                // notice since it's partial, no need to mark all files as deleted
238                version_data.version = new_version;
239            }
240        };
241    }
242
243    /// Overwrites the current version with a new version.
244    pub(crate) fn overwrite_current(&self, version: VersionRef) {
245        let mut version_data = self.data.write().unwrap();
246        version_data.version = version;
247    }
248}
249
250pub(crate) type VersionControlRef = Arc<VersionControl>;
251
252/// Data of [VersionControl].
253#[derive(Debug, Clone)]
254pub(crate) struct VersionControlData {
255    /// Latest version.
256    pub(crate) version: VersionRef,
257    /// Sequence number of last committed data.
258    ///
259    /// Starts from 1 (zero means no data).
260    pub(crate) committed_sequence: SequenceNumber,
261    /// Last WAL entry Id.
262    ///
263    /// Starts from 1 (zero means no data).
264    pub(crate) last_entry_id: EntryId,
265    /// Marker of whether this region is dropped/dropping
266    pub(crate) is_dropped: bool,
267}
268
269impl VersionControlData {
270    /// Approximate timeseries count in current version.
271    pub(crate) fn series_count(&self) -> usize {
272        self.version.memtables.mutable.series_count()
273    }
274}
275
/// Immutable snapshot of a region: its metadata, memtables, SSTs and flush/truncate markers.
///
/// Snapshots are shared through [VersionRef]; changes produce a new `Version` rather than
/// mutating an existing one.
#[derive(Clone, Debug)]
pub(crate) struct Version {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
    /// metadata and reuse metadata when creating a new `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Mutable and immutable memtables.
    ///
    /// Wrapped in Arc to make clone of `Version` much cheaper.
    pub(crate) memtables: MemtableVersionRef,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inclusive max WAL entry id of flushed data.
    pub(crate) flushed_entry_id: EntryId,
    /// Inclusive max sequence of flushed data.
    pub(crate) flushed_sequence: SequenceNumber,
    /// Entry id recorded when the region was last fully truncated, or `None`.
    ///
    /// Used to check if a flush task was running while the table was being truncated.
    pub(crate) truncated_entry_id: Option<EntryId>,
    /// Inferred compaction time window from flush.
    ///
    /// If compaction options contain a time window, it will overwrite this value
    /// when creating a new version from the [VersionBuilder].
    pub(crate) compaction_time_window: Option<Duration>,
    /// Options of the region.
    pub(crate) options: RegionOptions,
}

/// Shared, reference-counted handle to an immutable [Version].
pub(crate) type VersionRef = Arc<Version>;
308
309/// Version builder.
310pub(crate) struct VersionBuilder {
311    metadata: RegionMetadataRef,
312    memtables: MemtableVersionRef,
313    ssts: SstVersionRef,
314    flushed_entry_id: EntryId,
315    flushed_sequence: SequenceNumber,
316    truncated_entry_id: Option<EntryId>,
317    compaction_time_window: Option<Duration>,
318    options: RegionOptions,
319}
320
321impl VersionBuilder {
322    /// Returns a new builder.
323    pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
324        VersionBuilder {
325            metadata,
326            memtables: Arc::new(MemtableVersion::new(mutable)),
327            ssts: Arc::new(SstVersion::new()),
328            flushed_entry_id: 0,
329            flushed_sequence: 0,
330            truncated_entry_id: None,
331            compaction_time_window: None,
332            options: RegionOptions::default(),
333        }
334    }
335
336    /// Returns a new builder from an existing version.
337    pub(crate) fn from_version(version: VersionRef) -> Self {
338        VersionBuilder {
339            metadata: version.metadata.clone(),
340            memtables: version.memtables.clone(),
341            ssts: version.ssts.clone(),
342            flushed_entry_id: version.flushed_entry_id,
343            flushed_sequence: version.flushed_sequence,
344            truncated_entry_id: version.truncated_entry_id,
345            compaction_time_window: version.compaction_time_window,
346            options: version.options.clone(),
347        }
348    }
349
350    /// Sets memtables.
351    pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
352        self.memtables = Arc::new(memtables);
353        self
354    }
355
356    /// Sets metadata.
357    pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
358        self.metadata = metadata;
359        self
360    }
361
362    /// Sets flushed entry id.
363    pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
364        self.flushed_entry_id = entry_id;
365        self
366    }
367
368    /// Sets flushed sequence.
369    pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
370        self.flushed_sequence = sequence;
371        self
372    }
373
374    /// Sets truncated entry id.
375    pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
376        self.truncated_entry_id = entry_id;
377        self
378    }
379
380    /// Sets compaction time window.
381    pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
382        self.compaction_time_window = window;
383        self
384    }
385
386    /// Sets options.
387    pub(crate) fn options(mut self, options: RegionOptions) -> Self {
388        self.options = options;
389        self
390    }
391
392    /// Apply edit to the builder.
393    pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
394        if let Some(entry_id) = edit.flushed_entry_id {
395            self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
396        }
397        if let Some(sequence) = edit.flushed_sequence {
398            self.flushed_sequence = self.flushed_sequence.max(sequence);
399        }
400        if let Some(window) = edit.compaction_time_window {
401            self.compaction_time_window = Some(window);
402        }
403        if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
404            let mut ssts = (*self.ssts).clone();
405            ssts.add_files(file_purger, edit.files_to_add.into_iter());
406            ssts.remove_files(edit.files_to_remove.into_iter());
407            self.ssts = Arc::new(ssts);
408        }
409
410        self
411    }
412
413    /// Remove memtables from the builder.
414    pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
415        if !ids.is_empty() {
416            let mut memtables = (*self.memtables).clone();
417            memtables.remove_memtables(ids);
418            self.memtables = Arc::new(memtables);
419        }
420        self
421    }
422
423    /// Add files to the builder.
424    pub(crate) fn add_files(
425        mut self,
426        file_purger: FilePurgerRef,
427        files: impl Iterator<Item = FileMeta>,
428    ) -> Self {
429        let mut ssts = (*self.ssts).clone();
430        ssts.add_files(file_purger, files);
431        self.ssts = Arc::new(ssts);
432
433        self
434    }
435
436    pub(crate) fn remove_files(mut self, files: impl Iterator<Item = FileMeta>) -> Self {
437        let mut ssts = (*self.ssts).clone();
438        ssts.remove_files(files);
439        self.ssts = Arc::new(ssts);
440
441        self
442    }
443
444    /// Builds a new [Version] from the builder.
445    /// It overwrites the window size by compaction option.
446    pub(crate) fn build(self) -> Version {
447        let compaction_time_window = self
448            .options
449            .compaction
450            .time_window()
451            .or(self.compaction_time_window);
452        if self.compaction_time_window.is_some()
453            && compaction_time_window != self.compaction_time_window
454        {
455            info!(
456                "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
457                self.compaction_time_window,
458                compaction_time_window,
459                self.metadata.region_id
460            );
461        }
462
463        Version {
464            metadata: self.metadata,
465            memtables: self.memtables,
466            ssts: self.ssts,
467            flushed_entry_id: self.flushed_entry_id,
468            flushed_sequence: self.flushed_sequence,
469            truncated_entry_id: self.truncated_entry_id,
470            compaction_time_window,
471            options: self.options,
472        }
473    }
474}