// mito2/memtable.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtables are write buffers for regions.
15//! Memtables are write buffers for regions.
16
use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
use serde::{Deserialize, Serialize};
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::batch_adapter::BatchToRecordBatchAdapter;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

pub use bulk::part::{
    BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
    sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;
67
/// Id for memtables.
///
/// Should be unique under the same region. Assigned by the region when a
/// memtable is created or forked.
pub type MemtableId = u32;
72
/// Config for memtables.
///
/// Deserialized from TOML with an internally tagged `type` field
/// (`"partition_tree"` or `"time_series"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    /// Partition tree memtable with its own tuning options.
    PartitionTree(PartitionTreeConfig),
    /// Time series memtable (the default).
    #[default]
    TimeSeries,
}
81
/// Options for querying ranges from a memtable.
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are being queried for flush.
    pub for_flush: bool,
    /// Mode to pre-filter columns in ranges.
    pub pre_filter_mode: PreFilterMode,
    /// Predicate to filter the data.
    pub predicate: PredicateGroup,
    /// Sequence range to filter the data. `None` means no sequence filtering.
    pub sequence: Option<SequenceRange>,
}
94
95impl Default for RangesOptions {
96    fn default() -> Self {
97        Self {
98            for_flush: false,
99            pre_filter_mode: PreFilterMode::All,
100            predicate: PredicateGroup::default(),
101            sequence: None,
102        }
103    }
104}
105
106impl RangesOptions {
107    /// Creates a new [RangesOptions] for flushing.
108    pub fn for_flush() -> Self {
109        Self {
110            for_flush: true,
111            pre_filter_mode: PreFilterMode::All,
112            predicate: PredicateGroup::default(),
113            sequence: None,
114        }
115    }
116
117    /// Sets the pre-filter mode.
118    #[must_use]
119    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
120        self.pre_filter_mode = pre_filter_mode;
121        self
122    }
123
124    /// Sets the predicate.
125    #[must_use]
126    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
127        self.predicate = predicate;
128        self
129    }
130
131    /// Sets the sequence range.
132    #[must_use]
133    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
134        self.sequence = sequence;
135        self
136    }
137}
138
/// Statistics of a memtable or a memtable range.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    pub estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is None if
    /// and only if the memtable is empty.
    pub time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    pub max_sequence: SequenceNumber,
    /// Number of estimated timeseries in memtable.
    pub series_count: usize,
}
155
156impl MemtableStats {
157    /// Attaches the time range to the stats.
158    #[cfg(any(test, feature = "test"))]
159    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
160        self.time_range = time_range;
161        self
162    }
163
164    #[cfg(feature = "test")]
165    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
166        self.max_sequence = max_sequence;
167        self
168    }
169
170    /// Returns the estimated bytes allocated by this memtable.
171    pub fn bytes_allocated(&self) -> usize {
172        self.estimated_bytes
173    }
174
175    /// Returns the time range of the memtable.
176    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
177        self.time_range
178    }
179
180    /// Returns the num of total rows in memtable.
181    pub fn num_rows(&self) -> usize {
182        self.num_rows
183    }
184
185    /// Returns the number of ranges in the memtable.
186    pub fn num_ranges(&self) -> usize {
187        self.num_ranges
188    }
189
190    /// Returns the maximum sequence number in the memtable.
191    pub fn max_sequence(&self) -> SequenceNumber {
192        self.max_sequence
193    }
194
195    /// Series count in memtable.
196    pub fn series_count(&self) -> usize {
197        self.series_count
198    }
199}
200
/// Boxed iterator yielding [Batch]es from a memtable range.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Boxed iterator yielding Arrow [RecordBatch]es from a memtable range.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;
204
/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges, ordered by range id.
    pub ranges: BTreeMap<usize, MemtableRange>,
}
211
212impl MemtableRanges {
213    /// Returns the total number of rows across all ranges.
214    pub fn num_rows(&self) -> usize {
215        self.ranges.values().map(|r| r.stats().num_rows()).sum()
216    }
217
218    /// Returns the total series count across all ranges.
219    pub fn series_count(&self) -> usize {
220        self.ranges.values().map(|r| r.stats().series_count()).sum()
221    }
222
223    /// Returns the maximum sequence number across all ranges.
224    pub fn max_sequence(&self) -> SequenceNumber {
225        self.ranges
226            .values()
227            .map(|r| r.stats().max_sequence())
228            .max()
229            .unwrap_or(0)
230    }
231}
232
233impl IterBuilder for MemtableRanges {
234    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
235        ensure!(
236            self.ranges.len() == 1,
237            UnsupportedOperationSnafu {
238                err_msg: format!(
239                    "Building an iterator from MemtableRanges expects 1 range, but got {}",
240                    self.ranges.len()
241                ),
242            }
243        );
244
245        self.ranges.values().next().unwrap().build_iter()
246    }
247
248    fn is_record_batch(&self) -> bool {
249        self.ranges.values().all(|range| range.is_record_batch())
250    }
251}
252
/// In memory write buffer.
///
/// Implementations must be safe to share across threads.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded batch into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Returns the ranges in the memtable.
    ///
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] info of Memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with specific memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable.
    ///
    /// The `for_flush` is true when the flush job calls this method.
    /// The default implementation is a no-op.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

