//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

mod builder;
pub mod bulk;
pub mod partition_tree;
mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id of a memtable.
pub type MemtableId = u32;

/// Memtable configs.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    TimeSeries,
}

impl Default for MemtableConfig {
    fn default() -> Self {
        Self::TimeSeries
    }
}

/// Statistics of a memtable.
#[derive(Debug, Default)]
pub struct MemtableStats {
    /// Estimated bytes allocated by this memtable from the heap.
    estimated_bytes: usize,
    /// Inclusive time range of rows in this memtable, or `None` if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total number of rows in this memtable.
    num_rows: usize,
    /// Total number of ranges in this memtable.
    num_ranges: usize,
    /// Maximum sequence number in this memtable.
    max_sequence: SequenceNumber,
    /// Number of time series in this memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Sets the time range (for testing).
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the max sequence number (for testing).
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the inclusive time range of rows in this memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in this memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in this memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in this memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of time series in this memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Boxed, sendable iterator over [Batch]es.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Ranges of a memtable and the statistics collected while building them.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable.
    pub stats: MemtableStats,
}

/// In-memory write buffer of a region.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key-values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key-value into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes a bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable and returns an iterator over [Batch]es.
    ///
    /// `projection` selects the columns to read, `predicate` filters rows, and
    /// `sequence` optionally caps the sequence number of visible rows.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges of the memtable under the given projection, predicates and sequence.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns this memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of this memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with the given `id`.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Reference-counted pointer to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;
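
// A minimal usage sketch (illustrative only, not part of the original module):
// drain a memtable through `Memtable::iter` with no projection, no predicate
// and no sequence cap. The helper name `collect_all_batches` is an assumption
// made for documentation purposes.
#[cfg(test)]
#[allow(dead_code)]
fn collect_all_batches(memtable: &MemtableRef) -> Result<Vec<Batch>> {
    // Read every column and every row the memtable currently holds.
    let iter = memtable.iter(None, None, None)?;
    iter.collect()
}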

/// Builder of [Memtable]s.
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given `id` and region `metadata`.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Reference-counted pointer to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Tracker of bytes allocated by a memtable.
///
/// It reports allocations to the [WriteBufferManagerRef] (if any) and to the
/// `WRITE_BUFFER_BYTES` metric.
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Creates a new tracker that reports to the given write buffer manager.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` of additional allocated memory.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that this tracker has finished allocating memory and schedules the
    /// allocated bytes to be freed from the write buffer manager.
    ///
    /// Subsequent calls take no effect.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            if self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                write_buffer_manager
                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
            }
        }
    }

    /// Returns the bytes allocated so far.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager of this tracker.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            // Releases the memory reserved from the write buffer manager.
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        Self {
            write_buffer_manager,
            config,
        }
    }

    /// Returns the memtable builder for the given region options, falling back to
    /// the builder configured in [MitoConfig] when `options` is `None`.
    pub(crate) fn builder_for_options(
        &self,
        options: Option<&MemtableOptions>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}
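
// Illustrative sketch (an assumption, not existing code): pick the builder that
// matches a region's memtable options, falling back to the engine default, and
// build a memtable for that region. `options`, `merge_mode`, `region_meta` and
// `next_memtable_id` are assumed caller-provided inputs; dedup is enabled here
// purely as an example.
#[cfg(test)]
#[allow(dead_code)]
fn new_memtable_for_region(
    provider: &MemtableBuilderProvider,
    options: Option<&MemtableOptions>,
    merge_mode: MergeMode,
    region_meta: &RegionMetadataRef,
    next_memtable_id: MemtableId,
) -> MemtableRef {
    // Select a TimeSeries or PartitionTree builder according to the options.
    let builder = provider.builder_for_options(options, true, merge_mode);
    // Build a fresh, mutable memtable for the region.
    builder.build(next_memtable_id, region_meta)
}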

/// Builder of iterators that read a [MemtableRange].
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self) -> Result<BoxedBatchIterator>;
}

/// Boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;
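
// Illustrative sketch (an assumption, not existing code) of the `IterBuilder`
// contract: a builder that always yields an empty batch iterator. Real builders
// capture a memtable plus its projection and filters so each range can
// re-create its iterator lazily.
#[cfg(test)]
#[allow(dead_code)]
struct EmptyIterBuilder;

#[cfg(test)]
impl IterBuilder for EmptyIterBuilder {
    fn build(&self) -> Result<BoxedBatchIterator> {
        // An empty iterator still satisfies `Iterator<Item = Result<Batch>> + Send`.
        Ok(Box::new(std::iter::empty()))
    }
}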

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Builder to create iterators over the memtable.
    builder: BoxedIterBuilder,
    /// Predicates pushed down to the read.
    predicate: PredicateGroup,
}

/// Reference-counted pointer to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in a memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context of the range.
    context: MemtableRangeContextRef,
}

impl MemtableRange {
    /// Creates a new range from the shared context.
    pub fn new(context: MemtableRangeContextRef) -> Self {
        Self { context }
    }

    /// Returns the id of the memtable this range belongs to.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range, keeping only rows inside `time_range`
    /// and matching the time filters of the pushed down predicates.
    pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build()?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }
}
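
// Illustrative sketch (an assumption, not existing code): build the pruned
// iterators for every range returned by `Memtable::ranges`, restricting each
// of them to the same caller-provided file time range.
#[cfg(test)]
#[allow(dead_code)]
fn build_range_iters(
    ranges: &MemtableRanges,
    time_range: FileTimeRange,
) -> Result<Vec<BoxedBatchIterator>> {
    ranges
        .ranges
        .values()
        .map(|range| range.build_iter(time_range))
        .collect()
}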

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                tracker.done_allocating();
                // Calling done_allocating() more than once has no further effect.
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}