//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

pub use bulk::part::{
    BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
    sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

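/// Id of a memtable. It should be unique within a region.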
pub type MemtableId = u32;

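/// Memtable configuration that selects the memtable implementation.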
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}

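/// Options for [Memtable::ranges()].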
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are built for a flush.
    pub for_flush: bool,
    /// Pre-filter mode to use when reading the ranges.
    pub pre_filter_mode: PreFilterMode,
    /// Predicates for pruning.
    pub predicate: PredicateGroup,
    /// Optional sequence range used to filter rows.
    pub sequence: Option<SequenceRange>,
}

impl Default for RangesOptions {
    fn default() -> Self {
        Self {
            for_flush: false,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }
}

impl RangesOptions {
    /// Creates options for flushing.
    pub fn for_flush() -> Self {
        Self {
            for_flush: true,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets the predicates.
    #[must_use]
    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the sequence range.
    #[must_use]
    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
        self.sequence = sequence;
        self
    }
}

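/// Statistics of a memtable.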
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// Estimated bytes allocated by the memtable.
    pub estimated_bytes: usize,
    /// Time range of rows in the memtable, or `None` if the memtable is empty.
    pub time_range: Option<(Timestamp, Timestamp)>,
    /// Number of rows in the memtable.
    pub num_rows: usize,
    /// Number of ranges in the memtable.
    pub num_ranges: usize,
    /// Maximum sequence number of rows in the memtable.
    pub max_sequence: SequenceNumber,
    /// Number of time series in the memtable.
    pub series_count: usize,
}

impl MemtableStats {
    /// Sets the time range for testing.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the max sequence for testing.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by the memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number of rows in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

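/// Boxed iterator that yields [Batch]es.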
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

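/// Boxed iterator that yields arrow [RecordBatch]es.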
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

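/// Ranges to read from a memtable.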
#[derive(Default)]
pub struct MemtableRanges {
    /// Ranges keyed by range id.
    pub ranges: BTreeMap<usize, MemtableRange>,
}

impl MemtableRanges {
    /// Returns the total number of rows in all ranges.
    pub fn num_rows(&self) -> usize {
        self.ranges.values().map(|r| r.stats().num_rows()).sum()
    }

    /// Returns the total number of series in all ranges.
    pub fn series_count(&self) -> usize {
        self.ranges.values().map(|r| r.stats().series_count()).sum()
    }

    /// Returns the maximum sequence number across all ranges.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.ranges
            .values()
            .map(|r| r.stats().max_sequence())
            .max()
            .unwrap_or(0)
    }
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support building a batch iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

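/// In-memory write buffer of a region.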
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key-values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key-value into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes a bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable and returns a batch iterator.
    ///
    /// Only for testing.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges to read from the memtable.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns the memtable into an immutable one.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this memtable and returns a new mutable memtable with the given id
    /// and region metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable if the implementation supports compaction.
    ///
    /// The default implementation does nothing.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

pub type MemtableRef = Arc<dyn Memtable>;

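/// Builder to build a new [Memtable].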
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given id and region metadata.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns whether bulk inserts should be used for regions with the given metadata.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

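/// Tracks memory allocated by a memtable and reports it to the write buffer manager.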
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the memtable.
    bytes_allocated: AtomicUsize,
    /// Whether allocation is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Creates a new tracker with an optional write buffer manager.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` of additional memory allocated by the memtable.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks the tracker as done allocating and schedules freeing the reserved memory.
    ///
    /// Only the first call takes effect.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns the bytes allocated so far.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager of this tracker, if any.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Frees the memory reserved from the write buffer manager.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

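/// Provider that creates [MemtableBuilder]s according to the region options and engine config.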
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

    /// Returns the memtable builder to use for the given region options.
    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable options, using BulkMemtable under the flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(self.write_buffer_manager.clone(), !dedup, merge_mode)
                    .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_primary_key_memtable_builder(
        &self,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

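/// Shared metrics collected while scanning a memtable.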
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the inner metrics into this shared metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Returns a copy of the collected metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Number of series scanned.
    pub(crate) total_series: usize,
    /// Number of rows scanned.
    pub(crate) num_rows: usize,
    /// Number of batches returned by the scan.
    pub(crate) num_batches: usize,
    /// Total cost of the scan.
    pub(crate) scan_cost: Duration,
}

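/// Data of a memtable range that has already been encoded into SST format.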
pub struct EncodedRange {
    /// Encoded bytes of the range.
    pub data: Bytes,
    /// Metadata of the encoded SST.
    pub sst_info: SstInfo,
}

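/// Builds an iterator to read a memtable range.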
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator over [Batch]es of the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns whether the builder can build [RecordBatch] iterators.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns the iterator over [RecordBatch]es of the range.
    ///
    /// The default implementation returns an unsupported error.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the already encoded data of the range, if any.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

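/// Context shared by ranges of the same memtable.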
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Builder to build iterators over the range.
    builder: BoxedIterBuilder,
    /// Predicates to prune the range.
    predicate: PredicateGroup,
}

pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

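/// A scannable range of a memtable.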
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context of the range.
    context: MemtableRangeContextRef,
    /// Statistics of the range.
    stats: MemtableStats,
}

impl MemtableRange {
    /// Creates a new range from the shared context and stats.
    pub fn new(context: MemtableRangeContextRef, stats: MemtableStats) -> Self {
        Self { context, stats }
    }

    /// Returns the statistics of this range.
    pub fn stats(&self) -> &MemtableStats {
        &self.stats
    }

    /// Returns the id of the memtable this range belongs to.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds a batch iterator that prunes rows by the given time range and
    /// the pushed down time filters.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds a batch iterator to read the range without pruning.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a [RecordBatch] iterator to read the range.
    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    /// Returns whether this range yields [RecordBatch]es.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in this range.
    pub fn num_rows(&self) -> usize {
        self.stats.num_rows
    }

    /// Returns the already encoded data of this range, if any.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

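    #[test]
    fn test_ranges_options_builder() {
        // A minimal check of the builder-style setters on `RangesOptions`,
        // using only types defined or re-exported in this module.
        let options = RangesOptions::for_flush()
            .with_pre_filter_mode(PreFilterMode::All)
            .with_predicate(PredicateGroup::default())
            .with_sequence(None);
        assert!(options.for_flush);
        assert!(options.sequence.is_none());

        // The default options are not for flush.
        let options = RangesOptions::default();
        assert!(!options.for_flush);
        assert!(options.sequence.is_none());
    }
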
    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            // Calling done_allocating() more than once should only take effect once.
            for _ in 0..2 {
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}