//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
use serde::{Deserialize, Serialize};
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::batch_adapter::BatchToRecordBatchAdapter;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;
mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

pub use bulk::part::{
    BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
    sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;
/// Id of a memtable. Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}

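/// Options for [Memtable::ranges].
///
/// A hedged usage sketch of the builder-style setters below; `sequence_range`
/// is assumed to be a [SequenceRange] built elsewhere:
///
/// ```ignore
/// // A normal scan restricted to a sequence range.
/// let opts = RangesOptions::default().with_sequence(Some(sequence_range));
/// // A scan on behalf of a flush, keeping the default pre-filter mode.
/// let flush_opts = RangesOptions::for_flush();
/// ```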
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are built for a flush.
    pub for_flush: bool,
    /// Pre-filter mode applied while scanning.
    pub pre_filter_mode: PreFilterMode,
    /// Predicates used to prune the scan.
    pub predicate: PredicateGroup,
    /// Optional sequence range to filter rows by sequence number.
    pub sequence: Option<SequenceRange>,
}

impl Default for RangesOptions {
    fn default() -> Self {
        Self {
            for_flush: false,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }
}

impl RangesOptions {
    /// Creates options for scanning on behalf of a flush.
    pub fn for_flush() -> Self {
        Self {
            for_flush: true,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets the predicate.
    #[must_use]
    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the sequence range.
    #[must_use]
    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
        self.sequence = sequence;
        self
    }
}

/// Statistics of a memtable.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    pub estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is `None` if
    /// the memtable is empty.
    pub time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    pub max_sequence: SequenceNumber,
    /// The number of time series in the memtable.
    pub series_count: usize,
}

impl MemtableStats {
    /// Sets the time range (for tests).
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the max sequence (for tests).
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of time series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Iterator over a memtable that yields [Batch]es.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Iterator over a memtable that yields [RecordBatch]es.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges of a memtable to scan.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and the corresponding ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
}

impl MemtableRanges {
    /// Returns the total number of rows in all ranges.
    pub fn num_rows(&self) -> usize {
        self.ranges.values().map(|r| r.stats().num_rows()).sum()
    }

    /// Returns the total number of series in all ranges.
    pub fn series_count(&self) -> usize {
        self.ranges.values().map(|r| r.stats().series_count()).sum()
    }

    /// Returns the maximum sequence number across all ranges, or 0 if there
    /// are no ranges.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.ranges
            .values()
            .map(|r| r.stats().max_sequence())
            .max()
            .unwrap_or(0)
    }
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        ensure!(
            self.ranges.len() == 1,
            UnsupportedOperationSnafu {
                err_msg: format!(
                    "Building an iterator from MemtableRanges expects 1 range, but got {}",
                    self.ranges.len()
                ),
            }
        );

        self.ranges.values().next().unwrap().build_iter()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

/// In-memory write buffer (memtable) of a region.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key-values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key-value into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes a bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Returns the ranges in the memtable to scan.
    ///
    /// The projection selects the columns to read; `None` reads all columns.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns the memtable into an immutable one. Writes after freezing should
    /// be rejected.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of this memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (now immutable) memtable, returning a new mutable memtable
    /// with the given id and metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable. `for_flush` hints whether the compaction is
    /// triggered by a flush. The default implementation is a no-op.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

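/// Reference-counted pointer to a [Memtable].
///
/// A minimal write-then-scan sketch (illustrative; assumes `memtable` is a
/// `MemtableRef` and `kvs` a [KeyValues] built elsewhere):
///
/// ```ignore
/// memtable.write(&kvs)?;
/// let ranges = memtable.ranges(None, RangesOptions::default())?;
/// for range in ranges.ranges.values() {
///     for batch in range.build_iter()? {
///         let batch = batch?;
///         // process the batch...
///     }
/// }
/// ```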
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given id and metadata.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if memtables built by this builder should ingest data via
    /// the bulk insert path. Defaults to false.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

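/// Tracks heap memory allocated by a memtable and reports it to an optional
/// [WriteBufferManagerRef].
///
/// A minimal lifecycle sketch (illustrative only; `on_allocation` and
/// `done_allocating` are the crate-local methods defined below):
///
/// ```ignore
/// let tracker = AllocTracker::new(None);
/// tracker.on_allocation(100); // records 100 newly allocated bytes
/// tracker.done_allocating();  // no further allocations; schedules the
///                             // reserved memory to be freed
/// drop(tracker);              // the reservation is actually released on drop
/// ```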
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes tracked by this tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Creates a new tracker that reports to the optional write buffer
    /// manager.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` newly allocated bytes and reserves them from the write
    /// buffer manager.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks the tracker as done allocating and schedules the tracked memory
    /// to be freed. Only the first call takes effect; the memory is actually
    /// released when the tracker is dropped.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns the total bytes tracked so far.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager, if any.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Frees the memory tracked by this tracker from the manager.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of [MemtableBuilder]s, choosing a builder from the region options
/// and the global config.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

    /// Returns a memtable builder for the given region options.
    ///
    /// Regions using the flat SST format always use the bulk memtable and
    /// ignore any memtable options.
    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, using BulkMemtable under flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup,
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
        }
    }

    /// Returns the default memtable builder for regions in the primary key
    /// format, based on the global [MemtableConfig].
    fn default_primary_key_memtable_builder(
        &self,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

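/// Metrics of scanning a memtable, shared by the iterators of its ranges.
///
/// A hedged sketch of how per-iterator data is merged into the shared
/// metrics (crate-local API; `MemScanMetricsData` is defined below):
///
/// ```ignore
/// let metrics = MemScanMetrics::default();
/// metrics.merge_inner(&MemScanMetricsData {
///     num_rows: 42,
///     ..Default::default()
/// });
/// assert_eq!(42, metrics.data().num_rows);
/// ```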
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges a local [MemScanMetricsData] into the shared metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
        metrics.prefilter_cost += inner.prefilter_cost;
        metrics.prefilter_rows_filtered += inner.prefilter_rows_filtered;
    }

    /// Returns a snapshot of the current metrics.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total number of series scanned.
    pub(crate) total_series: usize,
    /// Number of rows returned.
    pub(crate) num_rows: usize,
    /// Number of batches returned.
    pub(crate) num_batches: usize,
    /// Duration of the scan.
    pub(crate) scan_cost: Duration,
    /// Duration spent on pre-filtering.
    pub(crate) prefilter_cost: Duration,
    /// Number of rows filtered out by pre-filtering.
    pub(crate) prefilter_rows_filtered: usize,
}

/// A range that is already encoded in the SST format.
pub struct EncodedRange {
    /// The encoded bytes.
    pub data: Bytes,
    /// Metadata of the encoded SST.
    pub sst_info: SstInfo,
}

/// Builds an iterator to read a memtable range.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator of [Batch]es to read the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns true if this builder yields [RecordBatch]es natively.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns the iterator of [RecordBatch]es to read the range, optionally
    /// pruned by the time range. Fails with an unsupported-operation error by
    /// default.
    fn build_record_batch(
        &self,
        time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        let _ = time_range;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the pre-encoded content of this range, if any.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

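/// Returns the column ids to read under an optional projection.
///
/// With a projection, the ids are returned as-is; without one, all column
/// ids in the metadata are read. Illustrative sketch (assumes `metadata` is
/// a [RegionMetadataRef] in scope):
///
/// ```ignore
/// // Projection provided: used verbatim, order preserved.
/// assert_eq!(vec![2, 0], read_column_ids_from_projection(&metadata, Some(&[2, 0])));
/// // No projection: every column in the region metadata is read.
/// let all = read_column_ids_from_projection(&metadata, None);
/// assert_eq!(metadata.column_metadatas.len(), all.len());
/// ```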
pub fn read_column_ids_from_projection(
    metadata: &RegionMetadataRef,
    projection: Option<&[ColumnId]>,
) -> Vec<ColumnId> {
    if let Some(projection) = projection {
        projection.to_vec()
    } else {
        metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect()
    }
}

/// Context to convert [Batch]es into [RecordBatch]es.
pub struct BatchToRecordBatchContext {
    metadata: RegionMetadataRef,
    codec: Arc<dyn PrimaryKeyCodec>,
    read_column_ids: Vec<ColumnId>,
}

impl BatchToRecordBatchContext {
    /// Creates a new context from the metadata and the column ids to read.
    /// Always reads at least the time index column.
    pub fn new(metadata: RegionMetadataRef, mut read_column_ids: Vec<ColumnId>) -> Self {
        if read_column_ids.is_empty() {
            read_column_ids.push(metadata.time_index_column().column_id);
        }

        let codec = build_primary_key_codec(&metadata);
        Self {
            metadata,
            codec,
            read_column_ids,
        }
    }

    /// Wraps a [Batch] iterator into a [RecordBatch] iterator.
    fn adapt_iter(&self, iter: BoxedBatchIterator) -> BoxedRecordBatchIterator {
        Box::new(BatchToRecordBatchAdapter::new(
            iter,
            self.metadata.clone(),
            self.codec.clone(),
            &self.read_column_ids,
        ))
    }
}

/// Context shared by the ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder of the ranges.
    builder: BoxedIterBuilder,
    /// Predicates of the scan.
    predicate: PredicateGroup,
    /// Optional context to convert [Batch]es into [RecordBatch]es.
    batch_to_record_batch: Option<Arc<BatchToRecordBatchContext>>,
}

pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new context without a batch conversion context.
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self::new_with_batch_to_record_batch(id, builder, predicate, None)
    }

    /// Creates a new context with an optional batch conversion context.
    pub fn new_with_batch_to_record_batch(
        id: MemtableId,
        builder: BoxedIterBuilder,
        predicate: PredicateGroup,
        batch_to_record_batch: Option<Arc<BatchToRecordBatchContext>>,
    ) -> Self {
        Self {
            id,
            builder,
            predicate,
            batch_to_record_batch,
        }
    }
}

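/// A scannable range in a memtable.
///
/// Illustrative sketch of reading a range pruned by a time range (assumes
/// `range: MemtableRange` and `time_range: FileTimeRange` are in scope):
///
/// ```ignore
/// let iter = range.build_prune_iter(time_range, None)?;
/// for batch in iter {
///     let batch = batch?;
///     // Rows outside `time_range` have already been pruned here.
/// }
/// ```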
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context of the range.
    context: MemtableRangeContextRef,
    /// Statistics of the range.
    stats: MemtableStats,
}

impl MemtableRange {
    /// Creates a new range from the shared context and its stats.
    pub fn new(context: MemtableRangeContextRef, stats: MemtableStats) -> Self {
        Self { context, stats }
    }

    /// Returns the statistics of this range.
    pub fn stats(&self) -> &MemtableStats {
        &self.stats
    }

    /// Returns the id of the memtable that this range belongs to.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range, pruning rows outside the given
    /// time range.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read the whole range.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a [RecordBatch] iterator to read the range.
    ///
    /// Prefers the builder's native record batch support; otherwise falls
    /// back to converting [Batch]es via the conversion context, if any.
    pub fn build_record_batch_iter(
        &self,
        time_range: Option<FileTimeRange>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if self.context.builder.is_record_batch() {
            return self.context.builder.build_record_batch(time_range, metrics);
        }

        if let Some(context) = self.context.batch_to_record_batch.as_ref() {
            let iter = self.context.builder.build(metrics)?;
            // Prunes by the time range first so the adapter only converts
            // rows that survive.
            let iter: BoxedBatchIterator = if let Some(time_range) = time_range {
                let time_filters = self.context.predicate.time_filters();
                Box::new(PruneTimeIterator::new(iter, time_range, time_filters))
            } else {
                iter
            };
            return Ok(context.adapt_iter(iter));
        }

        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns true if this range yields [RecordBatch]es natively.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in this range.
    pub fn num_rows(&self) -> usize {
        self.stats.num_rows
    }

    /// Returns the pre-encoded content of this range, if any.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

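    // Illustrative checks for the builder-style setters of `RangesOptions`;
    // a minimal sketch that only exercises APIs defined in this module.
    #[test]
    fn test_ranges_options_builders() {
        let opts = RangesOptions::default();
        assert!(!opts.for_flush);
        assert!(opts.sequence.is_none());

        let flush_opts = RangesOptions::for_flush().with_sequence(None);
        assert!(flush_opts.for_flush);
        assert!(flush_opts.sequence.is_none());
    }
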
    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

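    // Sanity checks for `MemtableStats` accessors on a default value; an
    // illustrative sketch rather than an exhaustive test.
    #[test]
    fn test_memtable_stats_default() {
        let stats = MemtableStats::default();
        assert_eq!(0, stats.bytes_allocated());
        assert_eq!(0, stats.num_rows());
        assert_eq!(0, stats.num_ranges());
        assert_eq!(0, stats.series_count());
        assert_eq!(0, stats.max_sequence());
        assert!(stats.time_range().is_none());
    }
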
    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                tracker.done_allocating();
                // done_allocating() is idempotent: the memory stays reserved
                // until the tracker is dropped.
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

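    // `MemtableRanges` aggregates stats over its ranges; an empty default
    // value should report zeros (illustrative sketch).
    #[test]
    fn test_empty_memtable_ranges() {
        let ranges = MemtableRanges::default();
        assert_eq!(0, ranges.num_rows());
        assert_eq!(0, ranges.series_count());
        assert_eq!(0, ranges.max_sequence());
    }
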
    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}
827}