// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables.
///
/// Should be unique within the same region.
pub type MemtableId = u32;

/// Config for memtables.
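///
/// A minimal sketch of deserializing this config from TOML (it mirrors the test
/// at the bottom of this file; the field values are illustrative):
///
/// ```ignore
/// let config: MemtableConfig = toml::from_str(
///     r#"
/// type = "partition_tree"
/// index_max_keys_per_shard = 8192
/// data_freeze_threshold = 1024
/// dedup = true
/// fork_dictionary_bytes = "512MiB"
/// "#,
/// )
/// .unwrap();
/// assert!(matches!(config, MemtableConfig::PartitionTree(_)));
/// ```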
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}

/// Options for querying ranges from a memtable.
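///
/// A hedged usage sketch of the builder-style setters defined below:
///
/// ```ignore
/// let opts = RangesOptions::default()
///     .with_pre_filter_mode(PreFilterMode::All)
///     .with_predicate(PredicateGroup::default())
///     .with_sequence(None);
/// ```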
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are being queried for flush.
    pub for_flush: bool,
    /// Mode to pre-filter columns in ranges.
    pub pre_filter_mode: PreFilterMode,
    /// Predicate to filter the data.
    pub predicate: PredicateGroup,
    /// Sequence range to filter the data.
    pub sequence: Option<SequenceRange>,
}

impl Default for RangesOptions {
    fn default() -> Self {
        Self {
            for_flush: false,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }
}

impl RangesOptions {
    /// Creates a new [RangesOptions] for flushing.
    pub fn for_flush() -> Self {
        Self {
            for_flush: true,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets the predicate.
    #[must_use]
    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the sequence range.
    #[must_use]
    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
        self.sequence = sequence;
        self
    }
}

#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is None if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total number of rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Estimated number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Attaches the time range to the stats.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the maximum sequence number.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the total number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the estimated number of series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Iterator over [Batch]es produced by a memtable.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Iterator over [RecordBatch]es produced by a memtable.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the query time.
    pub stats: MemtableStats,
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support building an iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

/// In-memory write buffer.
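///
/// A hedged sketch of the typical write/flush lifecycle; names like `kvs`,
/// `new_id` and `metadata` are illustrative:
///
/// ```ignore
/// memtable.write(&kvs)?;                         // buffer incoming key values
/// memtable.freeze()?;                            // turn the memtable immutable
/// let ranges = memtable.ranges(None, RangesOptions::for_flush())?;
/// let forked = memtable.fork(new_id, &metadata); // new mutable memtable for the region
/// ```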
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    /// `projection` selects columns to read, `None` means reading all columns.
    /// `predicate` contains the filters to be pushed down to the memtable.
    ///
    /// # Note
    /// This method should only be used for tests.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    ///
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] info of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with the specific memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable.
    ///
    /// `for_flush` is true when the flush job calls this method.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

/// Reference-counted pointer to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if the memtable supports bulk insert and benefits from it.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

/// Reference-counted pointer to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Memtable memory allocation tracker.
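///
/// A minimal usage sketch without a write buffer manager (it mirrors the tests
/// at the bottom of this file):
///
/// ```ignore
/// let tracker = AllocTracker::new(None);
/// tracker.on_allocation(100);
/// assert_eq!(100, tracker.bytes_allocated());
/// tracker.done_allocating(); // idempotent: frees the reserved memory only once
/// ```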
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new [AllocTracker].
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks that `bytes` of memory have been allocated.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that we have finished allocating memory so it can be freed from
    /// the write buffer's limit.
    ///
    /// The region MUST ensure that it calls this method inside the region writer's write lock.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns bytes allocated.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, using BulkMemtable under flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup, // append_mode: enabled when dedup is disabled
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        // The format is not flat.
        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_primary_key_memtable_builder(
        &self,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Metrics for scanning a memtable.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Gets the metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total series in the memtable.
    pub(crate) total_series: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Number of batches read.
    pub(crate) num_batches: usize,
    /// Duration to scan the memtable.
    pub(crate) scan_cost: Duration,
}

/// Encoded range in the memtable.
pub struct EncodedRange {
    /// Encoded file data.
    pub data: Bytes,
    /// Metadata of the encoded range.
    pub sst_info: SstInfo,
}

/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
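///
/// A hedged sketch of a trivial implementor that yields no batches; only `build`
/// is required since the other methods have defaults:
///
/// ```ignore
/// struct EmptyIterBuilder;
///
/// impl IterBuilder for EmptyIterBuilder {
///     fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
///         Ok(Box::new(std::iter::empty::<Result<Batch>>()))
///     }
/// }
/// ```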
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns whether the iterator is a record batch iterator.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns the record batch iterator to read the range.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the [EncodedRange] if the range is already encoded into SST.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

/// Reference-counted pointer to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context.
    context: MemtableRangeContextRef,
    /// Number of rows in the current memtable range.
    // todo(hl): use [MemtableRangeStats] instead.
    num_rows: usize,
}

impl MemtableRange {
    /// Creates a new range from context.
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range.
    /// Filters the result by the given time range; this ensures the memtable won't
    /// return rows outside the time range when new rows are inserted.
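    ///
    /// A hedged usage sketch (`range` and `time_range` are illustrative):
    ///
    /// ```ignore
    /// let iter = range.build_prune_iter(time_range, None)?;
    /// for batch in iter {
    ///     let batch = batch?; // rows outside `time_range` have been pruned
    /// }
    /// ```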
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read all rows in the range.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a record batch iterator to read all rows in the range.
    ///
    /// This method doesn't take the optional time range because a bulk part is immutable,
    /// so we don't need to filter rows out of the time range.
    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    /// Returns whether the iterator is a record batch iterator.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the encoded range if available.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}