/// Reference-counted pointer to a [Memtable] trait object.
pub type MemtableRef = Arc<dyn Memtable>;
300
/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if the memtable supports bulk insert and benefits from it.
    ///
    /// Defaults to `false`; implementations may override based on the region metadata.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

/// Reference-counted pointer to a [MemtableBuilder] trait object.
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
314
/// Memtable memory allocation tracker.
///
/// Reports allocations to an optional [WriteBufferManagerRef] and to the
/// `WRITE_BUFFER_BYTES` metric; releases everything on drop.
#[derive(Default)]
pub struct AllocTracker {
    // Optional manager that enforces the global write buffer limit.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}
324
// Manual Debug impl: WriteBufferManagerRef is a trait object without Debug,
// so only the atomic counters are shown.
impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}
333
334impl AllocTracker {
335    /// Returns a new [AllocTracker].
336    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
337        AllocTracker {
338            write_buffer_manager,
339            bytes_allocated: AtomicUsize::new(0),
340            is_done_allocating: AtomicBool::new(false),
341        }
342    }
343
344    /// Tracks `bytes` memory is allocated.
345    pub(crate) fn on_allocation(&self, bytes: usize) {
346        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
347        WRITE_BUFFER_BYTES.add(bytes as i64);
348        if let Some(write_buffer_manager) = &self.write_buffer_manager {
349            write_buffer_manager.reserve_mem(bytes);
350        }
351    }
352
353    /// Marks we have finished allocating memory so we can free it from
354    /// the write buffer's limit.
355    ///
356    /// The region MUST ensure that it calls this method inside the region writer's write lock.
357    pub(crate) fn done_allocating(&self) {
358        if let Some(write_buffer_manager) = &self.write_buffer_manager
359            && self
360                .is_done_allocating
361                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
362                .is_ok()
363        {
364            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
365        }
366    }
367
368    /// Returns bytes allocated.
369    pub(crate) fn bytes_allocated(&self) -> usize {
370        self.bytes_allocated.load(Ordering::Relaxed)
371    }
372
373    /// Returns the write buffer manager.
374    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
375        self.write_buffer_manager.clone()
376    }
377}
378
impl Drop for AllocTracker {
    fn drop(&mut self) {
        // Ensure the mutable-usage accounting is settled even if the owner
        // never called `done_allocating` explicitly.
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}
394
/// Provider of memtable builders for regions.
///
/// Chooses a builder based on region options and the global Mito config.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    // Shared write buffer manager passed to every builder.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    // Global engine config; supplies defaults when region options are unset.
    config: Arc<MitoConfig>,
    // Dispatcher shared by bulk memtables to schedule compactions.
    compact_dispatcher: Arc<CompactDispatcher>,
}
402
403impl MemtableBuilderProvider {
404    pub(crate) fn new(
405        write_buffer_manager: Option<WriteBufferManagerRef>,
406        config: Arc<MitoConfig>,
407    ) -> Self {
408        let compact_dispatcher =
409            Arc::new(CompactDispatcher::new(config.max_background_compactions));
410
411        Self {
412            write_buffer_manager,
413            config,
414            compact_dispatcher,
415        }
416    }
417
418    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
419        let dedup = options.need_dedup();
420        let merge_mode = options.merge_mode();
421        let flat_format = options
422            .sst_format
423            .map(|format| format == FormatType::Flat)
424            .unwrap_or(self.config.default_flat_format);
425        if flat_format {
426            if options.memtable.is_some() {
427                common_telemetry::info!(
428                    "Overriding memtable config, use BulkMemtable under flat format"
429                );
430            }
431
432            return Arc::new(
433                BulkMemtableBuilder::new(
434                    self.write_buffer_manager.clone(),
435                    !dedup, // append_mode: true if not dedup, false if dedup
436                    merge_mode,
437                )
438                .with_compact_dispatcher(self.compact_dispatcher.clone()),
439            );
440        }
441
442        // The format is not flat.
443        match &options.memtable {
444            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
445                self.write_buffer_manager.clone(),
446                dedup,
447                merge_mode,
448            )),
449            Some(MemtableOptions::PartitionTree(opts)) => {
450                Arc::new(PartitionTreeMemtableBuilder::new(
451                    PartitionTreeConfig {
452                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
453                        data_freeze_threshold: opts.data_freeze_threshold,
454                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
455                        dedup,
456                        merge_mode,
457                    },
458                    self.write_buffer_manager.clone(),
459                ))
460            }
461            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
462        }
463    }
464
465    fn default_primary_key_memtable_builder(
466        &self,
467        dedup: bool,
468        merge_mode: MergeMode,
469    ) -> MemtableBuilderRef {
470        match &self.config.memtable {
471            MemtableConfig::PartitionTree(config) => {
472                let mut config = config.clone();
473                config.dedup = dedup;
474                Arc::new(PartitionTreeMemtableBuilder::new(
475                    config,
476                    self.write_buffer_manager.clone(),
477                ))
478            }
479            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
480                self.write_buffer_manager.clone(),
481                dedup,
482                merge_mode,
483            )),
484        }
485    }
486}
487
/// Metrics for scanning a memtable.
///
/// Cloning is cheap; clones share the same underlying data behind a mutex.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the metrics.
    ///
    /// Adds each counter and duration from `inner` into the shared data.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
        metrics.prefilter_cost += inner.prefilter_cost;
        metrics.prefilter_rows_filtered += inner.prefilter_rows_filtered;
    }

    /// Gets a snapshot of the metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}
