// mito2/memtable.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtables are write buffers for regions.

17use std::collections::BTreeMap;
18use std::fmt;
19use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
20use std::sync::Arc;
21
22pub use bulk::part::EncodedBulkPart;
23use common_time::Timestamp;
24use mito_codec::key_values::KeyValue;
25pub use mito_codec::key_values::KeyValues;
26use serde::{Deserialize, Serialize};
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::{ColumnId, SequenceNumber};
29use table::predicate::Predicate;
30
31use crate::config::MitoConfig;
32use crate::error::Result;
33use crate::flush::WriteBufferManagerRef;
34use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
35use crate::memtable::time_series::TimeSeriesMemtableBuilder;
36use crate::metrics::WRITE_BUFFER_BYTES;
37use crate::read::prune::PruneTimeIterator;
38use crate::read::scan_region::PredicateGroup;
39use crate::read::Batch;
40use crate::region::options::{MemtableOptions, MergeMode};
41use crate::sst::file::FileTimeRange;
42
43mod builder;
44pub mod bulk;
45pub mod partition_tree;
46mod simple_bulk_memtable;
47mod stats;
48pub mod time_partition;
49pub mod time_series;
50pub(crate) mod version;
51
52#[cfg(any(test, feature = "test"))]
53pub use bulk::part::BulkPart;
54#[cfg(any(test, feature = "test"))]
55pub use time_partition::filter_record_batch;
56
/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;
61
/// Config for memtables.
///
/// Serialized/deserialized with an internal `type` tag in snake_case
/// (e.g. `type = "partition_tree"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    /// Partition tree memtable, built by [PartitionTreeMemtableBuilder] with the
    /// embedded [PartitionTreeConfig].
    PartitionTree(PartitionTreeConfig),
    /// Time series memtable, built by [TimeSeriesMemtableBuilder].
    TimeSeries,
}
69
70impl Default for MemtableConfig {
71    fn default() -> Self {
72        Self::TimeSeries
73    }
74}
75
/// Statistics of a memtable, returned by [Memtable::stats()].
#[derive(Debug, Default)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is None if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in memtable.
    num_rows: usize,
    /// Total number of ranges in the memtable.
    num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Number of estimated timeseries in memtable.
    series_count: usize,
}
92
93impl MemtableStats {
94    /// Attaches the time range to the stats.
95    #[cfg(any(test, feature = "test"))]
96    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
97        self.time_range = time_range;
98        self
99    }
100
101    #[cfg(feature = "test")]
102    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
103        self.max_sequence = max_sequence;
104        self
105    }
106
107    /// Returns the estimated bytes allocated by this memtable.
108    pub fn bytes_allocated(&self) -> usize {
109        self.estimated_bytes
110    }
111
112    /// Returns the time range of the memtable.
113    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
114        self.time_range
115    }
116
117    /// Returns the num of total rows in memtable.
118    pub fn num_rows(&self) -> usize {
119        self.num_rows
120    }
121
122    /// Returns the number of ranges in the memtable.
123    pub fn num_ranges(&self) -> usize {
124        self.num_ranges
125    }
126
127    /// Returns the maximum sequence number in the memtable.
128    pub fn max_sequence(&self) -> SequenceNumber {
129        self.max_sequence
130    }
131
132    /// Series count in memtable.
133    pub fn series_count(&self) -> usize {
134        self.series_count
135    }
136}
137
/// Boxed iterator over the [Batch]es of a memtable.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the query time.
    pub stats: MemtableStats,
}
148
/// In memory write buffer.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded batch ([BulkPart]) into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    ///
    /// `projection` selects columns to read, `None` means reading all columns.
    /// `predicate` is pushed down to the memtable to prune the result.
    /// `sequence` optionally restricts which rows are visible by sequence
    /// number (presumably an upper bound — see implementations).
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] info of Memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with specific memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}
196
/// Reference-counted pointer to a [Memtable] trait object.
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given `id` and region `metadata`.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Reference-counted pointer to a [MemtableBuilder] trait object.
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
206
/// Memtable memory allocation tracker.
#[derive(Default)]
pub struct AllocTracker {
    /// Manager the tracker reports reservations/frees to, if any.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}
216
217impl fmt::Debug for AllocTracker {
218    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
219        f.debug_struct("AllocTracker")
220            .field("bytes_allocated", &self.bytes_allocated)
221            .field("is_done_allocating", &self.is_done_allocating)
222            .finish()
223    }
224}
225
impl AllocTracker {
    /// Returns a new [AllocTracker].
    ///
    /// `write_buffer_manager` is optional; when present, every allocation is
    /// also reserved against it and released via [AllocTracker::done_allocating]
    /// or on drop.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` memory is allocated.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        // Global metric of bytes held by write buffers; decremented on drop.
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks we have finished allocating memory so we can free it from
    /// the write buffer's limit.
    ///
    /// The region MUST ensure that it calls this method inside the region writer's write lock.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            // compare_exchange flips the flag at most once, so the free is
            // scheduled only once even if this method is called repeatedly
            // (e.g. explicitly and then again from Drop).
            if self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                write_buffer_manager
                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
            }
        }
    }

    /// Returns bytes allocated.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}
