mito2/memtable/bulk.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation for bulk load
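//!
//! Parts written through [`Memtable::write_bulk`] are first kept as raw [`BulkPart`]s.
//! Once enough unmerged parts accumulate, they are merged and re-encoded into
//! parquet-backed [`EncodedBulkPart`]s, either synchronously or in the background via a
//! [`CompactDispatcher`].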

#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{BulkPart, BulkPartEncodeMetrics, BulkPartEncoder};
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::file::FileId;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// All parts in a bulk memtable.
#[derive(Default)]
struct BulkParts {
    /// Raw parts.
    parts: Vec<BulkPartWrapper>,
    /// Parts encoded as parquets.
    encoded_parts: Vec<EncodedPartWrapper>,
}

impl BulkParts {
    /// Total number of parts (raw + encoded).
    fn num_parts(&self) -> usize {
        self.parts.len() + self.encoded_parts.len()
    }

    /// Returns true if there is no part.
    fn is_empty(&self) -> bool {
        self.parts.is_empty() && self.encoded_parts.is_empty()
    }

    /// Returns true if the bulk parts should be merged.
    fn should_merge_bulk_parts(&self) -> bool {
        let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
        // If the total number of unmerged parts is >= 8, start a merge task.
        unmerged_count >= 8
    }

    /// Returns true if the encoded parts should be merged.
    fn should_merge_encoded_parts(&self) -> bool {
        let unmerged_count = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .count();
        // If the total number of unmerged encoded parts is >= 8, start a merge task.
        unmerged_count >= 8
    }

    /// Collects unmerged parts and marks them as being merged.
    /// Returns the collected parts to merge.
    fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.parts {
            if !wrapper.merging {
                wrapper.merging = true;
                collected_parts.push(PartToMerge::Bulk {
                    part: wrapper.part.clone(),
                    file_id: wrapper.file_id,
                });
            }
        }
        collected_parts
    }

    /// Collects unmerged encoded parts within the size threshold and marks them as being merged.
    /// Returns the collected parts to merge.
    fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        // Finds the minimum size among unmerged parts.
        let min_size = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .map(|wrapper| wrapper.part.size_bytes())
            .min();

        let Some(min_size) = min_size else {
            return Vec::new();
        };

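        // Only parts at most 16x the size of the smallest unmerged part (capped at 4 MiB)
        // are collected; larger encoded parts are skipped in this round.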
        let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.encoded_parts {
            if !wrapper.merging {
                let size = wrapper.part.size_bytes();
                if size <= max_allowed_size {
                    wrapper.merging = true;
                    collected_parts.push(PartToMerge::Encoded {
                        part: wrapper.part.clone(),
                        file_id: wrapper.file_id,
                    });
                }
            }
        }
        collected_parts
    }

    /// Installs merged encoded parts and removes the original parts by file ids.
    /// Returns the total number of rows in the merged parts.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
        merge_encoded: bool,
    ) -> usize
    where
        I: IntoIterator<Item = EncodedBulkPart>,
    {
        let mut total_output_rows = 0;

        for encoded_part in merged_parts {
            total_output_rows += encoded_part.metadata().num_rows;
            self.encoded_parts.push(EncodedPartWrapper {
                part: encoded_part,
                file_id: FileId::random(),
                merging: false,
            });
        }

        if merge_encoded {
            self.encoded_parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        } else {
            self.parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        }

        total_output_rows
    }

    /// Resets the merging flag for parts with the given file ids.
    /// Used when merging fails or is cancelled.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
        if merge_encoded {
            for wrapper in &mut self.encoded_parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        } else {
            for wrapper in &mut self.parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        }
    }
}

/// RAII guard for managing merging flags.
/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
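///
/// Callers should call [`MergingFlagsGuard::mark_success`] once the merged parts have been
/// installed; if the guard is dropped before that (for example, when a merge step returns an
/// error), the `merging` flags of the tracked parts are reset so they can be merged later.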
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    merge_encoded: bool,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    /// Creates a new guard for the given file ids.
    fn new(
        bulk_parts: &'a RwLock<BulkParts>,
        file_ids: &'a HashSet<FileId>,
        merge_encoded: bool,
    ) -> Self {
        Self {
            bulk_parts,
            file_ids,
            merge_encoded,
            success: false,
        }
    }

    /// Marks the merge operation as successful.
    /// When this is called, the guard will not reset the flags on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
        }
    }
}

/// Memtable that ingests and scans parts directly.
pub struct BulkMemtable {
    id: MemtableId,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Cached flat SST arrow schema for memtable compaction.
    #[allow(dead_code)]
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for scheduling compaction tasks.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled.
    append_mode: bool,
    /// Mode to handle duplicate rows while merging.
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();
            bulk_parts.parts.push(BulkPartWrapper {
                part: fragment,
                file_id: FileId::random(),
                merging: false,
            });

            // Since this operation should be fast, we do it within the parts lock scope.
            // This ensures the statistics in `ranges()` are correct. It also guarantees
            // that no rows are outside the time range, so we don't need to prune rows by
            // time range again in the iterator of the MemtableRange.
            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceNumber>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        // TODO(yingwen): Filter ranges by sequence.
        let context = Arc::new(BulkIterContext::new(
            self.metadata.clone(),
            &projection,
            predicate.predicate().cloned(),
        ));

        // Adds ranges for regular parts and encoded parts
        {
            let bulk_parts = self.parts.read().unwrap();

            // Adds ranges for regular parts
            for part_wrapper in bulk_parts.parts.iter() {
                // Skips empty parts
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_wrapper.part.num_rows(),
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            // Adds ranges for encoded parts
            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
                // Skips empty parts
                if encoded_part_wrapper.part.metadata().num_rows == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: encoded_part_wrapper.file_id,
                            part: encoded_part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    encoded_part_wrapper.part.metadata().num_rows,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        let mut stats = self.stats();
        stats.num_ranges = ranges.len();

        // TODO(yingwen): Supports per range stats.
        Ok(MemtableRanges { ranges, stats })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Computes the new flat schema based on the new metadata.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        // Try to merge regular parts first
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then try to merge encoded parts
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new BulkMemtable.
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Updates memtable stats.
    ///
    /// Call this inside the write lock scope.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Returns the estimated time series count.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }

    /// Returns whether the memtable should be compacted.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
    }

    /// Schedules a compaction task using the CompactDispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else {
            // Uses synchronous compaction if no dispatcher is available.
            if let Err(e) = self.compact(false) {
                common_telemetry::error!(e; "Failed to compact memtable");
            }
        }
    }
}

/// Iterator builder for bulk range.
struct BulkRangeIterBuilder {
    part: BulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let iter = BulkPartRecordBatchIter::new(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Iterator builder for encoded bulk range.
struct EncodedBulkRangeIterBuilder {
    #[allow(dead_code)]
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self.part.read(self.context.clone(), self.sequence)? {
            Ok(iter)
        } else {
            // Return an empty iterator if no data to read
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

struct BulkPartWrapper {
    part: BulkPart,
    /// The unique file id for this part in memtable.
    #[allow(dead_code)]
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}

struct EncodedPartWrapper {
    part: EncodedBulkPart,
    /// The unique file id for this part in memtable.
    #[allow(dead_code)]
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}

/// Enum to wrap different types of parts for unified merging.
#[derive(Clone)]
enum PartToMerge {
    /// Raw bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// Encoded bulk part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    /// Gets the file ID of this part.
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    /// Gets the minimum timestamp of this part.
    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    /// Gets the maximum timestamp of this part.
    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    /// Gets the number of rows in this part.
    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Creates a record batch iterator for this part.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let iter = BulkPartRecordBatchIter::new(
                    part.batch, context, None, // No sequence filter for merging
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Encoded { part, .. } => part.read(context, None),
        }
    }
}

struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
}

impl MemtableCompactor {
    /// Creates a new MemtableCompactor.
    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
        Self {
            region_id,
            memtable_id,
        }
    }

    /// Merges bulk parts and then encodes the result to an [EncodedBulkPart].
    fn merge_bulk_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects unmerged parts and marks them as being merged.
        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);

        // Sorts parts by row count (ascending) to merge parts with similar row counts.
        let mut sorted_parts = parts_to_merge;
        sorted_parts.sort_unstable_by_key(|part| part.num_rows());

        // Groups parts into chunks for concurrent processing.
        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs merged parts.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges encoded parts and then encodes the result to an [EncodedBulkPart].
    fn merge_encoded_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects unmerged encoded parts within the size threshold and marks them as being merged.
        let parts_to_merge = {
            let mut parts = bulk_parts.write().unwrap();
            parts.collect_encoded_parts_to_merge()
        };

        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);

        if parts_to_merge.len() == 1 {
            // Only 1 part, no need to merge; the guard will automatically reset the flag.
            return Ok(());
        }

        // Groups parts into chunks for concurrent processing.
        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();

        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs merged parts and gets the total number of output rows.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
        };

        // Marks the operation as successful to prevent flag reset.
        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges a group of parts into a single encoded part.
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Option<EncodedBulkPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        // Calculates timestamp bounds for merged data
        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            &None, // No column projection for merging
            None,  // No predicate for merging
        ));

        // Creates iterators for all parts to merge.
        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            // Applies deduplication based on merge mode
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    // Calculates field column start: total columns - fixed columns - field columns
                    // Field column count = total metadata columns - time index column - primary key columns
                    let field_column_count =
                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
                    let total_columns = arrow_schema.fields().len();
                    let field_column_start =
                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        // Encodes the merged iterator
        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
        let mut metrics = BulkPartEncodeMetrics::default();
        let encoded_part = encoder.encode_record_batch_iter(
            boxed_iter,
            arrow_schema.clone(),
            min_timestamp,
            max_timestamp,
            &mut metrics,
        )?;

        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

        Ok(encoded_part)
    }
}

/// A memtable compact task to run in the background.
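///
/// Holds clones of the shared state of a [`BulkMemtable`] so the merge can run off the
/// write path.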
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,

    /// Cached flat SST arrow schema.
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Whether the append mode is enabled.
    append_mode: bool,
    /// Mode to handle duplicate rows while merging.
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        // Tries to merge regular parts first
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then tries to merge encoded parts
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

/// Scheduler to run compact tasks in the background.
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a new dispatcher with the given number of max concurrent tasks.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Dispatches a compact task to run in the background.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(_permit) = semaphore.acquire().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

/// Builder to build a [BulkMemtable].
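///
/// A minimal construction sketch (illustrative; assumes `metadata` is the region's
/// [`RegionMetadataRef`] and that background compaction with 4 concurrent tasks is wanted):
///
/// ```ignore
/// let dispatcher = Arc::new(CompactDispatcher::new(4));
/// let builder = BulkMemtableBuilder::new(None, false, MergeMode::LastRow)
///     .with_compact_dispatcher(dispatcher);
/// let memtable = builder.build(1, &metadata);
/// ```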
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new builder with the given `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the compact dispatcher.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {

    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        let test_data = vec![
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable.ranges(None, predicate_group, None).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable
            .ranges(Some(&projection), predicate_group, None)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable =
            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable.ranges(None, predicate_group, None).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);
        assert_eq!(3, ranges.stats.num_ranges);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let sequence_filter = Some(400u64); // Filters out rows with sequence > 400
        let ranges = memtable
            .ranges(None, predicate_group, sequence_filter)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        // Adds enough bulk parts to trigger encoding
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable.ranges(None, predicate_group, None).unwrap();

        // Should have ranges for both bulk parts and encoded parts
        assert_eq!(3, ranges.ranges.len());
        assert_eq!(10, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }
}