// mito2/memtable.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtables are write buffers for regions.
16
17use std::collections::BTreeMap;
18use std::fmt;
19use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
20use std::sync::Arc;
21
22pub use bulk::part::EncodedBulkPart;
23use common_time::Timestamp;
24use mito_codec::key_values::KeyValue;
25pub use mito_codec::key_values::KeyValues;
26use serde::{Deserialize, Serialize};
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::{ColumnId, SequenceNumber};
29use table::predicate::Predicate;
30
31use crate::config::MitoConfig;
32use crate::error::Result;
33use crate::flush::WriteBufferManagerRef;
34use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
35use crate::memtable::time_series::TimeSeriesMemtableBuilder;
36use crate::metrics::WRITE_BUFFER_BYTES;
37use crate::read::prune::PruneTimeIterator;
38use crate::read::scan_region::PredicateGroup;
39use crate::read::Batch;
40use crate::region::options::{MemtableOptions, MergeMode};
41use crate::sst::file::FileTimeRange;
42
43mod builder;
44pub mod bulk;
45pub mod partition_tree;
46mod simple_bulk_memtable;
47mod stats;
48pub mod time_partition;
49pub mod time_series;
50pub(crate) mod version;
51
52#[cfg(any(test, feature = "test"))]
53pub use bulk::part::BulkPart;
54#[cfg(any(test, feature = "test"))]
55pub use time_partition::filter_record_batch;
56
/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;
61
/// Config for memtables.
///
/// Serialized/deserialized with an internal `type` tag in snake_case,
/// e.g. `type = "partition_tree"` or `type = "time_series"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    /// Partition tree memtable, tunable via [PartitionTreeConfig].
    PartitionTree(PartitionTreeConfig),
    /// Time series memtable; carries no extra configuration.
    TimeSeries,
}
69
impl Default for MemtableConfig {
    /// The time series memtable is the default implementation.
    fn default() -> Self {
        Self::TimeSeries
    }
}
75
/// Statistics of a memtable, reported by [Memtable::stats].
#[derive(Debug, Default)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is None if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in memtable.
    num_rows: usize,
    /// Total number of ranges in the memtable.
    num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Number of estimated time series in the memtable.
    series_count: usize,
}
92
impl MemtableStats {
    /// Attaches the time range to the stats.
    ///
    /// Only compiled for tests and the `test` feature.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the inclusive time range of the memtable,
    /// `None` when the memtable is empty.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the num of total rows in memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the estimated number of time series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}
131
/// Boxed, sendable iterator yielding [Batch] results.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the query time.
    pub stats: MemtableStats,
}
142
/// In memory write buffer.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded bulk `part` into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    /// `projection` selects columns to read, `None` means reading all columns.
    /// `predicate` is pushed down to the memtable to prune data.
    /// NOTE(review): `sequence` presumably bounds the visible rows by sequence
    /// number — confirm against the implementations.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] info of Memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with specific memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Shared pointer to a [Memtable] trait object.
