mito2/memtable/
version.rs1use std::sync::Arc;
18use std::time::Duration;
19
20use smallvec::SmallVec;
21use store_api::metadata::RegionMetadataRef;
22
23use crate::error::Result;
24use crate::memtable::time_partition::TimePartitionsRef;
25use crate::memtable::{MemtableId, MemtableRef};
26
/// A small vector of memtable handles that keeps up to two entries inline
/// before spilling to the heap.
pub(crate) type SmallMemtableVec = SmallVec<[MemtableRef; 2]>;
28
/// A snapshot of the memtables of a region: the time-partitioned mutable part
/// plus the list of memtables that have already been frozen.
#[derive(Debug, Clone)]
pub(crate) struct MemtableVersion {
    /// Time partitions holding the mutable memtables.
    pub(crate) mutable: TimePartitionsRef,
    /// Memtables moved out of `mutable` by [`MemtableVersion::freeze_mutable`].
    ///
    /// Entries are dropped from this list by [`MemtableVersion::remove_memtables`].
    immutables: SmallMemtableVec,
}
41
/// Shared, reference-counted handle to a [`MemtableVersion`].
pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;
43
44impl MemtableVersion {
45    pub(crate) fn new(mutable: TimePartitionsRef) -> MemtableVersion {
47        MemtableVersion {
48            mutable,
49            immutables: SmallVec::new(),
50        }
51    }
52
53    pub(crate) fn immutables(&self) -> &[MemtableRef] {
55        &self.immutables
56    }
57
58    pub(crate) fn list_memtables(&self) -> Vec<MemtableRef> {
60        let mut mems = Vec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
61        self.mutable.list_memtables(&mut mems);
62        mems.extend_from_slice(&self.immutables);
63        mems
64    }
65
66    pub(crate) fn freeze_mutable(
73        &self,
74        metadata: &RegionMetadataRef,
75        time_window: Option<Duration>,
76    ) -> Result<Option<MemtableVersion>> {
77        if self.mutable.is_empty() {
78            if Some(self.mutable.part_duration()) == time_window {
80                return Ok(None);
82            }
83
84            let mutable = self.mutable.new_with_part_duration(time_window);
86            common_telemetry::debug!(
87                "Freeze empty memtable, update partition duration from {:?} to {:?}",
88                self.mutable.part_duration(),
89                time_window
90            );
91            return Ok(Some(MemtableVersion {
92                mutable: Arc::new(mutable),
93                immutables: self.immutables.clone(),
94            }));
95        }
96
97        self.mutable.freeze()?;
100        if Some(self.mutable.part_duration()) != time_window {
102            common_telemetry::debug!(
103                "Fork memtable, update partition duration from {:?}, to {:?}",
104                self.mutable.part_duration(),
105                time_window
106            );
107        }
108        let mutable = Arc::new(self.mutable.fork(metadata, time_window));
109
110        let mut immutables =
111            SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
112        immutables.extend(self.immutables.iter().cloned());
113        self.mutable.list_memtables_to_small_vec(&mut immutables);
115
116        Ok(Some(MemtableVersion {
117            mutable,
118            immutables,
119        }))
120    }
121
122    pub(crate) fn remove_memtables(&mut self, ids: &[MemtableId]) {
124        self.immutables = self
125            .immutables
126            .iter()
127            .filter(|mem| !ids.contains(&mem.id()))
128            .cloned()
129            .collect();
130    }
131
    /// Returns the memory usage reported by the mutable time partitions.
    pub(crate) fn mutable_usage(&self) -> usize {
        self.mutable.memory_usage()
    }
136
137    pub(crate) fn immutables_usage(&self) -> usize {
139        self.immutables
140            .iter()
141            .map(|mem| mem.stats().estimated_bytes)
142            .sum()
143    }
144
145    pub(crate) fn num_rows(&self) -> u64 {
147        self.immutables
148            .iter()
149            .map(|mem| mem.stats().num_rows as u64)
150            .sum::<u64>()
151            + self.mutable.num_rows()
152    }
153
154    pub(crate) fn is_empty(&self) -> bool {
159        self.mutable.is_empty() && self.immutables.is_empty()
160    }
161}