//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::batch_adapter::BatchToRecordBatchAdapter;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

pub use bulk::part::{
    BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
    sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables. Should be unique under the same region.
pub type MemtableId = u32;

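/// Config for memtables.
///
/// Tagged by `type` when (de)serialized; a sketch of the TOML form, mirroring
/// the `test_deserialize_memtable_config` test at the bottom of this file:
///
/// ```toml
/// type = "partition_tree"
/// index_max_keys_per_shard = 8192
/// data_freeze_threshold = 1024
/// dedup = true
/// fork_dictionary_bytes = "512MiB"
/// ```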
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}

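/// Options for [Memtable::ranges].
///
/// A minimal sketch of the builder-style usage (all names come from this
/// module):
///
/// ```ignore
/// let opts = RangesOptions::default()
///     .with_pre_filter_mode(PreFilterMode::All)
///     .with_predicate(PredicateGroup::default())
///     .with_sequence(None);
/// ```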
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are built for flushing the memtable.
    pub for_flush: bool,
    /// Mode that controls how predicates are applied while reading.
    pub pre_filter_mode: PreFilterMode,
    /// Predicates to prune the ranges.
    pub predicate: PredicateGroup,
    /// Sequence range to read.
    pub sequence: Option<SequenceRange>,
}

impl Default for RangesOptions {
    fn default() -> Self {
        Self {
            for_flush: false,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }
}

impl RangesOptions {
    /// Creates options for flushing the memtable.
    pub fn for_flush() -> Self {
        Self {
            for_flush: true,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets the predicates.
    #[must_use]
    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the sequence range to read.
    #[must_use]
    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
        self.sequence = sequence;
        self
    }
}

/// Stats of a memtable.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    pub estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is `None` if
    /// the memtable is empty.
    pub time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    pub max_sequence: SequenceNumber,
    /// Number of time series in the memtable.
    pub series_count: usize,
}

impl MemtableStats {
    /// Attaches the time range to the stats.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Attaches the max sequence number to the stats.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the inclusive time range that this memtable contains.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Boxed iterator that yields [Batch]es from a memtable.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Boxed iterator that yields arrow [RecordBatch]es from a memtable.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Memtable ranges to read.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
}

impl MemtableRanges {
    /// Returns the total number of rows across all ranges.
    pub fn num_rows(&self) -> usize {
        self.ranges.values().map(|r| r.stats().num_rows()).sum()
    }

    /// Returns the total series count across all ranges.
    pub fn series_count(&self) -> usize {
        self.ranges.values().map(|r| r.stats().series_count()).sum()
    }

    /// Returns the maximum sequence number across all ranges.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.ranges
            .values()
            .map(|r| r.stats().max_sequence())
            .max()
            .unwrap_or(0)
    }
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support build iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

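/// In-memory write buffer of a region.
///
/// A hedged sketch of the write/freeze/read lifecycle; `memtable`, `kvs` and
/// `metadata` are assumed to exist:
///
/// ```ignore
/// memtable.write(&kvs)?;                 // buffer incoming key values
/// memtable.freeze()?;                    // turn it immutable before flush
/// let ranges = memtable.ranges(None, RangesOptions::for_flush())?;
/// let next = memtable.fork(memtable.id() + 1, &metadata);
/// ```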
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    ///
    /// `projection` selects the columns to read; `None` reads all columns.
    /// `predicate` prunes rows and `sequence` limits the visible sequences.
    ///
    /// Only used in tests.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable under the given projection and
    /// [RangesOptions].
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable
    /// with the given id and metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable. `for_flush` indicates whether the compaction
    /// happens before a flush. The default implementation does nothing.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

/// Ref to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;

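/// Builder of [Memtable]s.
///
/// A sketch, assuming a `builder: MemtableBuilderRef` and a region `metadata`:
///
/// ```ignore
/// let memtable: MemtableRef = builder.build(0, &metadata);
/// ```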
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given id and metadata.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if memtables built by this builder should ingest data in
    /// bulk. The default implementation returns false.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

/// Ref to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

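/// Tracker of memtable allocations.
///
/// A sketch of the intended lifecycle, mirroring the unit tests at the bottom
/// of this file (`write_buffer_manager` is an assumption):
///
/// ```ignore
/// let tracker = AllocTracker::new(Some(write_buffer_manager));
/// tracker.on_allocation(100); // reserve memory while writing
/// tracker.done_allocating();  // schedule the memory to be freed, e.g. on freeze
/// drop(tracker);              // frees the remaining tracked memory
/// ```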
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new [AllocTracker].
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` memory as allocated.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that allocation is finished so the tracked memory can be
    /// scheduled to be freed from the write buffer manager.
    ///
    /// This method is idempotent: only the first call schedules the memory
    /// to be freed.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns bytes allocated.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager of the tracker.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Frees the tracked memory from the write buffer manager.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

    /// Returns a memtable builder for the given region options.
    ///
    /// Regions in the flat SST format always use the bulk memtable, which
    /// overrides any memtable option. Otherwise the builder is chosen from
    /// the region's memtable options, falling back to the global config.
    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, use BulkMemtable under flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup,
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
        }
    }

    /// Returns the default memtable builder for primary key format regions,
    /// based on the global memtable config.
    fn default_primary_key_memtable_builder(
        &self,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

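/// Shared metrics of memtable scans.
///
/// A sketch of collecting metrics from a range scan; `range` and `time_range`
/// are assumptions:
///
/// ```ignore
/// let metrics = MemScanMetrics::default();
/// let iter = range.build_prune_iter(time_range, Some(metrics.clone()))?;
/// for batch in iter {
///     let _batch = batch?;
/// }
/// let data = metrics.data(); // rows, batches, series and scan cost
/// ```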
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the inner metrics into this one.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Returns a snapshot of the current metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

/// Inner data of [MemScanMetrics].
#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total number of series scanned.
    pub(crate) total_series: usize,
    /// Number of rows returned.
    pub(crate) num_rows: usize,
    /// Number of batches returned.
    pub(crate) num_batches: usize,
    /// Duration of the scan.
    pub(crate) scan_cost: Duration,
}

/// Encoded data of a memtable range.
pub struct EncodedRange {
    /// The encoded bytes.
    pub data: Bytes,
    /// The [SstInfo] describing the encoded data.
    pub sst_info: SstInfo,
}

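/// Builder of iterators over a memtable range.
///
/// A minimal sketch of a hypothetical implementor that yields no batches:
///
/// ```ignore
/// struct EmptyIterBuilder;
///
/// impl IterBuilder for EmptyIterBuilder {
///     fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
///         Ok(Box::new(std::iter::empty()))
///     }
/// }
/// ```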
pub trait IterBuilder: Send + Sync {
    /// Builds an iterator that yields [Batch]es.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns true if the builder yields [RecordBatch]es instead of
    /// [Batch]es. The default implementation returns false.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Builds an iterator that yields [RecordBatch]es. The default
    /// implementation returns an error.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the encoded data of the range, if any.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

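/// Returns the column ids to read under the given projection, falling back to
/// all columns in `metadata` when `projection` is `None`.
///
/// A sketch (`metadata` is an assumption):
///
/// ```ignore
/// // An explicit projection is returned as-is.
/// assert_eq!(vec![1, 2], read_column_ids_from_projection(&metadata, Some(&[1, 2])));
/// // `None` expands to every column id in the region metadata.
/// let all = read_column_ids_from_projection(&metadata, None);
/// ```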
pub fn read_column_ids_from_projection(
    metadata: &RegionMetadataRef,
    projection: Option<&[ColumnId]>,
) -> Vec<ColumnId> {
    if let Some(projection) = projection {
        projection.to_vec()
    } else {
        metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect()
    }
}

/// Context to convert [Batch]es into [RecordBatch]es.
pub struct BatchToRecordBatchContext {
    metadata: RegionMetadataRef,
    codec: Arc<dyn PrimaryKeyCodec>,
    read_column_ids: Vec<ColumnId>,
}

impl BatchToRecordBatchContext {
    /// Creates a context for the region `metadata` and the column ids to
    /// read. Ensures at least the time index column is read.
    pub fn new(metadata: RegionMetadataRef, mut read_column_ids: Vec<ColumnId>) -> Self {
        if read_column_ids.is_empty() {
            read_column_ids.push(metadata.time_index_column().column_id);
        }

        let codec = build_primary_key_codec(&metadata);
        Self {
            metadata,
            codec,
            read_column_ids,
        }
    }

    /// Adapts a [BoxedBatchIterator] into a [BoxedRecordBatchIterator].
    fn adapt_iter(&self, iter: BoxedBatchIterator) -> BoxedRecordBatchIterator {
        Box::new(BatchToRecordBatchAdapter::new(
            iter,
            self.metadata.clone(),
            self.codec.clone(),
            &self.read_column_ids,
        ))
    }
}

/// Shared context of a [MemtableRange].
pub struct MemtableRangeContext {
    /// Id of the memtable this range belongs to.
    id: MemtableId,
    /// Builder to build iterators over the range.
    builder: BoxedIterBuilder,
    /// Predicates pushed down to the range.
    predicate: PredicateGroup,
    /// Optional context to convert [Batch]es into [RecordBatch]es.
    batch_to_record_batch: Option<Arc<BatchToRecordBatchContext>>,
}

/// Ref to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a context without a batch conversion context.
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self::new_with_batch_to_record_batch(id, builder, predicate, None)
    }

    /// Creates a context with an optional [BatchToRecordBatchContext].
    pub fn new_with_batch_to_record_batch(
        id: MemtableId,
        builder: BoxedIterBuilder,
        predicate: PredicateGroup,
        batch_to_record_batch: Option<Arc<BatchToRecordBatchContext>>,
    ) -> Self {
        Self {
            id,
            builder,
            predicate,
            batch_to_record_batch,
        }
    }
}

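/// A range of a memtable to read.
///
/// A sketch of scanning one range with time pruning; `range` and `time_range`
/// are assumptions:
///
/// ```ignore
/// let iter = range.build_prune_iter(time_range, None)?;
/// for batch in iter {
///     let batch = batch?;
///     // process the pruned batch
/// }
/// ```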
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context of the range.
    context: MemtableRangeContextRef,
    /// Stats of the range.
    stats: MemtableStats,
}

impl MemtableRange {
    /// Creates a range from the shared context and its stats.
    pub fn new(context: MemtableRangeContextRef, stats: MemtableStats) -> Self {
        Self { context, stats }
    }

    /// Returns the stats of the range.
    pub fn stats(&self) -> &MemtableStats {
        &self.stats
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator that prunes rows outside `time_range` using the
    /// pushed down time filters.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read the range without pruning.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a [RecordBatch] iterator to read the range.
    ///
    /// Uses the builder's native record batch iterator when available;
    /// otherwise adapts the [Batch] iterator through the
    /// [BatchToRecordBatchContext], pruning by `time_range` if provided.
    pub fn build_record_batch_iter(
        &self,
        time_range: Option<FileTimeRange>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if self.context.builder.is_record_batch() {
            return self.context.builder.build_record_batch(metrics);
        }

        if let Some(context) = self.context.batch_to_record_batch.as_ref() {
            let iter = self.context.builder.build(metrics)?;
            let iter: BoxedBatchIterator = if let Some(time_range) = time_range {
                let time_filters = self.context.predicate.time_filters();
                Box::new(PruneTimeIterator::new(iter, time_range, time_filters))
            } else {
                iter
            };
            return Ok(context.adapt_iter(iter));
        }

        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns true if the range yields [RecordBatch]es.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.stats.num_rows
    }

    /// Returns the encoded data of the range, if any.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            // Calling done_allocating() twice should not free the memory twice.
            for _ in 0..2 {
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
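
    #[test]
    fn test_ranges_options_builders() {
        // Sketch: exercises the builder-style setters on `RangesOptions`.
        let opts = RangesOptions::for_flush()
            .with_predicate(PredicateGroup::default())
            .with_sequence(None);
        assert!(opts.for_flush);
        assert!(opts.sequence.is_none());
    }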
}