mito2/memtable/
version.rs1use std::sync::Arc;
18use std::time::Duration;
19
20use smallvec::SmallVec;
21use store_api::metadata::RegionMetadataRef;
22
23use crate::error::Result;
24use crate::memtable::time_partition::TimePartitionsRef;
25use crate::memtable::{MemtableId, MemtableRef};
26
27pub(crate) type SmallMemtableVec = SmallVec<[MemtableRef; 2]>;
28
29#[derive(Debug, Clone)]
31pub(crate) struct MemtableVersion {
32 pub(crate) mutable: TimePartitionsRef,
34 immutables: SmallMemtableVec,
40}
41
42pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;
43
44impl MemtableVersion {
45 pub(crate) fn new(mutable: TimePartitionsRef) -> MemtableVersion {
47 MemtableVersion {
48 mutable,
49 immutables: SmallVec::new(),
50 }
51 }
52
53 pub(crate) fn immutables(&self) -> &[MemtableRef] {
55 &self.immutables
56 }
57
58 pub(crate) fn list_memtables(&self) -> Vec<MemtableRef> {
60 let mut mems = Vec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
61 self.mutable.list_memtables(&mut mems);
62 mems.extend_from_slice(&self.immutables);
63 mems
64 }
65
66 pub(crate) fn freeze_mutable(
73 &self,
74 metadata: &RegionMetadataRef,
75 time_window: Option<Duration>,
76 ) -> Result<Option<MemtableVersion>> {
77 if self.mutable.is_empty() {
78 if self.mutable.part_duration() == time_window {
80 return Ok(None);
82 }
83
84 let mutable = self.mutable.new_with_part_duration(time_window);
86 common_telemetry::debug!(
87 "Freeze empty memtable, update partition duration from {:?} to {:?}",
88 self.mutable.part_duration(),
89 time_window
90 );
91 return Ok(Some(MemtableVersion {
92 mutable: Arc::new(mutable),
93 immutables: self.immutables.clone(),
94 }));
95 }
96
97 self.mutable.freeze()?;
100 if self.mutable.part_duration() != time_window {
102 common_telemetry::debug!(
103 "Fork memtable, update partition duration from {:?}, to {:?}",
104 self.mutable.part_duration(),
105 time_window
106 );
107 }
108 let mutable = Arc::new(self.mutable.fork(metadata, time_window));
109
110 let mut immutables =
111 SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
112 immutables.extend(self.immutables.iter().cloned());
113 self.mutable.list_memtables_to_small_vec(&mut immutables);
115
116 Ok(Some(MemtableVersion {
117 mutable,
118 immutables,
119 }))
120 }
121
122 pub(crate) fn remove_memtables(&mut self, ids: &[MemtableId]) {
124 self.immutables = self
125 .immutables
126 .iter()
127 .filter(|mem| !ids.contains(&mem.id()))
128 .cloned()
129 .collect();
130 }
131
132 pub(crate) fn mutable_usage(&self) -> usize {
134 self.mutable.memory_usage()
135 }
136
137 pub(crate) fn immutables_usage(&self) -> usize {
139 self.immutables
140 .iter()
141 .map(|mem| mem.stats().estimated_bytes)
142 .sum()
143 }
144
145 pub(crate) fn num_rows(&self) -> u64 {
147 self.immutables
148 .iter()
149 .map(|mem| mem.stats().num_rows as u64)
150 .sum::<u64>()
151 + self.mutable.num_rows()
152 }
153
154 pub(crate) fn is_empty(&self) -> bool {
159 self.mutable.is_empty() && self.immutables.is_empty()
160 }
161}