mito2/memtable.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtables are write buffers for regions.
16
17use std::collections::BTreeMap;
18use std::fmt;
19use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
20use std::sync::Arc;
21
22pub use bulk::part::EncodedBulkPart;
23use common_time::Timestamp;
24use mito_codec::key_values::KeyValue;
25pub use mito_codec::key_values::KeyValues;
26use serde::{Deserialize, Serialize};
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::{ColumnId, SequenceNumber};
29use table::predicate::Predicate;
30
31use crate::config::MitoConfig;
32use crate::error::Result;
33use crate::flush::WriteBufferManagerRef;
34use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
35use crate::memtable::time_series::TimeSeriesMemtableBuilder;
36use crate::metrics::WRITE_BUFFER_BYTES;
37use crate::read::prune::PruneTimeIterator;
38use crate::read::scan_region::PredicateGroup;
39use crate::read::Batch;
40use crate::region::options::{MemtableOptions, MergeMode};
41use crate::sst::file::FileTimeRange;
42
43mod builder;
44pub mod bulk;
45pub mod partition_tree;
46mod simple_bulk_memtable;
47mod stats;
48pub mod time_partition;
49pub mod time_series;
50pub(crate) mod version;
51
52#[cfg(any(test, feature = "test"))]
53pub use bulk::part::BulkPart;
54#[cfg(any(test, feature = "test"))]
55pub use time_partition::filter_record_batch;
56
/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
///
/// Serialized with an internal `type` tag, e.g. `type = "time_series"` in TOML.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    /// Partition tree memtable with its tuning options.
    PartitionTree(PartitionTreeConfig),
    /// Time series memtable.
    TimeSeries,
}

impl Default for MemtableConfig {
    fn default() -> Self {
        // Time series memtable is the engine's default.
        Self::TimeSeries
    }
}
75
/// Statistics of a memtable, reported to readers and the flush path.
#[derive(Debug, Default)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is None if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in memtable.
    num_rows: usize,
    /// Total number of ranges in the memtable.
    num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
}
90
impl MemtableStats {
    /// Attaches the time range to the stats (test helper).
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the inclusive time range of the memtable, `None` iff it is empty.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the num of total rows in memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }
}
124
/// Boxed iterator that yields [Batch]es and can be sent across threads.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the query time.
    pub stats: MemtableStats,
}
135
/// In memory write buffer.
///
/// A memtable buffers writes for a region until it is frozen and flushed.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded batch into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    /// `projection` selects columns to read, `None` means reading all columns.
    /// `predicate` is the predicate to be pushed down to the memtable.
    /// `sequence` optionally restricts the visible rows by sequence number
    /// (presumably rows with larger sequences are skipped — semantics defined
    /// by each implementation).
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] info of Memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with specific memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Shared pointer to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;
185
/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given `id` for the region
    /// described by `metadata`.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Shared pointer to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