pub type MemtableRef = Arc<dyn Memtable>;
192
/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given `id` and region `metadata`.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Shared pointer to a [MemtableBuilder] trait object.
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
200
/// Memtable memory allocation tracker.
#[derive(Default)]
pub struct AllocTracker {
    /// Optional manager notified of memory reservations and frees.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}
210
impl fmt::Debug for AllocTracker {
    // Manual impl: only the atomic counters are printed; the manager is
    // omitted (presumably it does not implement `Debug` — confirm).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}
219
220impl AllocTracker {
221    /// Returns a new [AllocTracker].
222    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
223        AllocTracker {
224            write_buffer_manager,
225            bytes_allocated: AtomicUsize::new(0),
226            is_done_allocating: AtomicBool::new(false),
227        }
228    }
229
230    /// Tracks `bytes` memory is allocated.
231    pub(crate) fn on_allocation(&self, bytes: usize) {
232        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
233        WRITE_BUFFER_BYTES.add(bytes as i64);
234        if let Some(write_buffer_manager) = &self.write_buffer_manager {
235            write_buffer_manager.reserve_mem(bytes);
236        }
237    }
238
239    /// Marks we have finished allocating memory so we can free it from
240    /// the write buffer's limit.
241    ///
242    /// The region MUST ensure that it calls this method inside the region writer's write lock.
243    pub(crate) fn done_allocating(&self) {
244        if let Some(write_buffer_manager) = &self.write_buffer_manager {
245            if self
246                .is_done_allocating
247                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
248                .is_ok()
249            {
250                write_buffer_manager
251                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
252            }
253        }
254    }
255
256    /// Returns bytes allocated.
257    pub(crate) fn bytes_allocated(&self) -> usize {
258        self.bytes_allocated.load(Ordering::Relaxed)
259    }
260
261    /// Returns the write buffer manager.
262    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
263        self.write_buffer_manager.clone()
264    }
265}
266
impl Drop for AllocTracker {
    fn drop(&mut self) {
        // If the owner never called `done_allocating`, do it now so the
        // manager's mutable-buffer accounting is released first.
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}
282
/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    /// Manager handed to every builder so allocations are tracked globally.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Engine config supplying the default memtable settings.
    config: Arc<MitoConfig>,
}
289
290impl MemtableBuilderProvider {
291    pub(crate) fn new(
292        write_buffer_manager: Option<WriteBufferManagerRef>,
293        config: Arc<MitoConfig>,
294    ) -> Self {
295        Self {
296            write_buffer_manager,
297            config,
298        }
299    }
300
301    pub(crate) fn builder_for_options(
302        &self,
303        options: Option<&MemtableOptions>,
304        dedup: bool,
305        merge_mode: MergeMode,
306    ) -> MemtableBuilderRef {
307        match options {
308            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
309                self.write_buffer_manager.clone(),
310                dedup,
311                merge_mode,
312            )),
313            Some(MemtableOptions::PartitionTree(opts)) => {
314                Arc::new(PartitionTreeMemtableBuilder::new(
315                    PartitionTreeConfig {
316                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
317                        data_freeze_threshold: opts.data_freeze_threshold,
318                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
319                        dedup,
320                        merge_mode,
321                    },
322                    self.write_buffer_manager.clone(),
323                ))
324            }
325            None => self.default_memtable_builder(dedup, merge_mode),
326        }
327    }
328
329    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
330        match &self.config.memtable {
331            MemtableConfig::PartitionTree(config) => {
332                let mut config = config.clone();
333                config.dedup = dedup;
334                Arc::new(PartitionTreeMemtableBuilder::new(
335                    config,
336                    self.write_buffer_manager.clone(),
337                ))
338            }
339            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
340                self.write_buffer_manager.clone(),
341                dedup,
342                merge_mode,
343            )),
344        }
345    }
346}
347
/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self) -> Result<BoxedBatchIterator>;
}

/// Owned, boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;
356
/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

/// Shared pointer to [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}
379
/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context; cloning a range only bumps the `Arc` refcount.
    context: MemtableRangeContextRef,
}
386
387impl MemtableRange {
388    /// Creates a new range from context.
389    pub fn new(context: MemtableRangeContextRef) -> Self {
390        Self { context }
391    }
392
393    /// Returns the id of the memtable to read.
394    pub fn id(&self) -> MemtableId {
395        self.context.id
396    }
397
398    /// Builds an iterator to read the range.
399    /// Filters the result by the specific time range, this ensures memtable won't return
400    /// rows out of the time range when new rows are inserted.
401    pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
402        let iter = self.context.builder.build()?;
403        let time_filters = self.context.predicate.time_filters();
404        Ok(Box::new(PruneTimeIterator::new(
405            iter,
406            time_range,
407            time_filters,
408        )))
409    }
410}
411
#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    /// The tagged TOML form deserializes into the partition tree variant.
    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    /// Without a manager, the tracker only accumulates its local counter.
    #[test]
    fn test_alloc_tracker_without_done_allocating_manager_counter() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        // `done_allocating` does not reset the local counter.
        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    /// With a manager, `done_allocating` moves usage out of the mutable
    /// bucket once; dropping the tracker frees the remaining usage.
    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    /// Dropping the tracker releases both mutable and total usage even when
    /// `done_allocating` was never called explicitly.
    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}