//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}
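
// Illustrative only: `MemtableConfig` deserializes from a `type`-tagged TOML
// table (see `test_deserialize_memtable_config` below). A partition-tree
// memtable is configured roughly like:
//
//     type = "partition_tree"
//     index_max_keys_per_shard = 8192
//     data_freeze_threshold = 1024
//     fork_dictionary_bytes = "512MiB"
//
// while the default time-series memtable only needs `type = "time_series"`.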

/// Options of the ranges to create.
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are created for flushing the memtable.
    pub for_flush: bool,
    /// Mode for pre-filtering columns while reading.
    pub pre_filter_mode: PreFilterMode,
    /// Predicates to prune the ranges.
    pub predicate: PredicateGroup,
    /// Optional sequence range to filter rows by sequence number.
    pub sequence: Option<SequenceRange>,
}

impl Default for RangesOptions {
    fn default() -> Self {
        Self {
            for_flush: false,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }
}

impl RangesOptions {
    /// Creates options for flushing the memtable.
    pub fn for_flush() -> Self {
        Self {
            for_flush: true,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets the predicates.
    #[must_use]
    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the sequence range to read.
    #[must_use]
    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
        self.sequence = sequence;
        self
    }
}
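
// A minimal sketch of how these builder-style setters compose (illustrative,
// not a prescribed call order; all names come from this module):
//
//     let options = RangesOptions::for_flush()
//         .with_pre_filter_mode(PreFilterMode::All)
//         .with_predicate(PredicateGroup::default())
//         .with_sequence(None);
//     assert!(options.for_flush);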

/// Memtable stats.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is `None` if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Attaches the time range to the stats.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the max sequence for tests.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of time series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges of a memtable to read.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the query time.
    pub stats: MemtableStats,
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support build iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

/// In memory write buffer.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded batch of key values into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    /// `projection` selects columns to read, `None` means reading all columns.
    /// `predicate` is the predicate to prune data.
    /// `sequence` filters rows by the sequence range.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    /// The returned map contains the range id and the range.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] info of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with specific memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable.
    ///
    /// `for_flush` indicates whether the compaction is triggered before flushing the memtable.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}
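
// A rough sketch of the write-then-flush flow implied by the trait above. The
// `flush_sketch` function and its `key_values` argument are hypothetical; the
// real call sites live in the region worker and flush code paths.
//
//     fn flush_sketch(memtable: &MemtableRef, key_values: &KeyValues) -> Result<()> {
//         memtable.write(key_values)?;          // ingest rows while mutable
//         memtable.freeze()?;                   // make the memtable immutable
//         let ranges = memtable.ranges(None, RangesOptions::for_flush())?;
//         // ... encode `ranges.ranges` into SST files, then drop the memtable.
//         Ok(())
//     }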

pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns whether the memtable accepts bulk inserts for regions with the given metadata.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Memtable memory allocation tracker.
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new [AllocTracker] with an optional write buffer manager.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` memory is allocated.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that we have finished allocating memory so the write buffer manager
    /// can schedule the tracked memory to be freed.
    ///
    /// Only the first call takes effect.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns bytes allocated.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            // Frees the memory reserved in the write buffer manager.
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}
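
// Lifecycle of the tracker, grounded in the code above and the tests below:
// `on_allocation` bumps the WRITE_BUFFER_BYTES gauge and reserves memory in
// the write buffer manager, `done_allocating` schedules that memory to be
// freed exactly once, and dropping the tracker releases both. Illustrative:
//
//     let tracker = AllocTracker::new(None);
//     tracker.on_allocation(100);
//     assert_eq!(100, tracker.bytes_allocated());
//     tracker.done_allocating(); // idempotent
//     drop(tracker);             // subtracts the gauge and frees the reservation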

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

    /// Returns the memtable builder to use for the given region options.
    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, use BulkMemtable under flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup,
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_primary_key_memtable_builder(
        &self,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}
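
// Builder selection, summarized from `builder_for_options` above: the flat SST
// format always maps to `BulkMemtableBuilder` (any region-level memtable option
// is ignored with a log message); otherwise the region's `memtable` option
// picks the time-series or partition-tree builder, and regions without an
// explicit option fall back to the engine-wide `MemtableConfig`. Hypothetical
// usage, with `provider`, `region_options`, `memtable_id` and `region_metadata`
// standing in for caller-owned values:
//
//     let builder = provider.builder_for_options(&region_options);
//     let memtable = builder.build(memtable_id, &region_metadata);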

/// Shared metrics of a memtable scan.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the inner metrics into this metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Returns a copy of the collected metrics.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

/// Inner metrics of a memtable scan.
#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total number of series scanned.
    pub(crate) total_series: usize,
    /// Number of rows returned.
    pub(crate) num_rows: usize,
    /// Number of batches returned.
    pub(crate) num_batches: usize,
    /// Duration of the scan.
    pub(crate) scan_cost: Duration,
}

/// A memtable range that is already encoded in the SST format.
pub struct EncodedRange {
    /// Encoded bytes of the range.
    pub data: Bytes,
    /// SST metadata of the encoded data.
    pub sst_info: SstInfo,
}

/// Builder to build an iterator to read a memtable range.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns true if the builder yields record batches instead of [Batch]es.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns a record batch iterator to read the range.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the encoded range if the builder already holds encoded data.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context.
    context: MemtableRangeContextRef,
    /// Number of rows in the range.
    num_rows: usize,
}

impl MemtableRange {
    /// Creates a new range from the shared context.
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range and prunes rows outside of the
    /// given time range.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read the range without extra pruning.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a record batch iterator to read the range.
    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    /// Returns true if this range yields record batches.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the encoded range if available.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}
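
// Illustrative read loop over a range that yields `Batch`es (i.e. when
// `is_record_batch()` is false); `range` is a hypothetical `&MemtableRange`:
//
//     for batch in range.build_iter()? {
//         let batch = batch?;
//         // ... process the Batch
//     }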

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                tracker.done_allocating();
                // done_allocating() is idempotent: memory isn't freed twice.
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}