// mito2/memtable.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtables are write buffers for regions.
16
17use std::collections::BTreeMap;
18use std::fmt;
19use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
20use std::sync::Arc;
21
22pub use bulk::part::EncodedBulkPart;
23use common_time::Timestamp;
24use serde::{Deserialize, Serialize};
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::{ColumnId, SequenceNumber};
27use table::predicate::Predicate;
28
29use crate::config::MitoConfig;
30use crate::error::Result;
31use crate::flush::WriteBufferManagerRef;
32use crate::memtable::key_values::KeyValue;
33pub use crate::memtable::key_values::KeyValues;
34use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
35use crate::memtable::time_series::TimeSeriesMemtableBuilder;
36use crate::metrics::WRITE_BUFFER_BYTES;
37use crate::read::prune::PruneTimeIterator;
38use crate::read::scan_region::PredicateGroup;
39use crate::read::Batch;
40use crate::region::options::{MemtableOptions, MergeMode};
41use crate::sst::file::FileTimeRange;
42
43mod builder;
44pub mod bulk;
45pub mod key_values;
46pub mod partition_tree;
47mod simple_bulk_memtable;
48mod stats;
49pub mod time_partition;
50pub mod time_series;
51pub(crate) mod version;
52
53#[cfg(any(test, feature = "test"))]
54pub use bulk::part::BulkPart;
55#[cfg(any(test, feature = "test"))]
56pub use time_partition::filter_record_batch;
57
/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;
62
/// Config for memtables.
///
/// Serialized as an internally tagged enum: the `type` field selects the
/// variant in snake_case (e.g. `type = "partition_tree"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    /// Partition tree memtable with its tuning parameters.
    PartitionTree(PartitionTreeConfig),
    /// Time-series memtable (the default, see `impl Default` below).
    TimeSeries,
}
70
71impl Default for MemtableConfig {
72    fn default() -> Self {
73        Self::TimeSeries
74    }
75}
76
/// Statistics of a memtable.
#[derive(Debug, Default)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is None if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in the memtable.
    num_rows: usize,
    /// Total number of ranges in the memtable.
    num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
}
91
impl MemtableStats {
    /// Attaches the time range to the stats (builder-style, test helper only).
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Returns the estimated heap bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the inclusive time range of the memtable, `None` iff it is empty.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the total number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }
}
125
126pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
127
/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Ranges keyed by their range id.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable captured at query time.
    pub stats: MemtableStats,
}
136
/// In memory write buffer.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    ///
    /// `projection` selects columns to read, `None` means reading all columns.
    /// `predicate` holds the filters to be pushed down to the memtable.
    /// `sequence` optionally bounds which rows are visible (exact visibility
    /// semantics are implementation-defined — confirm with implementors).
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] info of Memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with specific memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}
184
185pub type MemtableRef = Arc<dyn Memtable>;
186
/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given `id` and region metadata.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}
192
193pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
194
/// Memtable memory allocation tracker.
#[derive(Default)]
pub struct AllocTracker {
    // Optional manager that enforces the global write buffer limit.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}
204
205impl fmt::Debug for AllocTracker {
206    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
207        f.debug_struct("AllocTracker")
208            .field("bytes_allocated", &self.bytes_allocated)
209            .field("is_done_allocating", &self.is_done_allocating)
210            .finish()
211    }
212}
213
214impl AllocTracker {
215    /// Returns a new [AllocTracker].
216    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
217        AllocTracker {
218            write_buffer_manager,
219            bytes_allocated: AtomicUsize::new(0),
220            is_done_allocating: AtomicBool::new(false),
221        }
222    }
223
224    /// Tracks `bytes` memory is allocated.
225    pub(crate) fn on_allocation(&self, bytes: usize) {
226        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
227        WRITE_BUFFER_BYTES.add(bytes as i64);
228        if let Some(write_buffer_manager) = &self.write_buffer_manager {
229            write_buffer_manager.reserve_mem(bytes);
230        }
231    }
232
233    /// Marks we have finished allocating memory so we can free it from
234    /// the write buffer's limit.
235    ///
236    /// The region MUST ensure that it calls this method inside the region writer's write lock.
237    pub(crate) fn done_allocating(&self) {
238        if let Some(write_buffer_manager) = &self.write_buffer_manager {
239            if self
240                .is_done_allocating
241                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
242                .is_ok()
243            {
244                write_buffer_manager
245                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
246            }
247        }
248    }
249
250    /// Returns bytes allocated.
251    pub(crate) fn bytes_allocated(&self) -> usize {
252        self.bytes_allocated.load(Ordering::Relaxed)
253    }
254
255    /// Returns the write buffer manager.
256    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
257        self.write_buffer_manager.clone()
258    }
259}
260
impl Drop for AllocTracker {
    /// Releases all memory tracked by this tracker when it is dropped.
    fn drop(&mut self) {
        // Make sure the mutable part is scheduled for release even if the
        // owner never called `done_allocating` explicitly.
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        // Subtract from the global gauge before notifying the manager.
        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}
276
/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    // Shared write buffer manager passed to every builder it creates.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    // Engine config that supplies the default memtable settings.
    config: Arc<MitoConfig>,
}
283
284impl MemtableBuilderProvider {
285    pub(crate) fn new(
286        write_buffer_manager: Option<WriteBufferManagerRef>,
287        config: Arc<MitoConfig>,
288    ) -> Self {
289        Self {
290            write_buffer_manager,
291            config,
292        }
293    }
294
295    pub(crate) fn builder_for_options(
296        &self,
297        options: Option<&MemtableOptions>,
298        dedup: bool,
299        merge_mode: MergeMode,
300    ) -> MemtableBuilderRef {
301        match options {
302            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
303                self.write_buffer_manager.clone(),
304                dedup,
305                merge_mode,
306            )),
307            Some(MemtableOptions::PartitionTree(opts)) => {
308                Arc::new(PartitionTreeMemtableBuilder::new(
309                    PartitionTreeConfig {
310                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
311                        data_freeze_threshold: opts.data_freeze_threshold,
312                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
313                        dedup,
314                        merge_mode,
315                    },
316                    self.write_buffer_manager.clone(),
317                ))
318            }
319            None => self.default_memtable_builder(dedup, merge_mode),
320        }
321    }
322
323    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
324        match &self.config.memtable {
325            MemtableConfig::PartitionTree(config) => {
326                let mut config = config.clone();
327                config.dedup = dedup;
328                Arc::new(PartitionTreeMemtableBuilder::new(
329                    config,
330                    self.write_buffer_manager.clone(),
331                ))
332            }
333            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
334                self.write_buffer_manager.clone(),
335                dedup,
336                merge_mode,
337            )),
338        }
339    }
340}
341
/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self) -> Result<BoxedBatchIterator>;
}
348
349pub type BoxedIterBuilder = Box<dyn IterBuilder>;
350
/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters pushed down to this memtable.
    predicate: PredicateGroup,
}
360
361pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;
362
363impl MemtableRangeContext {
364    /// Creates a new [MemtableRangeContext].
365    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
366        Self {
367            id,
368            builder,
369            predicate,
370        }
371    }
372}
373
/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Context shared by all ranges of the same memtable.
    context: MemtableRangeContextRef,
}
380
381impl MemtableRange {
382    /// Creates a new range from context.
383    pub fn new(context: MemtableRangeContextRef) -> Self {
384        Self { context }
385    }
386
387    /// Returns the id of the memtable to read.
388    pub fn id(&self) -> MemtableId {
389        self.context.id
390    }
391
392    /// Builds an iterator to read the range.
393    /// Filters the result by the specific time range, this ensures memtable won't return
394    /// rows out of the time range when new rows are inserted.
395    pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
396        let iter = self.context.builder.build()?;
397        let time_filters = self.context.predicate.time_filters();
398        Ok(Box::new(PruneTimeIterator::new(
399            iter,
400            time_range,
401            time_filters,
402        )))
403    }
404}
405
#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    // The tagged TOML form (`type = "partition_tree"`) must deserialize into
    // the partition tree variant with every field applied.
    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    // Without a manager, the tracker only accumulates its local counter and
    // `done_allocating` is a no-op on it.
    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    // With a manager: allocations reserve memory; `done_allocating` moves the
    // bytes out of mutable usage exactly once; dropping the tracker frees all.
    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    // Drop must release the memory even when `done_allocating` was never
    // called explicitly.
    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}