mito2/memtable.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

pub use bulk::part::{
    BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
    sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    #[default]
    TimeSeries,
}
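
// Note on the serde shape: with `#[serde(tag = "type", rename_all = "snake_case")]`,
// this enum deserializes from TOML such as (see `test_deserialize_memtable_config`
// at the bottom of this file):
//
//     type = "partition_tree"
//     data_freeze_threshold = 1024
//
// or simply `type = "time_series"` for the default variant.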

/// Options for querying ranges from a memtable.
#[derive(Clone)]
pub struct RangesOptions {
    /// Whether the ranges are being queried for flush.
    pub for_flush: bool,
    /// Mode to pre-filter columns in ranges.
    pub pre_filter_mode: PreFilterMode,
    /// Predicate to filter the data.
    pub predicate: PredicateGroup,
    /// Sequence range to filter the data.
    pub sequence: Option<SequenceRange>,
}

impl Default for RangesOptions {
    fn default() -> Self {
        Self {
            for_flush: false,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }
}

impl RangesOptions {
    /// Creates a new [RangesOptions] for flushing.
    pub fn for_flush() -> Self {
        Self {
            for_flush: true,
            pre_filter_mode: PreFilterMode::All,
            predicate: PredicateGroup::default(),
            sequence: None,
        }
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets the predicate.
    #[must_use]
    pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the sequence range.
    #[must_use]
    pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
        self.sequence = sequence;
        self
    }
}
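
// For illustration, a call site builds the options fluently; `predicate` and
// `sequence_range` below are placeholder values assumed to be supplied by the caller:
//
//     let opts = RangesOptions::default()
//         .with_pre_filter_mode(PreFilterMode::All)
//         .with_predicate(predicate)
//         .with_sequence(Some(sequence_range));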

/// Statistics of a memtable.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    pub estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is None if
    /// and only if the memtable is empty.
    pub time_range: Option<(Timestamp, Timestamp)>,
    /// Total number of rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    pub max_sequence: SequenceNumber,
    /// Estimated number of time series in the memtable.
    pub series_count: usize,
}

impl MemtableStats {
    /// Attaches the time range to the stats.
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the maximum sequence number of the stats.
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the total number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the series count in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Boxed iterator of [Batch]es read from a memtable.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Boxed iterator of Arrow [RecordBatch]es read from a memtable.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
}

impl MemtableRanges {
    /// Returns the total number of rows across all ranges.
    pub fn num_rows(&self) -> usize {
        self.ranges.values().map(|r| r.stats().num_rows()).sum()
    }

    /// Returns the total series count across all ranges.
    pub fn series_count(&self) -> usize {
        self.ranges.values().map(|r| r.stats().series_count()).sum()
    }

    /// Returns the maximum sequence number across all ranges.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.ranges
            .values()
            .map(|r| r.stats().max_sequence())
            .max()
            .unwrap_or(0)
    }
}

impl IterBuilder for MemtableRanges {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "MemtableRanges does not support build iterator",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        self.ranges.values().all(|range| range.is_record_batch())
    }
}

/// In memory write buffer.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded batch into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    /// `projection` selects columns to read, `None` means reading all columns.
    /// `predicate` is the predicate to be pushed down to the memtable.
    ///
    /// # Note
    /// This method should only be used for tests.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable.
    ///
    /// The returned map contains the range id and the range after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] info of Memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with the given memtable `id`.
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable.
    ///
    /// The `for_flush` flag is true when the flush job calls this method.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}
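
// A sketch of the expected lifecycle, based on the trait docs above: a region
// buffers writes into a mutable memtable, freezes it, scans it (e.g. for flush),
// and forks it to obtain the next mutable memtable. `kvs`, `new_id` and
// `metadata` are assumed to be provided by the region:
//
//     memtable.write(&kvs)?;                  // buffer incoming key values
//     memtable.freeze()?;                     // turn the memtable immutable
//     let ranges = memtable.ranges(None, RangesOptions::for_flush())?;
//     let next = memtable.fork(new_id, &metadata);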

/// Reference-counted pointer to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns true if the memtable supports bulk insert and benefits from it.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}
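
// `use_bulk_insert` defaults to false; builders whose memtables ingest encoded
// parts directly (presumably the bulk memtable builder) are expected to override
// it so callers can choose the `write_bulk` path.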

/// Reference-counted pointer to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Memtable memory allocation tracker.
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracker.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new [AllocTracker].
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks that `bytes` bytes of memory are allocated.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that we have finished allocating memory so we can free it from
    /// the write buffer's limit.
    ///
    /// The region MUST ensure that it calls this method inside the region writer's write lock.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager
            && self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            write_buffer_manager.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
        }
    }

    /// Returns bytes allocated.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by this tracker is freed.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}
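
// Accounting summary: `on_allocation` adds to the WRITE_BUFFER_BYTES gauge and
// reserves memory in the write buffer manager; `done_allocating` schedules the
// reserved memory to be freed exactly once (guarded by the compare_exchange
// above); `drop` then subtracts the tracked bytes from the gauge and releases
// them from the manager.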

/// Provider of memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
    compact_dispatcher: Arc<CompactDispatcher>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        let compact_dispatcher =
            Arc::new(CompactDispatcher::new(config.max_background_compactions));

        Self {
            write_buffer_manager,
            config,
            compact_dispatcher,
        }
    }

    pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
        let dedup = options.need_dedup();
        let merge_mode = options.merge_mode();
        let flat_format = options
            .sst_format
            .map(|format| format == FormatType::Flat)
            .unwrap_or(self.config.default_experimental_flat_format);
        if flat_format {
            if options.memtable.is_some() {
                common_telemetry::info!(
                    "Overriding memtable config, use BulkMemtable under flat format"
                );
            }

            return Arc::new(
                BulkMemtableBuilder::new(
                    self.write_buffer_manager.clone(),
                    !dedup, // append_mode: true if not dedup, false if dedup
                    merge_mode,
                )
                .with_compact_dispatcher(self.compact_dispatcher.clone()),
            );
        }

        // The format is not flat.
        match &options.memtable {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_primary_key_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_primary_key_memtable_builder(
        &self,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}
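
// Builder selection precedence, as implemented above: the flat SST format (from
// region options or the global default) always wins and maps to the bulk
// memtable; otherwise the per-region `MemtableOptions` picks between the time
// series and partition tree memtables, and without a per-region option the
// global `MemtableConfig` decides.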

/// Metrics for scanning a memtable.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Gets the metrics data.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total series in the memtable.
    pub(crate) total_series: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Number of batches read.
    pub(crate) num_batches: usize,
    /// Duration to scan the memtable.
    pub(crate) scan_cost: Duration,
}

/// Encoded range in the memtable.
pub struct EncodedRange {
    /// Encoded file data.
    pub data: Bytes,
    /// Metadata of the encoded range.
    pub sst_info: SstInfo,
}

/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns whether the iterator is a record batch iterator.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Returns the record batch iterator to read the range.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }

    /// Returns the [EncodedRange] if the range is already encoded into SST.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}
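
// The defaults above make [Batch]-based iteration the baseline: an `IterBuilder`
// only has to implement `build`, while record batch iteration and pre-encoded
// SST ranges are opt-in capabilities that bulk parts may provide.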

/// Boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// All filters.
    predicate: PredicateGroup,
}

/// Reference-counted pointer to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in the memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context.
    context: MemtableRangeContextRef,
    /// Statistics for this memtable range.
    stats: MemtableStats,
}

impl MemtableRange {
    /// Creates a new range from context and stats.
    pub fn new(context: MemtableRangeContextRef, stats: MemtableStats) -> Self {
        Self { context, stats }
    }

    /// Returns the statistics for this range.
    pub fn stats(&self) -> &MemtableStats {
        &self.stats
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range.
    /// Filters the result by the given time range; this ensures the memtable won't return
    /// rows outside the time range when new rows are inserted.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read all rows in range.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a record batch iterator to read all rows in range.
    ///
    /// This method doesn't take the optional time range because a bulk part is immutable
    /// so we don't need to filter rows out of the time range.
    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    /// Returns whether the iterator is a record batch iterator.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.stats.num_rows
    }

    /// Returns the encoded range if available.
    pub fn encoded(&self) -> Option<EncodedRange> {
        self.context.builder.encoded_range()
    }
}
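
// Readers are expected to branch on `is_record_batch()`: ranges backed by bulk
// parts yield Arrow record batches via `build_record_batch_iter` (no extra time
// filtering needed, as noted above), while row-oriented ranges go through
// `build_prune_iter` so rows outside the file time range are pruned.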

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }
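
    #[test]
    fn test_ranges_options_builders() {
        // A minimal sketch exercising the fluent setters of `RangesOptions`;
        // it only checks fields with observable defaults.
        let opts = RangesOptions::for_flush().with_sequence(None);
        assert!(opts.for_flush);
        assert!(opts.sequence.is_none());
        assert!(!RangesOptions::default().for_flush);
    }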

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }
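
    #[test]
    fn test_empty_memtable_ranges() {
        // Aggregates over an empty `MemtableRanges` fall back to zero,
        // including `max_sequence`, which defaults to 0 via `unwrap_or(0)`.
        let ranges = MemtableRanges::default();
        assert_eq!(0, ranges.num_rows());
        assert_eq!(0, ranges.series_count());
        assert_eq!(0, ranges.max_sequence());
    }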

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}