//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

mod builder;
pub mod bulk;
pub mod partition_tree;
mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    TimeSeries,
}

impl Default for MemtableConfig {
    fn default() -> Self {
        Self::TimeSeries
    }
}
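
// A minimal usage sketch (kept as a comment, not compiled): the serde
// `tag = "type"` attribute above means a config file selects the variant via a
// `type` field; `partition_tree` additionally accepts the `PartitionTreeConfig`
// fields, as shown in `test_deserialize_memtable_config` at the bottom of this file.
//
//     let config: MemtableConfig = toml::from_str(r#"type = "time_series""#).unwrap();
//     assert_eq!(MemtableConfig::TimeSeries, config);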

/// Statistics of a memtable.
#[derive(Debug, Default)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from the heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is `None` if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in the memtable.
    num_rows: usize,
    /// Total number of ranges in the memtable.
    num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Sets the time range for tests.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the inclusive time range that this memtable contains.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of time series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Alias for boxed iterators that yield [Batch]es.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and the corresponding ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the time the ranges were created.
    pub stats: MemtableStats,
}

/// In memory write buffer for a region.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key-value pairs into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key-value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    ///
    /// `projection` selects the columns to read; `None` reads all columns.
    /// `predicate` filters rows and `sequence` optionally caps the visible
    /// sequence number.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable after applying the predicate,
    /// keyed by range id.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns this mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of this memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable
    /// with the given `id`.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}
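
// A sketch of the typical read path against this trait (illustrative names only:
// `memtable`, `column_id` and `committed_sequence` are assumed to be in scope).
//
//     let iter = memtable.iter(
//         Some(&[column_id]),       // project a subset of columns
//         None,                     // no predicate: scan all rows
//         Some(committed_sequence), // only read rows up to this sequence
//     )?;
//     for batch in iter {
//         let batch = batch?;
//         // consume the sorted `Batch`...
//     }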

pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given `id` and region `metadata`.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Tracker of memory allocated by a memtable.
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new [AllocTracker] that optionally reports to a write buffer manager.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` of newly allocated memory and reserves them from the
    /// write buffer manager, if any.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that allocation is finished so the write buffer manager can schedule
    /// the tracked memory to be freed. Only the first call has an effect; the
    /// memory itself is released when the tracker is dropped.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            if self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                write_buffer_manager
                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
            }
        }
    }

    /// Returns the bytes tracked so far.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager, if any.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}
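
// Expected lifecycle, as a sketch (the freeze timing is an assumption about how
// callers use this type):
//
//     let tracker = AllocTracker::new(Some(write_buffer_manager));
//     tracker.on_allocation(1024); // for every allocation while the memtable is mutable
//     tracker.done_allocating();   // once, e.g. when the memtable is frozen
//     drop(tracker);               // subtracts the metric and frees memory in the manager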

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Frees the tracked memory from the write buffer manager.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        Self {
            write_buffer_manager,
            config,
        }
    }

    /// Returns a memtable builder for the given region options. Falls back to the
    /// engine-wide default memtable config when `options` is `None`.
    pub(crate) fn builder_for_options(
        &self,
        options: Option<&MemtableOptions>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}
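
// Selection in short, as a sketch: explicit per-region `MemtableOptions` win,
// otherwise the engine-wide `MitoConfig::memtable` decides, with the region's
// `dedup` flag applied on top. The metadata value below is illustrative.
//
//     let provider = MemtableBuilderProvider::new(None, Arc::new(MitoConfig::default()));
//     let builder = provider.builder_for_options(None, true, MergeMode::LastRow);
//     let memtable = builder.build(0, &region_metadata);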

/// Builder of an iterator to read a [MemtableRange].
///
/// The builder already knows the projection and predicate it needs to apply.
pub trait IterBuilder: Send + Sync {
    /// Builds an iterator over the range.
    fn build(&self) -> Result<BoxedBatchIterator>;
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context of the range.
    context: MemtableRangeContextRef,
}

impl MemtableRange {
    /// Creates a new range from the shared context.
    pub fn new(context: MemtableRangeContextRef) -> Self {
        Self { context }
    }

    /// Returns the id of the memtable this range belongs to.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range, pruning the result by the given
    /// `time_range` and the time filters in the predicate.
    pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build()?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }
}
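
// A sketch of how a scan consumes ranges (the `memtable`, `predicate_group` and
// `file_time_range` values are illustrative):
//
//     let MemtableRanges { ranges, .. } = memtable.ranges(None, predicate_group, None)?;
//     for (_range_id, range) in ranges {
//         let iter = range.build_iter(file_time_range)?;
//         // each item is a `Result<Batch>` already pruned by the time range
//     }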

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }
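
    // A small companion test: `TimeSeries` is the default variant and, because the
    // enum is internally tagged, its TOML form is just the bare `type` field. This
    // assumes the unit variant round-trips through the same toml path as above.
    #[test]
    fn test_default_memtable_config() {
        assert_eq!(MemtableConfig::TimeSeries, MemtableConfig::default());

        let config: MemtableConfig = toml::from_str(r#"type = "time_series""#).unwrap();
        assert_eq!(MemtableConfig::TimeSeries, config);
    }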

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                tracker.done_allocating();
                // `done_allocating()` is idempotent, so repeated calls don't change the usage.
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
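
    // Sanity-check sketch for the derived `Default` of `MemtableStats`: an empty
    // stats object reports zero usage and no time range.
    #[test]
    fn test_memtable_stats_default() {
        let stats = MemtableStats::default();
        assert_eq!(0, stats.bytes_allocated());
        assert_eq!(0, stats.num_rows());
        assert_eq!(0, stats.num_ranges());
        assert_eq!(0, stats.series_count());
        assert_eq!(0, stats.max_sequence());
        assert!(stats.time_range().is_none());
    }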
}