// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables.
///
/// Should be unique within the same region.
pub type MemtableId = u32;

/// Config for memtables.
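///
/// A sketch of deserializing it from TOML (mirrors the unit test at the bottom
/// of this file; the `type` tag selects the variant):
///
/// ```ignore
/// let config: MemtableConfig = toml::from_str(r#"type = "time_series""#).unwrap();
/// assert_eq!(MemtableConfig::TimeSeries, config);
/// ```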
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    TimeSeries,
}

impl Default for MemtableConfig {
    fn default() -> Self {
        Self::TimeSeries
    }
}

/// Statistics of a memtable.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is `None` if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total number of rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Estimated number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Attaches the time range to the stats.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Attaches the maximum sequence number to the stats.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the total number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the estimated series count in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Boxed iterator over [Batch]es read from a memtable.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Boxed iterator over [RecordBatch]es read from a memtable.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the query time.
    pub stats: MemtableStats,
}

/// In memory write buffer.
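///
/// A hedged sketch of the expected lifecycle (`builder`, `metadata` and `kvs`
/// are assumptions for illustration):
///
/// ```ignore
/// let memtable = builder.build(0, &metadata);
/// memtable.write(&kvs)?;                   // buffer incoming key values
/// memtable.freeze()?;                      // make this memtable immutable
/// let next = memtable.fork(1, &metadata);  // new mutable memtable for the region
/// ```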
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded batch into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    /// `projection` selects columns to read; `None` means reading all columns.
    /// `predicate` is the predicate pushed down to the memtable.
    ///
    /// # Note
    /// This method should only be used for tests.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with the given memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Reference to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if the memtable supports bulk insert and benefits from it.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

/// Reference to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Memtable memory allocation tracker.
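///
/// A minimal usage sketch (mirrors `test_alloc_tracker_without_manager` below;
/// no [WriteBufferManagerRef] is attached):
///
/// ```ignore
/// let tracker = AllocTracker::new(None);
/// tracker.on_allocation(100);
/// tracker.on_allocation(200);
/// assert_eq!(300, tracker.bytes_allocated());
/// tracker.done_allocating(); // frees the memory from the write buffer's limit once
/// ```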
#[derive(Default)]
pub struct AllocTracker {
    /// Optional write buffer manager to report memory allocations to.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new [AllocTracker].
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks that `bytes` of memory have been allocated.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that we have finished allocating memory so it can be freed from
    /// the write buffer's limit.
    ///
    /// The region MUST ensure that it calls this method inside the region writer's write lock.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            if self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                write_buffer_manager
                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
            }
        }
    }

    /// Returns bytes allocated.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        Self {
            write_buffer_manager,
            config,
        }
    }

    pub(crate) fn builder_for_options(
        &self,
        options: Option<&MemtableOptions>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Metrics for scanning a memtable.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Gets the metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

/// Inner metrics of a memtable scan.
#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total series in the memtable.
    pub(crate) total_series: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Number of batches read.
    pub(crate) num_batches: usize,
    /// Duration to scan the memtable.
    pub(crate) scan_cost: Duration,
}

/// Builder to build an iterator to read the range.
/// The builder should hold the projection and the predicate needed to build the iterator.
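///
/// A hedged sketch of a trivial implementor that yields no batches (the type
/// is an assumption for illustration):
///
/// ```ignore
/// struct EmptyIterBuilder;
///
/// impl IterBuilder for EmptyIterBuilder {
///     fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
///         // An empty iterator still satisfies `Iterator<Item = Result<Batch>> + Send`.
///         Ok(Box::new(std::iter::empty()))
///     }
/// }
/// ```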
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns whether the iterator is a record batch iterator.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns the record batch iterator to read the range.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }
}

/// Boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

/// Reference to a shared [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context.
    context: MemtableRangeContextRef,
    /// Number of rows in the current memtable range.
    // TODO(hl): use [MemtableRangeStats] instead.
    num_rows: usize,
}

impl MemtableRange {
    /// Creates a new range from context.
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range.
    ///
    /// Filters the result by the given time range; this ensures the memtable won't return
    /// rows outside the time range when new rows are inserted.
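    ///
    /// A hedged usage sketch (`range` and `time_range` are assumptions):
    ///
    /// ```ignore
    /// let iter = range.build_prune_iter(time_range, None)?;
    /// for batch in iter {
    ///     let batch = batch?; // rows outside `time_range` are already pruned
    /// }
    /// ```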
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read all rows in the range.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a record batch iterator to read all rows in the range.
    ///
    /// This method doesn't take an optional time range because a bulk part is immutable,
    /// so we don't need to filter out rows beyond the time range.
    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    /// Returns whether the iterator is a record batch iterator.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in this range.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}