// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
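///
/// # Example
///
/// A minimal sketch of the TOML this maps to via the serde `type` tag; the
/// field values are illustrative and mirror `test_deserialize_memtable_config`
/// at the bottom of this file:
///
/// ```toml
/// type = "partition_tree"
/// index_max_keys_per_shard = 8192
/// data_freeze_threshold = 1024
/// dedup = true
/// fork_dictionary_bytes = "512MiB"
/// ```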
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    TimeSeries,
}

impl Default for MemtableConfig {
    fn default() -> Self {
        Self::TimeSeries
    }
}

/// Statistics of a memtable.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is `None` if
    /// and only if the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Estimated number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Attaches the time range to the stats.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Attaches the maximum sequence number to the stats.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the total number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the estimated series count in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Iterator over [Batch]es read from a memtable.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at query time.
    pub stats: MemtableStats,
}

/// In memory write buffer.
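///
/// # Example
///
/// A hedged sketch of a region's write/flush flow against some
/// `memtable: MemtableRef`; the `kvs`, `metadata`, and `next_id` bindings
/// are illustrative, not a fixed API flow:
///
/// ```ignore
/// memtable.write(&kvs)?;                        // buffer incoming key values
/// let stats = memtable.stats();                 // e.g. check bytes_allocated()
/// memtable.freeze()?;                           // make it immutable before flushing
/// let next = memtable.fork(next_id, &metadata); // fresh mutable memtable for new writes
/// ```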
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    /// `projection` selects columns to read, `None` means reading all columns.
    /// `predicate` is the predicate to be pushed down to the memtable.
    ///
    /// # Note
    /// This method should only be used for tests.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] info of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with the specified memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if the memtable supports bulk insert and benefits from it.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Memtable memory allocation tracker.
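///
/// # Example
///
/// A minimal sketch of the tracker's lifecycle without a write buffer
/// manager (mirrors the tests at the bottom of this file):
///
/// ```ignore
/// let tracker = AllocTracker::new(None);
/// tracker.on_allocation(100);
/// assert_eq!(100, tracker.bytes_allocated());
/// // Safe to call more than once: memory is only scheduled to be freed once.
/// tracker.done_allocating();
/// ```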
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new [AllocTracker].
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks that `bytes` of memory have been allocated.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that we have finished allocating memory so we can free it from
    /// the write buffer's limit.
    ///
    /// The region MUST ensure that it calls this method inside the region writer's write lock.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            if self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                write_buffer_manager
                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
            }
        }
    }

    /// Returns bytes allocated.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders for regions.
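///
/// A hedged sketch of how a region obtains a builder; the `region_metadata`
/// binding and the argument values are illustrative:
///
/// ```ignore
/// let provider = MemtableBuilderProvider::new(None, Arc::new(MitoConfig::default()));
/// let builder = provider.builder_for_options(None, true, MergeMode::LastRow);
/// let memtable = builder.build(0, &region_metadata);
/// ```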
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        Self {
            write_buffer_manager,
            config,
        }
    }

    /// Returns a builder for the given memtable options, falling back to the
    /// builder in the config when `options` is `None`.
    pub(crate) fn builder_for_options(
        &self,
        options: Option<&MemtableOptions>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Metrics for scanning a memtable.
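///
/// A minimal sketch of the crate-internal flow: each iterator folds its
/// `MemScanMetricsData` into a shared handle (field values are illustrative):
///
/// ```ignore
/// let metrics = MemScanMetrics::default();
/// metrics.merge_inner(&MemScanMetricsData {
///     total_series: 1,
///     num_rows: 42,
///     num_batches: 1,
///     scan_cost: Duration::from_millis(3),
/// });
/// assert_eq!(42, metrics.data().num_rows);
/// ```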
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Gets the metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total series in the memtable.
    pub(crate) total_series: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Number of batches read.
    pub(crate) num_batches: usize,
    /// Duration to scan the memtable.
    pub(crate) scan_cost: Duration,
}

/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context.
    context: MemtableRangeContextRef,
    /// Number of rows in the current memtable range.
    // TODO(hl): use [MemtableRangeStats] instead.
    num_rows: usize,
}

impl MemtableRange {
    /// Creates a new range from context.
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range.
    /// Filters the result by the specified time range, which ensures the memtable
    /// won't return rows outside the time range when new rows are inserted.
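    ///
    /// A hedged usage sketch (the `range` and `time_range` bindings are
    /// illustrative):
    ///
    /// ```ignore
    /// let iter = range.build_prune_iter(time_range, None)?;
    /// for batch in iter {
    ///     let batch = batch?; // only rows within `time_range` are yielded
    /// }
    /// ```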
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read all rows in the range.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}