509
/// Raw counters collected while scanning a memtable.
#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total series in the memtable.
    pub(crate) total_series: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Number of batches read.
    pub(crate) num_batches: usize,
    /// Duration to scan the memtable.
    pub(crate) scan_cost: Duration,
    /// Duration of prefilter in memtable scan.
    pub(crate) prefilter_cost: Duration,
    /// Number of rows filtered by prefilter in memtable scan.
    pub(crate) prefilter_rows_filtered: usize,
}
525
/// Encoded range in the memtable.
///
/// Holds range data that has already been encoded into SST bytes.
pub struct EncodedRange {
    /// Encoded file data.
    pub data: Bytes,
    /// Metadata of the encoded range.
    pub sst_info: SstInfo,
}
533
/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns whether the iterator is a record batch iterator.
    ///
    /// Defaults to `false` ([Batch]-based iteration).
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns the record batch iterator to read the range.
    /// ## Note
    /// Implementations should ensure the iterator yields data within given time range.
    ///
    /// The default implementation returns an unsupported-operation error.
    fn build_record_batch(
        &self,
        time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        let _ = time_range;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the [EncodedRange] if the range is already encoded into SST.
    ///
    /// Defaults to `None` (not encoded).
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Boxed [IterBuilder] trait object.
pub type BoxedIterBuilder = Box<dyn IterBuilder>;
568
569/// Computes the column IDs to read based on the projection.
570///
571/// If `projection` is `Some`, returns those column IDs. If `None`, returns all column IDs
572/// from the metadata.
573pub fn read_column_ids_from_projection(
574    metadata: &RegionMetadataRef,
575    projection: Option<&[ColumnId]>,
576) -> Vec<ColumnId> {
577    if let Some(projection) = projection {
578        projection.to_vec()
579    } else {
580        metadata
581            .column_metadatas
582            .iter()
583            .map(|c| c.column_id)
584            .collect()
585    }
586}
587
/// Context to adapt batch iterators to record batch iterators for flat scan.
pub struct BatchToRecordBatchContext {
    // Region metadata used by the adapter to build record batches.
    metadata: RegionMetadataRef,
    // Codec to decode primary keys when adapting batches.
    codec: Arc<dyn PrimaryKeyCodec>,
    // Column ids to read; never empty (the time index is used as fallback).
    read_column_ids: Vec<ColumnId>,
}
594
595impl BatchToRecordBatchContext {
596    /// Creates a new context for adapting batch iterators.
597    pub fn new(metadata: RegionMetadataRef, mut read_column_ids: Vec<ColumnId>) -> Self {
598        if read_column_ids.is_empty() {
599            read_column_ids.push(metadata.time_index_column().column_id);
600        }
601
602        let codec = build_primary_key_codec(&metadata);
603        Self {
604            metadata,
605            codec,
606            read_column_ids,
607        }
608    }
609
610    fn adapt_iter(&self, iter: BoxedBatchIterator) -> BoxedRecordBatchIterator {
611        Box::new(BatchToRecordBatchAdapter::new(
612            iter,
613            self.metadata.clone(),
614            self.codec.clone(),
615            &self.read_column_ids,
616        ))
617    }
618}
619
/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
    /// Optional context to adapt batch iterators for flat scans.
    batch_to_record_batch: Option<Arc<BatchToRecordBatchContext>>,
}

