use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

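/// Id of a memtable.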
pub type MemtableId = u32;

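/// Memtable configuration options.
///
/// Deserialized from a tagged representation (`type = "time_series"` or
/// `type = "partition_tree"`), see `test_deserialize_memtable_config` below.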
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}

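/// Statistics of a memtable.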
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    estimated_bytes: usize,
    time_range: Option<(Timestamp, Timestamp)>,
    pub num_rows: usize,
    pub num_ranges: usize,
    max_sequence: SequenceNumber,
    series_count: usize,
}

impl MemtableStats {
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

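/// Boxed iterator that yields [Batch]es from a memtable.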
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

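/// Boxed iterator that yields Arrow [RecordBatch]es from a memtable.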
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

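/// Ranges returned by a memtable scan, together with the memtable stats.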
#[derive(Default)]
pub struct MemtableRanges {
    pub ranges: BTreeMap<usize, MemtableRange>,
    pub stats: MemtableStats,
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support build iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

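/// In-memory write buffer of a region.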
pub trait Memtable: Send + Sync + fmt::Debug {
    fn id(&self) -> MemtableId;

    fn write(&self, kvs: &KeyValues) -> Result<()>;

    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
        for_flush: bool,
    ) -> Result<MemtableRanges>;

    fn is_empty(&self) -> bool;

    fn freeze(&self) -> Result<()>;

    fn stats(&self) -> MemtableStats;

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

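/// Reference-counted pointer to a [Memtable].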
pub type MemtableRef = Arc<dyn Memtable>;

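/// Builder that creates memtables for a region.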
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

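/// Tracks the memory a memtable allocates and reports it to the write buffer
/// manager, if one is configured.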
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    bytes_allocated: AtomicUsize,
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

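/// Provides memtable builders based on the region options and the mito engine config.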
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, use BulkMemtable under flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup,
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        if self.config.default_experimental_flat_format {
            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup,
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

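/// Thread-safe collector of memtable scan metrics.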
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    pub(crate) total_series: usize,
    pub(crate) num_rows: usize,
    pub(crate) num_batches: usize,
    pub(crate) scan_cost: Duration,
}

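/// Encoded data of a memtable range and the corresponding SST info.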
pub struct EncodedRange {
    pub data: Bytes,
    pub sst_info: SstInfo,
}

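/// Builds iterators over the data of a memtable range.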
pub trait IterBuilder: Send + Sync {
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    fn is_record_batch(&self) -> bool {
        false
    }

    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

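/// Context of a memtable range: the memtable id, the iterator builder and the scan predicates.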
pub struct MemtableRangeContext {
    id: MemtableId,
    builder: BoxedIterBuilder,
    predicate: PredicateGroup,
}

pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

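/// A scannable range of a memtable.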
#[derive(Clone)]
pub struct MemtableRange {
    context: MemtableRangeContextRef,
    num_rows: usize,
}

impl MemtableRange {
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

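            // It should be harmless to call done_allocating() more than once.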
            for _ in 0..2 {
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}