mito2/memtable.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::batch_adapter::BatchToRecordBatchAdapter;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

pub use bulk::part::{
    BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
    sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
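///
/// The variant is chosen by the serde `type` tag when deserializing. A minimal
/// TOML sketch (the partition tree variant with its fields is exercised in the
/// test at the bottom of this file):
///
/// ```toml
/// type = "time_series"
/// ```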
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}

/// Options for querying ranges from a memtable.
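///
/// Options are typically composed with the builder-style setters; an
/// illustrative sketch:
///
/// ```ignore
/// let options = RangesOptions::default()
///     .with_pre_filter_mode(PreFilterMode::All)
///     .with_predicate(PredicateGroup::default())
///     .with_sequence(None);
/// ```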
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are being queried for flush.
    pub for_flush: bool,
    /// Mode to pre-filter columns in ranges.
    pub pre_filter_mode: PreFilterMode,
    /// Predicate to filter the data.
    pub predicate: PredicateGroup,
    /// Sequence range to filter the data.
    pub sequence: Option<SequenceRange>,
}

impl Default for RangesOptions {
    fn default() -> Self {
        Self {
            for_flush: false,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }
}

impl RangesOptions {
    /// Creates a new [RangesOptions] for flushing.
    pub fn for_flush() -> Self {
        Self {
            for_flush: true,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets the predicate.
    #[must_use]
    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the sequence range.
    #[must_use]
    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
        self.sequence = sequence;
        self
    }
}

#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from the heap.
    pub estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is None if
    /// and only if the memtable is empty.
    pub time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    pub max_sequence: SequenceNumber,
    /// Estimated number of time series in the memtable.
    pub series_count: usize,
}

impl MemtableStats {
    /// Attaches the time range to the stats.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the total number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the series count in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
}

impl MemtableRanges {
    /// Returns the total number of rows across all ranges.
    pub fn num_rows(&self) -> usize {
        self.ranges.values().map(|r| r.stats().num_rows()).sum()
    }

    /// Returns the total series count across all ranges.
    pub fn series_count(&self) -> usize {
        self.ranges.values().map(|r| r.stats().series_count()).sum()
    }

    /// Returns the maximum sequence number across all ranges.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.ranges
            .values()
            .map(|r| r.stats().max_sequence())
            .max()
            .unwrap_or(0)
    }
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support building an iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

/// In-memory write buffer.
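///
/// A sketch of the typical lifecycle: a region writes key values into the
/// mutable memtable, freezes it once it is full, then forks a new mutable
/// memtable from the frozen one (`next_id` and `metadata` come from the region
/// and are illustrative only):
///
/// ```ignore
/// memtable.write(&kvs)?;
/// memtable.freeze()?;
/// let new_mutable = memtable.fork(next_id, &metadata);
/// ```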
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    /// `projection` selects columns to read; `None` means reading all columns.
    /// `predicate` is the predicate to be pushed down to the memtable.
    ///
    /// # Note
    /// This method should only be used for tests.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    ///
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of this memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with the specified memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable.
    ///
    /// `for_flush` is true when the flush job calls this method.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if the memtable supports bulk insert and benefits from it.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Memtable memory allocation tracker.
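///
/// A sketch of the lifecycle, mirroring the tests at the bottom of this file:
/// report allocations as they happen, mark allocation as done once the memtable
/// is frozen, and let `Drop` release the tracked memory.
///
/// ```ignore
/// let tracker = AllocTracker::new(None);
/// tracker.on_allocation(100);
/// assert_eq!(100, tracker.bytes_allocated());
/// tracker.done_allocating();
/// // Dropping the tracker frees the tracked bytes.
/// ```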
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new [AllocTracker].
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks that `bytes` of memory are allocated.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that we have finished allocating memory so we can free it from
    /// the write buffer's limit.
    ///
    /// The region MUST ensure that it calls this method inside the region writer's write lock.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns bytes allocated.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

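    /// Returns the memtable builder for the given region options.
    ///
    /// Under the flat SST format this always uses [BulkMemtableBuilder];
    /// otherwise the builder follows `options.memtable`, falling back to the
    /// global [MemtableConfig] when no per-region memtable option is set.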
    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, using BulkMemtable under flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup, // append_mode: true if not dedup, false if dedup
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        // The format is not flat.
        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_primary_key_memtable_builder(
        &self,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Metrics for scanning a memtable.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Gets the metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total series in the memtable.
    pub(crate) total_series: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Number of batches read.
    pub(crate) num_batches: usize,
    /// Duration to scan the memtable.
    pub(crate) scan_cost: Duration,
}

/// Encoded range in the memtable.
pub struct EncodedRange {
    /// Encoded file data.
    pub data: Bytes,
    /// Metadata of the encoded range.
    pub sst_info: SstInfo,
}

/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
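///
/// Implementations that natively produce record batches should override
/// [IterBuilder::is_record_batch] and [IterBuilder::build_record_batch]; the
/// default implementations report `false` and return an unsupported-operation
/// error respectively.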
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns whether the iterator is a record batch iterator.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns the record batch iterator to read the range.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the [EncodedRange] if the range is already encoded into SST.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Computes the column IDs to read based on the projection.
///
/// If `projection` is `Some`, returns those column IDs. If `None`, returns all column IDs
/// from the metadata.
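///
/// E.g. `read_column_ids_from_projection(&metadata, Some(&[1, 2]))` returns
/// `vec![1, 2]`, while passing `None` returns the IDs of all columns in
/// `metadata`.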
pub fn read_column_ids_from_projection(
    metadata: &RegionMetadataRef,
    projection: Option<&[ColumnId]>,
) -> Vec<ColumnId> {
    if let Some(projection) = projection {
        projection.to_vec()
    } else {
        metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect()
    }
}

/// Context to adapt batch iterators to record batch iterators for flat scan.
pub struct BatchToRecordBatchContext {
    metadata: RegionMetadataRef,
    codec: Arc<dyn PrimaryKeyCodec>,
    read_column_ids: Vec<ColumnId>,
}

impl BatchToRecordBatchContext {
    /// Creates a new context for adapting batch iterators.
    pub fn new(metadata: RegionMetadataRef, mut read_column_ids: Vec<ColumnId>) -> Self {
        if read_column_ids.is_empty() {
            read_column_ids.push(metadata.time_index_column().column_id);
        }

        let codec = build_primary_key_codec(&metadata);
        Self {
            metadata,
            codec,
            read_column_ids,
        }
    }

    fn adapt_iter(&self, iter: BoxedBatchIterator) -> BoxedRecordBatchIterator {
        Box::new(BatchToRecordBatchAdapter::new(
            iter,
            self.metadata.clone(),
            self.codec.clone(),
            &self.read_column_ids,
        ))
    }
}

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
    /// Optional context to adapt batch iterators for flat scans.
    batch_to_record_batch: Option<Arc<BatchToRecordBatchContext>>,
}

pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self::new_with_batch_to_record_batch(id, builder, predicate, None)
    }

    /// Creates a new [MemtableRangeContext] with optional adapter context.
    pub fn new_with_batch_to_record_batch(
        id: MemtableId,
        builder: BoxedIterBuilder,
        predicate: PredicateGroup,
        batch_to_record_batch: Option<Arc<BatchToRecordBatchContext>>,
    ) -> Self {
        Self {
            id,
            builder,
            predicate,
            batch_to_record_batch,
        }
    }
}

/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context.
    context: MemtableRangeContextRef,
    /// Statistics for this memtable range.
    stats: MemtableStats,
}

impl MemtableRange {
    /// Creates a new range from context and stats.
    pub fn new(context: MemtableRangeContextRef, stats: MemtableStats) -> Self {
        Self { context, stats }
    }

    /// Returns the statistics for this range.
    pub fn stats(&self) -> &MemtableStats {
        &self.stats
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range.
    /// Filters the result by the given time range; this ensures the memtable won't
    /// return rows outside the time range when new rows are inserted.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read all rows in range.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a record batch iterator to read rows in range.
    ///
    /// For mutable memtables (adapter path), applies time-range pruning to ensure rows
    /// outside the time range are filtered, matching the behavior of `build_prune_iter`.
    pub fn build_record_batch_iter(
        &self,
        time_range: Option<FileTimeRange>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if self.context.builder.is_record_batch() {
            return self.context.builder.build_record_batch(metrics);
        }

        if let Some(context) = self.context.batch_to_record_batch.as_ref() {
            let iter = self.context.builder.build(metrics)?;
            let iter: BoxedBatchIterator = if let Some(time_range) = time_range {
                let time_filters = self.context.predicate.time_filters();
                Box::new(PruneTimeIterator::new(iter, time_range, time_filters))
            } else {
                iter
            };
            return Ok(context.adapt_iter(iter));
        }

        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns whether the iterator is a record batch iterator.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

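    /// Returns the number of rows in this range.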
    pub fn num_rows(&self) -> usize {
        self.stats.num_rows
    }

    /// Returns the encoded range if available.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}