/// Reference-counted pointer to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;
633
634impl MemtableRangeContext {
635    /// Creates a new [MemtableRangeContext].
636    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
637        Self::new_with_batch_to_record_batch(id, builder, predicate, None)
638    }
639
640    /// Creates a new [MemtableRangeContext] with optional adapter context.
641    pub fn new_with_batch_to_record_batch(
642        id: MemtableId,
643        builder: BoxedIterBuilder,
644        predicate: PredicateGroup,
645        batch_to_record_batch: Option<Arc<BatchToRecordBatchContext>>,
646    ) -> Self {
647        Self {
648            id,
649            builder,
650            predicate,
651            batch_to_record_batch,
652        }
653    }
654}
655
/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context.
    context: MemtableRangeContextRef,
    /// Statistics for this memtable range.
    stats: MemtableStats,
}
664
665impl MemtableRange {
666    /// Creates a new range from context and stats.
667    pub fn new(context: MemtableRangeContextRef, stats: MemtableStats) -> Self {
668        Self { context, stats }
669    }
670
671    /// Returns the statistics for this range.
672    pub fn stats(&self) -> &MemtableStats {
673        &self.stats
674    }
675
676    /// Returns the id of the memtable to read.
677    pub fn id(&self) -> MemtableId {
678        self.context.id
679    }
680
681    /// Builds an iterator to read the range.
682    /// Filters the result by the specific time range, this ensures memtable won't return
683    /// rows out of the time range when new rows are inserted.
684    pub fn build_prune_iter(
685        &self,
686        time_range: FileTimeRange,
687        metrics: Option<MemScanMetrics>,
688    ) -> Result<BoxedBatchIterator> {
689        let iter = self.context.builder.build(metrics)?;
690        let time_filters = self.context.predicate.time_filters();
691        Ok(Box::new(PruneTimeIterator::new(
692            iter,
693            time_range,
694            time_filters,
695        )))
696    }
697
698    /// Builds an iterator to read all rows in range.
699    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
700        self.context.builder.build(None)
701    }
702
703    /// Builds a record batch iterator to read rows in range.
704    ///
705    /// For mutable memtables (adapter path), applies time-range pruning to ensure rows
706    /// outside the time range are filtered, matching the behavior of `build_prune_iter`.
707    pub fn build_record_batch_iter(
708        &self,
709        time_range: Option<FileTimeRange>,
710        metrics: Option<MemScanMetrics>,
711    ) -> Result<BoxedRecordBatchIterator> {
712        if self.context.builder.is_record_batch() {
713            return self.context.builder.build_record_batch(time_range, metrics);
714        }
715
716        if let Some(context) = self.context.batch_to_record_batch.as_ref() {
717            let iter = self.context.builder.build(metrics)?;
718            let iter: BoxedBatchIterator = if let Some(time_range) = time_range {
719                let time_filters = self.context.predicate.time_filters();
720                Box::new(PruneTimeIterator::new(iter, time_range, time_filters))
721            } else {
722                iter
723            };
724            return Ok(context.adapt_iter(iter));
725        }
726
727        UnsupportedOperationSnafu {
728            err_msg: "Record batch iterator is not supported by this memtable",
729        }
730        .fail()
731    }
732
733    /// Returns whether the iterator is a record batch iterator.
734    pub fn is_record_batch(&self) -> bool {
735        self.context.builder.is_record_batch()
736    }
737
738    pub fn num_rows(&self) -> usize {
739        self.stats.num_rows
740    }
741
742    /// Returns the encoded range if available.
743    pub fn encoded(&self) -> Option<EncodedRange> {
744        self.context.builder.encoded_range()
745    }
746}
747
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    // Verifies the internally tagged TOML deserialization of MemtableConfig.
    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    // Without a manager, the tracker only counts bytes locally.
    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    // With a manager: reservation, idempotent done_allocating, and free on drop.
    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    // Drop must settle accounting even when done_allocating is never called.
    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}
827}