mito2/memtable/bulk.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation for bulk load

#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceNumber};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{BulkPart, BulkPartEncodeMetrics, BulkPartEncoder};
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// All parts in a bulk memtable.
#[derive(Default)]
struct BulkParts {
    /// Raw parts.
    parts: Vec<BulkPartWrapper>,
    /// Parts encoded as parquets.
    encoded_parts: Vec<EncodedPartWrapper>,
}

impl BulkParts {
    /// Total number of parts (raw + encoded).
    fn num_parts(&self) -> usize {
        self.parts.len() + self.encoded_parts.len()
    }

    /// Returns true if there is no part.
    fn is_empty(&self) -> bool {
        self.parts.is_empty() && self.encoded_parts.is_empty()
    }

    /// Returns true if the bulk parts should be merged.
    fn should_merge_bulk_parts(&self) -> bool {
        let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
        // If the total number of unmerged parts is >= 8, start a merge task.
        unmerged_count >= 8
    }

    /// Returns true if the encoded parts should be merged.
    fn should_merge_encoded_parts(&self) -> bool {
        let unmerged_count = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .count();
        // If the total number of unmerged encoded parts is >= 8, start a merge task.
        unmerged_count >= 8
    }

    /// Collects unmerged parts and marks them as being merged.
    /// Returns the collected parts to merge.
    fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.parts {
            if !wrapper.merging {
                wrapper.merging = true;
                collected_parts.push(PartToMerge::Bulk {
                    part: wrapper.part.clone(),
                    file_id: wrapper.file_id,
                });
            }
        }
        collected_parts
    }

    /// Collects unmerged encoded parts within size threshold and marks them as being merged.
    /// Returns the collected parts to merge.
    fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        // Find minimum size among unmerged parts
        let min_size = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .map(|wrapper| wrapper.part.size_bytes())
            .min();

        let Some(min_size) = min_size else {
            return Vec::new();
        };

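        // Only pick parts whose size is within 16x of the smallest unmerged part,
        // capped at 4 MiB, to keep large encoded parts out of small merges.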
        let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.encoded_parts {
            if !wrapper.merging {
                let size = wrapper.part.size_bytes();
                if size <= max_allowed_size {
                    wrapper.merging = true;
                    collected_parts.push(PartToMerge::Encoded {
                        part: wrapper.part.clone(),
                        file_id: wrapper.file_id,
                    });
                }
            }
        }
        collected_parts
    }

    /// Installs merged encoded parts and removes the original parts by file ids.
    /// Returns the total number of rows in the merged parts.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
        merge_encoded: bool,
    ) -> usize
    where
        I: IntoIterator<Item = EncodedBulkPart>,
    {
        let mut total_output_rows = 0;

        for encoded_part in merged_parts {
            total_output_rows += encoded_part.metadata().num_rows;
            self.encoded_parts.push(EncodedPartWrapper {
                part: encoded_part,
                file_id: FileId::random(),
                merging: false,
            });
        }

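        // Drop the source parts that were merged: from `encoded_parts` when merging
        // encoded parts, otherwise from the raw `parts`.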
        if merge_encoded {
            self.encoded_parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        } else {
            self.parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        }

        total_output_rows
    }

    /// Resets merging flag for parts with the given file ids.
    /// Used when merging fails or is cancelled.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
        if merge_encoded {
            for wrapper in &mut self.encoded_parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        } else {
            for wrapper in &mut self.parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        }
    }
}

/// RAII guard for managing merging flags.
/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    merge_encoded: bool,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    /// Creates a new guard for the given file ids.
    fn new(
        bulk_parts: &'a RwLock<BulkParts>,
        file_ids: &'a HashSet<FileId>,
        merge_encoded: bool,
    ) -> Self {
        Self {
            bulk_parts,
            file_ids,
            merge_encoded,
            success: false,
        }
    }

    /// Marks the merge operation as successful.
    /// When this is called, the guard will not reset the flags on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
        }
    }
}

/// Memtable that ingests and scans parts directly.
pub struct BulkMemtable {
    id: MemtableId,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Cached flat SST arrow schema for memtable compaction.
    #[allow(dead_code)]
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for scheduling compaction tasks
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();
            bulk_parts.parts.push(BulkPartWrapper {
                part: fragment,
                file_id: FileId::random(),
                merging: false,
            });

308            // This ensure the statistics in `ranges()` are correct. What's more,
309            // it guarantees no rows are out of the time range so we don't need to
310            // prune rows by time range again in the iterator of the MemtableRange.
311            self.update_stats(local_metrics);
312        }
313
314        if self.should_compact() {
315            self.schedule_compact();
316        }
317
318        Ok(())
319    }
320
321    #[cfg(any(test, feature = "test"))]
322    fn iter(
323        &self,
324        _projection: Option<&[ColumnId]>,
325        _predicate: Option<table::predicate::Predicate>,
326        _sequence: Option<SequenceNumber>,
327    ) -> Result<crate::memtable::BoxedBatchIterator> {
328        todo!()
329    }
330
331    fn ranges(
332        &self,
333        projection: Option<&[ColumnId]>,
334        predicate: PredicateGroup,
335        sequence: Option<SequenceNumber>,
336        for_flush: bool,
337    ) -> Result<MemtableRanges> {
338        let mut ranges = BTreeMap::new();
339        let mut range_id = 0;
340
341        // TODO(yingwen): Filter ranges by sequence.
342        let context = Arc::new(BulkIterContext::new(
343            self.metadata.clone(),
344            projection,
345            predicate.predicate().cloned(),
346            for_flush,
347        )?);
348
349        // Adds ranges for regular parts and encoded parts
350        {
351            let bulk_parts = self.parts.read().unwrap();
352
353            // Adds ranges for regular parts
354            for part_wrapper in bulk_parts.parts.iter() {
355                // Skips empty parts
356                if part_wrapper.part.num_rows() == 0 {
357                    continue;
358                }
359
360                let range = MemtableRange::new(
361                    Arc::new(MemtableRangeContext::new(
362                        self.id,
363                        Box::new(BulkRangeIterBuilder {
364                            part: part_wrapper.part.clone(),
365                            context: context.clone(),
366                            sequence,
367                        }),
368                        predicate.clone(),
369                    )),
370                    part_wrapper.part.num_rows(),
371                );
372                ranges.insert(range_id, range);
373                range_id += 1;
374            }
375
376            // Adds ranges for encoded parts
377            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
378                // Skips empty parts
379                if encoded_part_wrapper.part.metadata().num_rows == 0 {
380                    continue;
381                }
382
383                let range = MemtableRange::new(
384                    Arc::new(MemtableRangeContext::new(
385                        self.id,
386                        Box::new(EncodedBulkRangeIterBuilder {
387                            file_id: encoded_part_wrapper.file_id,
388                            part: encoded_part_wrapper.part.clone(),
389                            context: context.clone(),
390                            sequence,
391                        }),
392                        predicate.clone(),
393                    )),
394                    encoded_part_wrapper.part.metadata().num_rows,
395                );
396                ranges.insert(range_id, range);
397                range_id += 1;
398            }
399        }
400
401        let mut stats = self.stats();
402        stats.num_ranges = ranges.len();
403
404        // TODO(yingwen): Supports per range stats.
405        Ok(MemtableRanges { ranges, stats })
406    }
407
408    fn is_empty(&self) -> bool {
409        let bulk_parts = self.parts.read().unwrap();
410        bulk_parts.is_empty()
411    }
412
413    fn freeze(&self) -> Result<()> {
414        self.alloc_tracker.done_allocating();
415        Ok(())
416    }
417
418    fn stats(&self) -> MemtableStats {
419        let estimated_bytes = self.alloc_tracker.bytes_allocated();
420
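        // No allocation tracked or no rows yet: return empty stats without computing
        // the time range.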
        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Computes the new flat schema based on the new metadata.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

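        // The forked memtable starts with no parts but keeps the same dispatcher,
        // append mode and merge mode as the current one.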
        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        // Try to merge regular parts first
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then try to merge encoded parts
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new BulkMemtable
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Updates memtable stats.
    ///
    /// Please update this inside the write lock scope.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Returns the estimated time series count.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }

    /// Returns whether the memtable should be compacted.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
    }

    /// Schedules a compaction task using the CompactDispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else {
            // Uses synchronous compaction if no dispatcher is available.
            if let Err(e) = self.compact(false) {
                common_telemetry::error!(e; "Failed to compact memtable");
            }
        }
    }
}

/// Iterator builder for bulk range
struct BulkRangeIterBuilder {
    part: BulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let iter = BulkPartRecordBatchIter::new(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Iterator builder for encoded bulk range
struct EncodedBulkRangeIterBuilder {
    #[allow(dead_code)]
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self.part.read(self.context.clone(), self.sequence)? {
            Ok(iter)
        } else {
            // Returns an empty iterator if there is no data to read.
            Ok(Box::new(std::iter::empty()))
        }
    }

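    // Hands out the part's already-encoded bytes and SST info so callers can reuse the
    // encoded data without re-encoding it.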
    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

struct BulkPartWrapper {
    part: BulkPart,
    /// The unique file id for this part in memtable.
    #[allow(dead_code)]
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}

struct EncodedPartWrapper {
    part: EncodedBulkPart,
    /// The unique file id for this part in memtable.
    #[allow(dead_code)]
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}

/// Enum to wrap different types of parts for unified merging.
#[derive(Clone)]
enum PartToMerge {
    /// Raw bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// Encoded bulk part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    /// Gets the file ID of this part.
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    /// Gets the minimum timestamp of this part.
    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    /// Gets the maximum timestamp of this part.
    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    /// Gets the number of rows in this part.
    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Creates a record batch iterator for this part.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let iter = BulkPartRecordBatchIter::new(
                    part.batch, context, None, // No sequence filter for merging
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Encoded { part, .. } => part.read(context, None),
        }
    }
}

struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
}

impl MemtableCompactor {
    /// Creates a new MemtableCompactor.
    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
        Self {
            region_id,
            memtable_id,
        }
    }

    /// Merges bulk parts and then encodes the result to an [EncodedBulkPart].
    fn merge_bulk_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects unmerged parts and marks them as being merged.
        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);

        // Sorts parts by row count (ascending) to merge parts with similar row counts.
        let mut sorted_parts = parts_to_merge;
        sorted_parts.sort_unstable_by_key(|part| part.num_rows());

        // Groups parts into chunks for concurrent processing.
        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
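        // Merge each group of up to 16 parts in parallel on the rayon thread pool;
        // every group yields at most one encoded part.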
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs merged parts.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges encoded parts and then encodes the result to an [EncodedBulkPart].
    fn merge_encoded_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects unmerged encoded parts within the size threshold and marks them as being merged.
        let parts_to_merge = {
            let mut parts = bulk_parts.write().unwrap();
            parts.collect_encoded_parts_to_merge()
        };

        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);

        if parts_to_merge.len() == 1 {
            // Only one part, so there is nothing to merge; the guard resets the merging flag on drop.
            return Ok(());
        }

        // Groups parts into chunks for concurrent processing.
        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();

        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs merged parts and gets the total number of output rows.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
        };

        // Marks the operation as successful to prevent flag reset
        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges a group of parts into a single encoded part.
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Option<EncodedBulkPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        // Calculates timestamp bounds for merged data
        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            None, // No column projection for merging
            None, // No predicate for merging
            true,
        )?);

        // Creates iterators for all parts to merge.
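        // Parts that produce no iterator (read errors or nothing to read) are skipped.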
        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            // Applies deduplication based on merge mode
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    // Calculates field column start: total columns - fixed columns - field columns
                    // Field column count = total metadata columns - time index column - primary key columns
                    let field_column_count =
                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
                    let total_columns = arrow_schema.fields().len();
                    let field_column_start =
                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        // Encodes the merged iterator
        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
        let mut metrics = BulkPartEncodeMetrics::default();
        let encoded_part = encoder.encode_record_batch_iter(
            boxed_iter,
            arrow_schema.clone(),
            min_timestamp,
            max_timestamp,
            &mut metrics,
        )?;

        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

        Ok(encoded_part)
    }
}

/// A memtable compact task to run in the background.
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,

    /// Cached flat SST arrow schema
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}

impl MemCompactTask {
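    /// Runs the same merge sequence as [`BulkMemtable::compact`] from a background task.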
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        // Tries to merge regular parts first
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then tries to merge encoded parts
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

/// Scheduler to run compact tasks in the background.
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a new dispatcher that allows at most `permits` concurrent tasks.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Dispatches a compact task to run in the background.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(permit) = semaphore.acquire_owned().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                // Holds the permit until the blocking merge finishes so the semaphore
                // actually bounds the number of concurrent compactions.
                let _permit = permit;
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

/// Builder to build a [BulkMemtable].
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the compact dispatcher.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {

    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        let test_data = [
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(Some(&projection), predicate_group, None, false)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable =
            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);
        assert_eq!(3, ranges.stats.num_ranges);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let sequence_filter = Some(400u64); // Filters out rows with sequence > 400
        let ranges = memtable
            .ranges(None, predicate_group, sequence_filter, false)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        // Adds enough bulk parts to trigger encoding
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();

        // Should have ranges for both bulk parts and encoded parts
        assert_eq!(3, ranges.ranges.len());
        assert_eq!(10, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }
}