193
/// Memtable memory allocation tracker.
///
/// Reports allocations to an optional [WriteBufferManagerRef] and to the
/// `WRITE_BUFFER_BYTES` gauge; releases everything on drop.
#[derive(Default)]
pub struct AllocTracker {
    /// Optional manager to charge allocations against.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}
203
// Manual Debug impl: the `write_buffer_manager` field is intentionally
// omitted (presumably because the trait object has no Debug impl — confirm),
// so only the counters are printed.
impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}
212
213impl AllocTracker {
214    /// Returns a new [AllocTracker].
215    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
216        AllocTracker {
217            write_buffer_manager,
218            bytes_allocated: AtomicUsize::new(0),
219            is_done_allocating: AtomicBool::new(false),
220        }
221    }
222
223    /// Tracks `bytes` memory is allocated.
224    pub(crate) fn on_allocation(&self, bytes: usize) {
225        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
226        WRITE_BUFFER_BYTES.add(bytes as i64);
227        if let Some(write_buffer_manager) = &self.write_buffer_manager {
228            write_buffer_manager.reserve_mem(bytes);
229        }
230    }
231
232    /// Marks we have finished allocating memory so we can free it from
233    /// the write buffer's limit.
234    ///
235    /// The region MUST ensure that it calls this method inside the region writer's write lock.
236    pub(crate) fn done_allocating(&self) {
237        if let Some(write_buffer_manager) = &self.write_buffer_manager {
238            if self
239                .is_done_allocating
240                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
241                .is_ok()
242            {
243                write_buffer_manager
244                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
245            }
246        }
247    }
248
249    /// Returns bytes allocated.
250    pub(crate) fn bytes_allocated(&self) -> usize {
251        self.bytes_allocated.load(Ordering::Relaxed)
252    }
253
254    /// Returns the write buffer manager.
255    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
256        self.write_buffer_manager.clone()
257    }
258}
259
impl Drop for AllocTracker {
    fn drop(&mut self) {
        // Make sure the mutable-usage portion is scheduled for free before we
        // release the total; no-op if `done_allocating()` was already called.
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}
275
/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    /// Manager shared by all built memtables to track write buffer usage.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Global engine config supplying the default memtable settings.
    config: Arc<MitoConfig>,
}
282
283impl MemtableBuilderProvider {
284    pub(crate) fn new(
285        write_buffer_manager: Option<WriteBufferManagerRef>,
286        config: Arc<MitoConfig>,
287    ) -> Self {
288        Self {
289            write_buffer_manager,
290            config,
291        }
292    }
293
294    pub(crate) fn builder_for_options(
295        &self,
296        options: Option<&MemtableOptions>,
297        dedup: bool,
298        merge_mode: MergeMode,
299    ) -> MemtableBuilderRef {
300        match options {
301            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
302                self.write_buffer_manager.clone(),
303                dedup,
304                merge_mode,
305            )),
306            Some(MemtableOptions::PartitionTree(opts)) => {
307                Arc::new(PartitionTreeMemtableBuilder::new(
308                    PartitionTreeConfig {
309                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
310                        data_freeze_threshold: opts.data_freeze_threshold,
311                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
312                        dedup,
313                        merge_mode,
314                    },
315                    self.write_buffer_manager.clone(),
316                ))
317            }
318            None => self.default_memtable_builder(dedup, merge_mode),
319        }
320    }
321
322    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
323        match &self.config.memtable {
324            MemtableConfig::PartitionTree(config) => {
325                let mut config = config.clone();
326                config.dedup = dedup;
327                Arc::new(PartitionTreeMemtableBuilder::new(
328                    config,
329                    self.write_buffer_manager.clone(),
330                ))
331            }
332            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
333                self.write_buffer_manager.clone(),
334                dedup,
335                merge_mode,
336            )),
337        }
338    }
339}
340
/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self) -> Result<BoxedBatchIterator>;
}

/// Boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;
349
/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

/// Shared pointer to [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;
361
362impl MemtableRangeContext {
363    /// Creates a new [MemtableRangeContext].
364    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
365        Self {
366            id,
367            builder,
368            predicate,
369        }
370    }
371}
372
/// A range in the memtable.
///
/// Cheap to clone: ranges of the same memtable share one context.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context.
    context: MemtableRangeContextRef,
}
379
380impl MemtableRange {
381    /// Creates a new range from context.
382    pub fn new(context: MemtableRangeContextRef) -> Self {
383        Self { context }
384    }
385
386    /// Returns the id of the memtable to read.
387    pub fn id(&self) -> MemtableId {
388        self.context.id
389    }
390
391    /// Builds an iterator to read the range.
392    /// Filters the result by the specific time range, this ensures memtable won't return
393    /// rows out of the time range when new rows are inserted.
394    pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
395        let iter = self.context.builder.build()?;
396        let time_filters = self.context.predicate.time_filters();
397        Ok(Box::new(PruneTimeIterator::new(
398            iter,
399            time_range,
400            time_filters,
401        )))
402    }
403}
404
#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    /// Partition tree config round-trips from TOML via the `type` tag.
    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    /// Without a manager, the tracker only accumulates its local counter.
    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        // `done_allocating()` must not reset the local counter.
        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    /// Allocations are mirrored into the manager; `done_allocating()` moves
    /// usage out of the mutable portion exactly once, and drop frees the rest.
    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        // Dropping the tracker releases the remaining usage from the manager.
        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    /// Drop must release both mutable and total usage even when
    /// `done_allocating()` was never called explicitly.
    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}