use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

mod builder;
pub mod bulk;
pub mod partition_tree;
mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

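/// Id of a memtable.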
pub type MemtableId = u32;

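/// Config for memtables.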
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    TimeSeries,
}

impl Default for MemtableConfig {
    fn default() -> Self {
        Self::TimeSeries
    }
}

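/// Statistics of a memtable.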
#[derive(Debug, Default)]
pub struct MemtableStats {
    /// Estimated memory allocated by this memtable, in bytes.
    estimated_bytes: usize,
    /// Time range of rows in this memtable; `None` when the memtable has no rows.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total number of rows in this memtable.
    num_rows: usize,
    /// Total number of ranges in this memtable.
    num_ranges: usize,
    /// Maximum sequence number of rows in this memtable.
    max_sequence: SequenceNumber,
}

impl MemtableStats {
    /// Sets the `time_range` of the stats (test helper).
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of this memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in this memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in this memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number of rows in this memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }
}

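/// Boxed iterator over [Batch]es read from a memtable.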
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

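/// Ranges of a memtable for reading, returned by [Memtable::ranges].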
#[derive(Default)]
pub struct MemtableRanges {
    /// Ranges of the memtable, keyed by range id.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable.
    pub stats: MemtableStats,
}

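/// In-memory write buffer (memtable) of a region.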
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key-values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key-value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    ///
    /// `projection` selects the columns to read (`None` reads all columns), `predicate`
    /// is an optional filter to prune rows, and `sequence` is the maximum sequence number
    /// visible to the scan (`None` reads all data).
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges of the memtable under the same `projection`, `predicate`
    /// and `sequence` semantics as [Memtable::iter].
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns this mutable memtable into an immutable one.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of this memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this memtable and returns a new mutable memtable with the given `id`
    /// and `metadata`.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

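/// Reference to a [Memtable].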
pub type MemtableRef = Arc<dyn Memtable>;

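/// Builder of [Memtable]s.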
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given `id` and `metadata`.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

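/// Tracker of memory allocated by a memtable.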
#[derive(Default)]
pub struct AllocTracker {
    /// Write buffer manager to notify about allocations, if any.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes tracked by this tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Creates a new [AllocTracker] with an optional write buffer manager.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` of newly allocated memory and reserves them from the write
    /// buffer manager (if any).
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks this tracker as done allocating.
    ///
    /// On the first call, schedules all tracked bytes to be freed from the write
    /// buffer manager (if any). Subsequent calls are no-ops.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            if self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                write_buffer_manager
                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
            }
        }
    }

    /// Returns the bytes tracked by this tracker.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager, if any.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Frees the tracked memory from the write buffer manager.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

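/// Provides memtable builders for regions according to their options and the [MitoConfig].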
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        Self {
            write_buffer_manager,
            config,
        }
    }

    /// Returns a memtable builder for the given memtable `options`, falling back to
    /// the default builder from the engine config when `options` is `None`.
    pub(crate) fn builder_for_options(
        &self,
        options: Option<&MemtableOptions>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    /// Returns the default memtable builder based on the [MemtableConfig] in the engine config.
    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

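/// Builder of iterators over batches in a memtable range.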
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read batches.
    fn build(&self) -> Result<BoxedBatchIterator>;
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

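/// Context shared by ranges of the same memtable.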
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Builder to create iterators over the memtable.
    builder: BoxedIterBuilder,
    /// Predicate of the scan.
    predicate: PredicateGroup,
}

pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

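/// A scannable range of a memtable.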
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context of the range.
    context: MemtableRangeContextRef,
}

impl MemtableRange {
    /// Creates a new range from the shared context.
    pub fn new(context: MemtableRangeContextRef) -> Self {
        Self { context }
    }

    /// Returns the id of the memtable this range belongs to.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range, keeping only rows inside `time_range`
    /// and matching the time filters of the predicate.
    pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build()?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            // done_allocating() is idempotent: calling it again does not schedule
            // another free.
            for _ in 0..2 {
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}