//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
pub use bulk::part::{
    BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
    sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id of a memtable, unique within a region.
pub type MemtableId = u32;

/// Config for memtables.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}

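// Illustrative sketch (not from the original source): with the serde attributes
// above (`tag = "type"`, snake_case variants), a partition-tree memtable can be
// described in TOML roughly as below. The keys belong to `PartitionTreeConfig`;
// see `test_deserialize_memtable_config` at the bottom of this file for a
// checked example.
//
// type = "partition_tree"
// index_max_keys_per_shard = 8192
// data_freeze_threshold = 1024
// fork_dictionary_bytes = "512MiB"
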
/// Options for [Memtable::ranges].
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are built for a flush rather than a query.
    pub for_flush: bool,
    /// Controls how predicates are evaluated while reading the ranges.
    pub pre_filter_mode: PreFilterMode,
    /// Predicates used to prune the data.
    pub predicate: PredicateGroup,
    /// Optional sequence range used to filter rows.
    pub sequence: Option<SequenceRange>,
}

impl Default for RangesOptions {
    fn default() -> Self {
        Self {
            for_flush: false,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }
}

impl RangesOptions {
    /// Creates options for building ranges to flush.
    pub fn for_flush() -> Self {
        Self {
            for_flush: true,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets the predicate group.
    #[must_use]
    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the sequence range to read.
    #[must_use]
    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
        self.sequence = sequence;
        self
    }
}

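// Illustrative sketch (not part of the original module): typical construction of
// scan options through the builder methods; the predicate and sequence values
// here are placeholders.
//
// let options = RangesOptions::default()
//     .with_pre_filter_mode(PreFilterMode::All)
//     .with_predicate(PredicateGroup::default())
//     .with_sequence(None);
//
// Flushes use `RangesOptions::for_flush()` instead so readers can distinguish
// flush scans from query scans.
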
/// Statistics of a memtable.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// Estimated bytes allocated by the memtable.
    estimated_bytes: usize,
    /// Time range of rows in the memtable, `None` if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Number of rows in the memtable.
    pub num_rows: usize,
    /// Number of ranges in the memtable.
    pub num_ranges: usize,
    /// Maximum sequence number of rows in the memtable.
    max_sequence: SequenceNumber,
    /// Number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Sets the time range for tests.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the max sequence for tests.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by the memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number of rows.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of time series.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Boxed iterator of [Batch]es.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Boxed iterator of Arrow [RecordBatch]es.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges and statistics of a memtable snapshot.
#[derive(Default)]
pub struct MemtableRanges {
    /// Ranges of the memtable, keyed by range index.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the time the ranges were built.
    pub stats: MemtableStats,
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        // `MemtableRanges` is only a container of ranges; iterate each
        // [MemtableRange] instead of building a single iterator from it.
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support build iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

/// In-memory write buffer (memtable) of a region.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key-values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key-value into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes a bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable. Only available in tests.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges of the memtable for scanning or flushing.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns the memtable into an immutable memtable; no more writes are expected afterwards.
    fn freeze(&self) -> Result<()>;

    /// Returns the current statistics of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this memtable, returning a new empty memtable with the given id and metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable in place. `for_flush` indicates the compaction is
    /// requested as part of a flush. The default implementation is a no-op.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

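// Illustrative lifecycle sketch (assumptions: `memtable`, `kvs`, `metadata` and
// `next_id` are placeholders, not names from this module):
//
// // 1. Accept writes while the memtable is mutable.
// memtable.write(&kvs)?;
// // 2. Once the write buffer is large enough, stop writes and read it back for a flush.
// memtable.freeze()?;
// let ranges = memtable.ranges(None, RangesOptions::for_flush())?;
// // 3. Create a new empty mutable memtable for subsequent writes.
// let new_memtable = memtable.fork(next_id, &metadata);
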
/// Reference to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder of [Memtable]s.
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given id and region metadata.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if the memtable built for `metadata` prefers bulk insertion.
    /// Defaults to false.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

/// Reference to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Tracks memory allocated by a memtable and reports it to the write buffer
/// manager (if any) and the `WRITE_BUFFER_BYTES` metric.
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the memtable.
    bytes_allocated: AtomicUsize,
    /// Whether allocation is finished.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Creates a tracker that reports allocations to the optional `write_buffer_manager`.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` of newly allocated memory.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that the memtable will not allocate any more memory and schedules
    /// the tracked bytes to be freed. Only the first call has an effect.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns the bytes allocated so far.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager, if any.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        // Ensure the reserved memory is scheduled to be freed even if the owner
        // never called `done_allocating()`.
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Actually releases the memory from the write buffer manager.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}
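
// Illustrative sketch (not from the original source) of the tracker's lifecycle;
// the 1024-byte value is arbitrary:
//
// let tracker = AllocTracker::new(write_buffer_manager);
// tracker.on_allocation(1024); // called as the memtable buffers data
// tracker.done_allocating();   // called once the memtable stops growing
// drop(tracker);               // frees the reserved memory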

/// Provides memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

    /// Returns a memtable builder for the given region options.
    ///
    /// Regions using the flat SST format always get a `BulkMemtableBuilder` and any
    /// per-region memtable option is ignored. Otherwise the builder is chosen from
    /// the region's memtable option, falling back to the engine-level default.
    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, using BulkMemtable under flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup,
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
        }
    }

    /// Returns the default memtable builder for primary key format regions,
    /// based on the engine-level [MemtableConfig].
    fn default_primary_key_memtable_builder(
        &self,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Metrics of scanning memtables, shared across iterators of a scan.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges metrics collected by a single iterator into the shared metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Returns a snapshot of the collected metrics.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

/// Inner data of [MemScanMetrics].
#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Number of series scanned.
    pub(crate) total_series: usize,
    /// Number of rows returned.
    pub(crate) num_rows: usize,
    /// Number of batches returned.
    pub(crate) num_batches: usize,
    /// Total cost of the scan.
    pub(crate) scan_cost: Duration,
}
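
// Illustrative sketch (not from the original source): a scan typically creates one
// shared `MemScanMetrics`, passes clones of it to per-range iterators through
// `IterBuilder::build(Some(metrics.clone()))`, and reads the merged totals with
// `metrics.data()` once the scan completes.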

/// A memtable range that is already encoded in the SST format.
pub struct EncodedRange {
    /// Encoded bytes of the range.
    pub data: Bytes,
    /// SST metadata of the encoded data.
    pub sst_info: SstInfo,
}

/// Builds iterators to read a memtable range.
pub trait IterBuilder: Send + Sync {
    /// Returns a new [Batch] iterator, optionally reporting scan metrics.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns true if this builder produces Arrow [RecordBatch]es instead of
    /// [Batch]es. Defaults to false.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns a new [RecordBatch] iterator. The default implementation returns
    /// an unsupported operation error.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the already encoded data of this range, if any.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Builder to create iterators over the memtable data.
    builder: BoxedIterBuilder,
    /// Predicates pushed down to the memtable.
    predicate: PredicateGroup,
}

/// Reference to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in a memtable to scan.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context of the memtable.
    context: MemtableRangeContextRef,
    /// Number of rows in the range.
    num_rows: usize,
}

impl MemtableRange {
    /// Creates a new [MemtableRange] from its context and row count.
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    /// Returns the id of the memtable this range belongs to.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator that prunes rows outside of `time_range` using the
    /// time filters of the pushed down predicates.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator without time range pruning or metrics.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a [RecordBatch] iterator for this range.
    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    /// Returns true if this range yields [RecordBatch]es.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the already encoded data of this range, if any.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}
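
// Illustrative sketch (not part of the original module): draining one range that
// yields `Batch`es. `range`, `time_range` and `metrics` are placeholders.
//
// let iter = range.build_prune_iter(time_range, Some(metrics.clone()))?;
// for batch in iter {
//     let batch = batch?;
//     // consume the pruned batch...
// }
//
// Flat-format ranges report `is_record_batch() == true` and should be read with
// `build_record_batch_iter` instead.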

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            // `done_allocating()` is idempotent: the reserved memory stays
            // accounted until the tracker is dropped.
            for _ in 0..2 {
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}
715}