272
impl Drop for AllocTracker {
    fn drop(&mut self) {
        // Make sure the scheduled free happens even if the owner never called
        // done_allocating() explicitly.
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        // Undo the metric increment performed in on_allocation().
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}
288
/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    /// Shared write buffer manager handed to every builder it creates.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Engine config; its `memtable` section selects the default builder.
    config: Arc<MitoConfig>,
}
295
296impl MemtableBuilderProvider {
297    pub(crate) fn new(
298        write_buffer_manager: Option<WriteBufferManagerRef>,
299        config: Arc<MitoConfig>,
300    ) -> Self {
301        Self {
302            write_buffer_manager,
303            config,
304        }
305    }
306
307    pub(crate) fn builder_for_options(
308        &self,
309        options: Option<&MemtableOptions>,
310        dedup: bool,
311        merge_mode: MergeMode,
312    ) -> MemtableBuilderRef {
313        match options {
314            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
315                self.write_buffer_manager.clone(),
316                dedup,
317                merge_mode,
318            )),
319            Some(MemtableOptions::PartitionTree(opts)) => {
320                Arc::new(PartitionTreeMemtableBuilder::new(
321                    PartitionTreeConfig {
322                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
323                        data_freeze_threshold: opts.data_freeze_threshold,
324                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
325                        dedup,
326                        merge_mode,
327                    },
328                    self.write_buffer_manager.clone(),
329                ))
330            }
331            None => self.default_memtable_builder(dedup, merge_mode),
332        }
333    }
334
335    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
336        match &self.config.memtable {
337            MemtableConfig::PartitionTree(config) => {
338                let mut config = config.clone();
339                config.dedup = dedup;
340                Arc::new(PartitionTreeMemtableBuilder::new(
341                    config,
342                    self.write_buffer_manager.clone(),
343                ))
344            }
345            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
346                self.write_buffer_manager.clone(),
347                dedup,
348                merge_mode,
349            )),
350        }
351    }
352}
353
/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self) -> Result<BoxedBatchIterator>;
}

/// Boxed [IterBuilder] trait object.
pub type BoxedIterBuilder = Box<dyn IterBuilder>;
362
/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

/// Reference-counted pointer to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;
374
375impl MemtableRangeContext {
376    /// Creates a new [MemtableRangeContext].
377    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
378        Self {
379            id,
380            builder,
381            predicate,
382        }
383    }
384}
385
/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context; cloning a range only bumps the Arc refcount.
    context: MemtableRangeContextRef,
}
392
393impl MemtableRange {
394    /// Creates a new range from context.
395    pub fn new(context: MemtableRangeContextRef) -> Self {
396        Self { context }
397    }
398
399    /// Returns the id of the memtable to read.
400    pub fn id(&self) -> MemtableId {
401        self.context.id
402    }
403
404    /// Builds an iterator to read the range.
405    /// Filters the result by the specific time range, this ensures memtable won't return
406    /// rows out of the time range when new rows are inserted.
407    pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
408        let iter = self.context.builder.build()?;
409        let time_filters = self.context.predicate.time_filters();
410        Ok(Box::new(PruneTimeIterator::new(
411            iter,
412            time_range,
413            time_filters,
414        )))
415    }
416}
417
#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let toml_str = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let parsed: MemtableConfig = toml::from_str(toml_str).unwrap();
        match parsed {
            MemtableConfig::PartitionTree(tree_config) => {
                assert!(tree_config.dedup);
                assert_eq!(8192, tree_config.index_max_keys_per_shard);
                assert_eq!(1024, tree_config.data_freeze_threshold);
                assert_eq!(ReadableSize::mb(512), tree_config.fork_dictionary_bytes);
            }
            MemtableConfig::TimeSeries => unreachable!(),
        }
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());

        // Allocations accumulate in the counter.
        for (bytes, expected_total) in [(100, 100), (200, 300)] {
            tracker.on_allocation(bytes);
            assert_eq!(expected_total, tracker.bytes_allocated());
        }

        // Without a manager, done_allocating() leaves the counter untouched.
        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            // Done allocating won't free the same memory multiple times.
            for _ in 0..2 {
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        // Dropping the tracker releases everything it tracked.
        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        // Drop must release memory even when done_allocating() was never called.
        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}