mito2/memtable/
version.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable version.
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use smallvec::SmallVec;
21use store_api::metadata::RegionMetadataRef;
22
23use crate::error::Result;
24use crate::memtable::time_partition::TimePartitionsRef;
25use crate::memtable::{MemtableId, MemtableRef};
26
/// A [SmallVec] of memtable handles with inline room for two entries.
///
/// Used for the immutable memtable list of a [MemtableVersion].
pub(crate) type SmallMemtableVec = SmallVec<[MemtableRef; 2]>;
28
/// A version of current memtables in a region.
///
/// Pairs the mutable time partitions with the list of immutable memtables.
/// [MemtableVersion::freeze_mutable] takes `&self` and returns a new version
/// value instead of replacing the fields of the existing one.
#[derive(Debug, Clone)]
pub(crate) struct MemtableVersion {
    /// Mutable memtable.
    ///
    /// Held as time partitions, so it may expose several memtables
    /// (see [MemtableVersion::list_memtables]).
    pub(crate) mutable: TimePartitionsRef,
    /// Immutable memtables.
    ///
    /// We only allow one flush job per region but if a flush job failed, then we
    /// might need to store more than one immutable memtable on the next time we
    /// flush the region.
    ///
    /// The field is private to this module; it is read through
    /// [MemtableVersion::immutables] and shrunk through
    /// [MemtableVersion::remove_memtables].
    immutables: SmallMemtableVec,
}
41
/// Reference-counted ([Arc]) handle to a [MemtableVersion].
pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;
43
44impl MemtableVersion {
45    /// Returns a new [MemtableVersion] with specific mutable memtable.
46    pub(crate) fn new(mutable: TimePartitionsRef) -> MemtableVersion {
47        MemtableVersion {
48            mutable,
49            immutables: SmallVec::new(),
50        }
51    }
52
53    /// Immutable memtables.
54    pub(crate) fn immutables(&self) -> &[MemtableRef] {
55        &self.immutables
56    }
57
58    /// Lists mutable and immutable memtables.
59    pub(crate) fn list_memtables(&self) -> Vec<MemtableRef> {
60        let mut mems = Vec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
61        self.mutable.list_memtables(&mut mems);
62        mems.extend_from_slice(&self.immutables);
63        mems
64    }
65
66    /// Returns a new [MemtableVersion] which switches the old mutable memtable to immutable
67    /// memtable.
68    ///
69    /// It will switch to use the `time_window` provided.
70    ///
71    /// Returns `None` if the mutable memtable is empty.
72    pub(crate) fn freeze_mutable(
73        &self,
74        metadata: &RegionMetadataRef,
75        time_window: Option<Duration>,
76    ) -> Result<Option<MemtableVersion>> {
77        if self.mutable.is_empty() {
78            // No need to freeze the mutable memtable, but we need to check the time window.
79            if self.mutable.part_duration() == time_window {
80                // If the time window is the same, we don't need to update it.
81                return Ok(None);
82            }
83
84            // Update the time window.
85            let mutable = self.mutable.new_with_part_duration(time_window);
86            common_telemetry::debug!(
87                "Freeze empty memtable, update partition duration from {:?} to {:?}",
88                self.mutable.part_duration(),
89                time_window
90            );
91            return Ok(Some(MemtableVersion {
92                mutable: Arc::new(mutable),
93                immutables: self.immutables.clone(),
94            }));
95        }
96
97        // Marks the mutable memtable as immutable so it can free the memory usage from our
98        // soft limit.
99        self.mutable.freeze()?;
100        // Fork the memtable.
101        if self.mutable.part_duration() != time_window {
102            common_telemetry::debug!(
103                "Fork memtable, update partition duration from {:?}, to {:?}",
104                self.mutable.part_duration(),
105                time_window
106            );
107        }
108        let mutable = Arc::new(self.mutable.fork(metadata, time_window));
109
110        let mut immutables =
111            SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
112        immutables.extend(self.immutables.iter().cloned());
113        // Pushes the mutable memtable to immutable list.
114        self.mutable.list_memtables_to_small_vec(&mut immutables);
115
116        Ok(Some(MemtableVersion {
117            mutable,
118            immutables,
119        }))
120    }
121
122    /// Removes memtables by ids from immutable memtables.
123    pub(crate) fn remove_memtables(&mut self, ids: &[MemtableId]) {
124        self.immutables = self
125            .immutables
126            .iter()
127            .filter(|mem| !ids.contains(&mem.id()))
128            .cloned()
129            .collect();
130    }
131
132    /// Returns the memory usage of the mutable memtable.
133    pub(crate) fn mutable_usage(&self) -> usize {
134        self.mutable.memory_usage()
135    }
136
137    /// Returns the memory usage of the immutable memtables.
138    pub(crate) fn immutables_usage(&self) -> usize {
139        self.immutables
140            .iter()
141            .map(|mem| mem.stats().estimated_bytes)
142            .sum()
143    }
144
145    /// Returns the number of rows in memtables.
146    pub(crate) fn num_rows(&self) -> u64 {
147        self.immutables
148            .iter()
149            .map(|mem| mem.stats().num_rows as u64)
150            .sum::<u64>()
151            + self.mutable.num_rows()
152    }
153
154    /// Returns true if the memtable version is empty.
155    ///
156    /// The version is empty when mutable memtable is empty and there is no
157    /// immutable memtables.
158    pub(crate) fn is_empty(&self) -> bool {
159        self.mutable.is_empty() && self.immutables.is_empty()
160    }
161}