//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

pub use bulk::part::{
    BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
    sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id of a memtable, unique within a region.
pub type MemtableId = u32;

/// Config for memtables.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}

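/// Options for [`Memtable::ranges`].
///
/// A hedged usage sketch (illustrative only, not compiled as a doctest); all
/// names come from this module. Use [`RangesOptions::for_flush`] instead of
/// [`RangesOptions::default`] when building ranges for a flush.
///
/// ```ignore
/// let options = RangesOptions::default()
///     .with_pre_filter_mode(PreFilterMode::All)
///     .with_predicate(PredicateGroup::default())
///     .with_sequence(None);
/// assert!(!options.for_flush);
/// ```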
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are built for a flush.
    pub for_flush: bool,
    /// Controls how rows are pre-filtered while reading.
    pub pre_filter_mode: PreFilterMode,
    /// Predicates to prune ranges and rows.
    pub predicate: PredicateGroup,
    /// Optional sequence range to filter rows by sequence number.
    pub sequence: Option<SequenceRange>,
}

impl Default for RangesOptions {
    fn default() -> Self {
        Self {
            for_flush: false,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }
}

impl RangesOptions {
    /// Creates options for flushing the memtable.
    pub fn for_flush() -> Self {
        Self {
            for_flush: true,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets the predicate group.
    #[must_use]
    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the sequence range to read.
    #[must_use]
    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
        self.sequence = sequence;
        self
    }
}

/// Statistics of a memtable.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is `None` if
    /// the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total number of rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Sets the time range. Only used in tests.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the max sequence. Only used in tests.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the inclusive time range that this memtable contains.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of time series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Boxed iterator of [`Batch`]es read from a memtable.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Boxed iterator of [`RecordBatch`]es read from a memtable.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges of a memtable to read, along with the memtable stats.
#[derive(Default)]
pub struct MemtableRanges {
    /// Ranges to read, keyed by range id.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable.
    pub stats: MemtableStats,
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support building an iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

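/// In-memory write buffer of a region.
///
/// A hedged sketch of how a flush path might drain a memtable through this
/// trait (illustrative only, not compiled as a doctest; error handling and SST
/// encoding are elided):
///
/// ```ignore
/// memtable.freeze()?;
/// let ranges = memtable.ranges(None, RangesOptions::for_flush())?;
/// for range in ranges.ranges.values() {
///     for batch in range.build_iter()? {
///         let _batch = batch?;
///         // Encode the batch into an SST here.
///     }
/// }
/// ```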
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key-values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key-value into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes a bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable with the given projection, predicate and sequence
    /// range. Only used in tests.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable under the given projection and options.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns the mutable memtable into an immutable one.
    fn freeze(&self) -> Result<()>;

    /// Returns the [`MemtableStats`] of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with
    /// the given id and metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable if the implementation supports it. `for_flush`
    /// indicates whether the compaction is triggered by a flush. The default
    /// implementation does nothing.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

/// Reference to a [`Memtable`].
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build new [`Memtable`]s.
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given id and metadata.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns whether bulk insert should be used for a region with the given
    /// metadata. The default implementation returns false.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

/// Reference to a [`MemtableBuilder`].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

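/// Tracks memory allocated by a memtable and reports the allocation to the
/// optional write buffer manager.
///
/// A hedged sketch of the expected lifecycle (illustrative only, not compiled
/// as a doctest):
///
/// ```ignore
/// let tracker = AllocTracker::new(write_buffer_manager);
/// tracker.on_allocation(1024); // Call this whenever the memtable allocates memory.
/// tracker.done_allocating();   // Call this once the memtable stops allocating.
/// drop(tracker);               // Dropping the tracker frees the tracked memory.
/// ```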
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new tracker with an optional write buffer manager.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` newly allocated and reserves them from the write buffer
    /// manager, if any.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks allocating as done and schedules the reserved memory to be freed.
    /// Only the first call takes effect.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns the bytes allocated so far.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager of this tracker.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Frees the tracked memory from the write buffer manager.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

    /// Returns a memtable builder according to the region options and the
    /// default memtable config.
    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, using BulkMemtable under the flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup,
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
        }
    }

    /// Returns the default memtable builder for regions in the primary key format.
    fn default_primary_key_memtable_builder(
        &self,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Metrics of scanning a memtable, shared behind a mutex so that multiple
/// iterators can report into the same metrics.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the inner metrics data into this metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Returns a snapshot of the metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

/// Inner data of [`MemScanMetrics`].
#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Number of time series scanned.
    pub(crate) total_series: usize,
    /// Number of rows returned.
    pub(crate) num_rows: usize,
    /// Number of batches returned.
    pub(crate) num_batches: usize,
    /// Duration spent on scanning.
    pub(crate) scan_cost: Duration,
}

/// Data of a memtable range that has already been encoded, together with its
/// SST info.
pub struct EncodedRange {
    /// Encoded data.
    pub data: Bytes,
    /// SST info of the encoded data.
    pub sst_info: SstInfo,
}

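/// Builds iterators to read the data of a memtable range.
///
/// A minimal implementation sketch that yields no batches (illustrative only,
/// not compiled as a doctest; `EmptyIterBuilder` is a hypothetical name):
///
/// ```ignore
/// struct EmptyIterBuilder;
///
/// impl IterBuilder for EmptyIterBuilder {
///     fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
///         Ok(Box::new(std::iter::empty::<Result<Batch>>()))
///     }
/// }
/// ```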
pub trait IterBuilder: Send + Sync {
    /// Returns an iterator of [`Batch`]es to read the memtable range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns true if this builder yields record batches instead of
    /// [`Batch`]es. The default implementation returns false.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Builds an iterator of [`RecordBatch`]es. Unsupported by default.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the encoded data of this range, if any. The default
    /// implementation returns `None`.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Boxed [`IterBuilder`].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Builder to build iterators to read the ranges.
    builder: BoxedIterBuilder,
    /// Predicates to prune rows.
    predicate: PredicateGroup,
}

/// Reference to a [`MemtableRangeContext`].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [`MemtableRangeContext`].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in a memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context of the memtable.
    context: MemtableRangeContextRef,
    /// Number of rows in the range.
    num_rows: usize,
}

impl MemtableRange {
    /// Creates a new range from the shared context.
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator that prunes rows by the given time range and the
    /// time filters in the predicate.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read the range without pruning or metrics.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds an iterator of record batches to read the range.
    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    /// Returns true if this range yields record batches.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the encoded data of this range, if any.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }
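
    // The default `MemtableConfig` is the `TimeSeries` variant (per `#[default]`),
    // so its tagged representation should deserialize from a bare
    // `type = "time_series"` table.
    #[test]
    fn test_deserialize_default_memtable_config() {
        let config: MemtableConfig = toml::from_str(r#"type = "time_series""#).unwrap();
        assert_eq!(MemtableConfig::default(), config);
        assert_eq!(MemtableConfig::TimeSeries, config);
    }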

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            // `done_allocating()` only schedules the memory to be freed: the
            // mutable usage drops to zero while the total usage is released
            // when the tracker is dropped. Calling it again has no further effect.
            for _ in 0..2 {
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
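
    // A sanity check for `MemScanMetrics::merge_inner`; the field values below
    // are arbitrary examples.
    #[test]
    fn test_mem_scan_metrics_merge() {
        let metrics = MemScanMetrics::default();
        metrics.merge_inner(&MemScanMetricsData {
            total_series: 2,
            num_rows: 10,
            num_batches: 1,
            scan_cost: Duration::from_millis(5),
        });
        metrics.merge_inner(&MemScanMetricsData {
            total_series: 1,
            num_rows: 5,
            num_batches: 1,
            scan_cost: Duration::from_millis(3),
        });

        let data = metrics.data();
        assert_eq!(3, data.total_series);
        assert_eq!(15, data.num_rows);
        assert_eq!(2, data.num_batches);
        assert_eq!(Duration::from_millis(8), data.scan_cost);
    }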
}