// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
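///
/// For example, a partition tree memtable can be configured in TOML as below
/// (this mirrors the `test_deserialize_memtable_config` test at the bottom of
/// this file):
///
/// ```toml
/// type = "partition_tree"
/// index_max_keys_per_shard = 8192
/// data_freeze_threshold = 1024
/// dedup = true
/// fork_dictionary_bytes = "512MiB"
/// ```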
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}

#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is None if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total number of rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Estimated number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Attaches the time range to the stats.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Attaches the maximum sequence number to the stats.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the total number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the estimated series count in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Boxed iterator that yields [Batch]es read from a memtable.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Boxed iterator that yields [RecordBatch]es read from a memtable.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at query time.
    pub stats: MemtableStats,
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support build iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

/// In memory write buffer.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded batch into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    /// `projection` selects columns to read, `None` means reading all columns.
    /// `predicate` is the predicate to push down to the memtable.
    ///
    /// # Note
    /// This method should only be used for tests.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    ///
    /// The `for_flush` flag is true if this method is called by a flush job.
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
        for_flush: bool,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with the specified memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable.
    ///
    /// `for_flush` is true when the flush job calls this method.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

/// Reference to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if the memtable supports bulk insert and benefits from it.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

/// Reference to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Memtable memory allocation tracker.
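///
/// A minimal sketch of the tracker lifecycle (it mirrors the `AllocTracker`
/// tests at the bottom of this file):
///
/// ```ignore
/// let tracker = AllocTracker::new(None);
/// // Track memory as rows are written into the memtable.
/// tracker.on_allocation(100);
/// assert_eq!(100, tracker.bytes_allocated());
/// // Mark allocation as done so the write buffer manager can schedule the
/// // tracked memory to be freed.
/// tracker.done_allocating();
/// // Dropping the tracker subtracts the bytes from the metrics and frees them
/// // from the write buffer manager.
/// drop(tracker);
/// ```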
#[derive(Default)]
pub struct AllocTracker {
    /// Write buffer manager to report the memory usage to.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new [AllocTracker].
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks that `bytes` of memory have been allocated.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that we have finished allocating memory so we can free it from
    /// the write buffer's limit.
    ///
    /// The region MUST ensure that it calls this method inside the region writer's write lock.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns bytes allocated.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

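    /// Returns the memtable builder for the given region options.
    ///
    /// Selection order (summarizing the branches below): when the flat SST
    /// format is in effect, either set in the region options or enabled by
    /// default via `default_experimental_flat_format`, a [BulkMemtableBuilder]
    /// is used regardless of the configured memtable type; otherwise the
    /// memtable type from the region options is honored, falling back to the
    /// default memtable config in [MitoConfig].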
    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, use BulkMemtable under flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup, // append_mode: true if not dedup, false if dedup
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        if self.config.default_experimental_flat_format {
            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup, // append_mode: true if not dedup, false if dedup
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Metrics for scanning a memtable.
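///
/// The metrics are wrapped in an `Arc<Mutex<_>>` so the handle can be cloned
/// and shared by the iterators of the same memtable; each iterator merges its
/// own counters into the shared data via `merge_inner`, and the accumulated
/// result is read back with `data`.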
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Gets the metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total series in the memtable.
    pub(crate) total_series: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Number of batches read.
    pub(crate) num_batches: usize,
    /// Duration to scan the memtable.
    pub(crate) scan_cost: Duration,
}

/// Encoded range in the memtable.
pub struct EncodedRange {
    /// Encoded file data.
    pub data: Bytes,
    /// Metadata of the encoded range.
    pub sst_info: SstInfo,
}

/// Builder to build an iterator to read the range.
/// The builder should already know the projection and the predicate needed to build the iterator.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns whether the iterator is a record batch iterator.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns the record batch iterator to read the range.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the [EncodedRange] if the range is already encoded into SST.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

/// Reference to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in the memtable.
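///
/// A minimal sketch of how ranges are typically consumed (the `memtable`,
/// `predicate` and `time_range` values below are illustrative):
///
/// ```ignore
/// // Collect the ranges of a memtable, then read each range.
/// let ranges = memtable.ranges(None, predicate, None, false)?;
/// for (_range_id, range) in &ranges.ranges {
///     let iter = range.build_prune_iter(time_range, None)?;
///     for batch in iter {
///         let _batch = batch?;
///     }
/// }
/// ```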
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context.
    context: MemtableRangeContextRef,
    /// Number of rows in the current memtable range.
    // todo(hl): use [MemtableRangeStats] instead.
    num_rows: usize,
}

impl MemtableRange {
    /// Creates a new range from the context.
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range.
    /// Filters the result by the specified time range; this ensures the memtable won't return
    /// rows outside the time range when new rows are inserted.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read all rows in the range.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a record batch iterator to read all rows in the range.
    ///
    /// This method doesn't take the optional time range because a bulk part is immutable
    /// so we don't need to filter out rows outside the time range.
    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    /// Returns whether the iterator is a record batch iterator.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the encoded range if available.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}