mito2/memtable.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
pub use bulk::part::{
    BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
    sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
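///
/// The `type` tag selects the variant (see the serde attributes below). A
/// sketch of the TOML this accepts, mirroring the unit test at the bottom of
/// this file:
///
/// ```toml
/// type = "partition_tree"
/// index_max_keys_per_shard = 8192
/// data_freeze_threshold = 1024
/// dedup = true
/// fork_dictionary_bytes = "512MiB"
/// ```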
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}

/// Options for querying ranges from a memtable.
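///
/// A minimal usage sketch of the builder-style setters defined below
/// (illustrative only; `sequence_range` is a hypothetical value):
///
/// ```ignore
/// let options = RangesOptions::default()
///     .with_pre_filter_mode(PreFilterMode::All)
///     .with_sequence(Some(sequence_range));
/// ```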
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are being queried for flush.
    pub for_flush: bool,
    /// Mode to pre-filter columns in ranges.
    pub pre_filter_mode: PreFilterMode,
    /// Predicate to filter the data.
    pub predicate: PredicateGroup,
    /// Sequence range to filter the data.
    pub sequence: Option<SequenceRange>,
}

impl Default for RangesOptions {
    fn default() -> Self {
        Self {
            for_flush: false,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }
}

impl RangesOptions {
    /// Creates a new [RangesOptions] for flushing.
    pub fn for_flush() -> Self {
        Self {
            for_flush: true,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets the predicate.
    #[must_use]
    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the sequence range.
    #[must_use]
    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
        self.sequence = sequence;
        self
    }
}

#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is None if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total number of rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Estimated number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Attaches the time range to the stats.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the maximum sequence number.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the total number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Boxed iterator over [Batch]es.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Boxed iterator over [RecordBatch]es.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Mapping from range IDs to ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the query time.
    pub stats: MemtableStats,
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support build iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

/// In-memory write buffer.
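///
/// A rough lifecycle sketch (illustrative only; error handling is elided and
/// `memtable`, `kvs`, `new_id`, and `metadata` are hypothetical values):
///
/// ```ignore
/// memtable.write(&kvs)?;                        // buffer incoming key values
/// memtable.freeze()?;                           // make this memtable immutable
/// let next = memtable.fork(new_id, &metadata);  // start a new mutable memtable
/// ```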
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes a bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    /// `projection` selects columns to read, `None` means reading all columns.
    /// `predicate` is the predicate to be pushed down to the memtable.
    ///
    /// # Note
    /// This method should only be used for tests.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    ///
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of this memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with specific memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable.
    ///
    /// The `for_flush` is true when the flush job calls this method.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

/// Reference-counted pointer to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if the memtable supports bulk insert and benefits from it.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

/// Reference-counted pointer to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Memtable memory allocation tracker.
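///
/// A crate-internal usage sketch mirroring the unit tests at the bottom of
/// this file (illustrative only; `write_buffer_manager` is a hypothetical
/// value):
///
/// ```ignore
/// let tracker = AllocTracker::new(Some(write_buffer_manager));
/// tracker.on_allocation(100); // reserves 100 bytes in the manager
/// tracker.done_allocating();  // schedules the reserved bytes to be freed
/// drop(tracker);              // releases the tracked memory
/// ```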
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new [AllocTracker].
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks that `bytes` bytes of memory are allocated.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that we have finished allocating memory so it can be freed from
    /// the write buffer's limit.
    ///
    /// The region MUST ensure that it calls this method inside the region writer's write lock.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns bytes allocated.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, using BulkMemtable under flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup, // append_mode is the inverse of dedup
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        // The format is not flat.
        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_primary_key_memtable_builder(
        &self,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Metrics for scanning a memtable.
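///
/// Cloning is cheap: the data lives behind a shared `Arc<Mutex<_>>`, so every
/// clone observes the same counters. A crate-internal sketch (illustrative
/// only; `range` and `time_range` are hypothetical values):
///
/// ```ignore
/// let metrics = MemScanMetrics::default();
/// let iter = range.build_prune_iter(time_range, Some(metrics.clone()))?;
/// // ... drain the iterator ...
/// let data = metrics.data(); // aggregated scan statistics
/// ```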
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Gets the metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total series in the memtable.
    pub(crate) total_series: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Number of batches read.
    pub(crate) num_batches: usize,
    /// Duration to scan the memtable.
    pub(crate) scan_cost: Duration,
}

/// Encoded range in the memtable.
pub struct EncodedRange {
    /// Encoded file data.
    pub data: Bytes,
    /// Metadata of the encoded range.
    pub sst_info: SstInfo,
}

/// Builder that creates an iterator to read a range.
/// The builder should know the projection and the predicate to build the iterator.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns whether the iterator is a record batch iterator.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns the record batch iterator to read the range.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the [EncodedRange] if the range is already encoded into SST.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

/// Reference-counted pointer to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in the memtable.
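///
/// Reading is a two-step process: obtain ranges via [Memtable::ranges], then
/// build an iterator per range. A rough sketch (illustrative only; `memtable`
/// and `time_range` are hypothetical values and error handling is elided):
///
/// ```ignore
/// let ranges = memtable.ranges(None, RangesOptions::default())?;
/// for range in ranges.ranges.values() {
///     let iter = range.build_prune_iter(time_range, None)?;
///     // ... consume the batches ...
/// }
/// ```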
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context.
    context: MemtableRangeContextRef,
    /// Number of rows in current memtable range.
    // todo(hl): use [MemtableRangeStats] instead.
    num_rows: usize,
}

impl MemtableRange {
    /// Creates a new range from context.
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range.
    /// Filters the result by the given time range, which ensures the memtable won't
    /// return rows out of the time range when new rows are inserted.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read all rows in range.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a record batch iterator to read all rows in range.
    ///
    /// This method doesn't take the optional time range because a bulk part is immutable
    /// so we don't need to filter rows out of the time range.
    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    /// Returns whether the iterator is a record batch iterator.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the encoded range if available.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}
715}