// mito2/memtable/bulk.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation for bulk load
16
17#[allow(unused)]
18pub mod context;
19#[allow(unused)]
20pub mod part;
21pub mod part_reader;
22mod row_group_reader;
23
24use std::collections::{BTreeMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27use std::time::Instant;
28
29use datatypes::arrow::datatypes::SchemaRef;
30use mito_codec::key_values::KeyValue;
31use rayon::prelude::*;
32use store_api::metadata::RegionMetadataRef;
33use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
34use tokio::sync::Semaphore;
35
36use crate::error::{Result, UnsupportedOperationSnafu};
37use crate::flush::WriteBufferManagerRef;
38use crate::memtable::bulk::context::BulkIterContext;
39use crate::memtable::bulk::part::{BulkPart, BulkPartEncodeMetrics, BulkPartEncoder};
40use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
41use crate::memtable::stats::WriteMetrics;
42use crate::memtable::{
43    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
44    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
45    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
46};
47use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
48use crate::read::flat_merge::FlatMergeIterator;
49use crate::region::options::MergeMode;
50use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
51use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
52use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
53
/// All parts in a bulk memtable.
///
/// Raw parts are appended by `write_bulk()`; the memtable compactor later
/// merges them (and small encoded parts) into parquet-encoded parts.
#[derive(Default)]
struct BulkParts {
    /// Raw parts.
    parts: Vec<BulkPartWrapper>,
    /// Parts encoded as parquets.
    encoded_parts: Vec<EncodedPartWrapper>,
}
62
63impl BulkParts {
64    /// Total number of parts (raw + encoded).
65    fn num_parts(&self) -> usize {
66        self.parts.len() + self.encoded_parts.len()
67    }
68
69    /// Returns true if there is no part.
70    fn is_empty(&self) -> bool {
71        self.parts.is_empty() && self.encoded_parts.is_empty()
72    }
73
74    /// Returns true if the bulk parts should be merged.
75    fn should_merge_bulk_parts(&self) -> bool {
76        let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
77        // If the total number of unmerged parts is >= 8, start a merge task.
78        unmerged_count >= 8
79    }
80
81    /// Returns true if the encoded parts should be merged.
82    fn should_merge_encoded_parts(&self) -> bool {
83        let unmerged_count = self
84            .encoded_parts
85            .iter()
86            .filter(|wrapper| !wrapper.merging)
87            .count();
88        // If the total number of unmerged encoded parts is >= 8, start a merge task.
89        unmerged_count >= 8
90    }
91
92    /// Collects unmerged parts and marks them as being merged.
93    /// Returns the collected parts to merge.
94    fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
95        let mut collected_parts = Vec::new();
96
97        for wrapper in &mut self.parts {
98            if !wrapper.merging {
99                wrapper.merging = true;
100                collected_parts.push(PartToMerge::Bulk {
101                    part: wrapper.part.clone(),
102                    file_id: wrapper.file_id,
103                });
104            }
105        }
106        collected_parts
107    }
108
109    /// Collects unmerged encoded parts within size threshold and marks them as being merged.
110    /// Returns the collected parts to merge.
111    fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
112        // Find minimum size among unmerged parts
113        let min_size = self
114            .encoded_parts
115            .iter()
116            .filter(|wrapper| !wrapper.merging)
117            .map(|wrapper| wrapper.part.size_bytes())
118            .min();
119
120        let Some(min_size) = min_size else {
121            return Vec::new();
122        };
123
124        let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
125        let mut collected_parts = Vec::new();
126
127        for wrapper in &mut self.encoded_parts {
128            if !wrapper.merging {
129                let size = wrapper.part.size_bytes();
130                if size <= max_allowed_size {
131                    wrapper.merging = true;
132                    collected_parts.push(PartToMerge::Encoded {
133                        part: wrapper.part.clone(),
134                        file_id: wrapper.file_id,
135                    });
136                }
137            }
138        }
139        collected_parts
140    }
141
142    /// Installs merged encoded parts and removes the original parts by file ids.
143    /// Returns the total number of rows in the merged parts.
144    fn install_merged_parts<I>(
145        &mut self,
146        merged_parts: I,
147        merged_file_ids: &HashSet<FileId>,
148        merge_encoded: bool,
149    ) -> usize
150    where
151        I: IntoIterator<Item = EncodedBulkPart>,
152    {
153        let mut total_output_rows = 0;
154
155        for encoded_part in merged_parts {
156            total_output_rows += encoded_part.metadata().num_rows;
157            self.encoded_parts.push(EncodedPartWrapper {
158                part: encoded_part,
159                file_id: FileId::random(),
160                merging: false,
161            });
162        }
163
164        if merge_encoded {
165            self.encoded_parts
166                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
167        } else {
168            self.parts
169                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
170        }
171
172        total_output_rows
173    }
174
175    /// Resets merging flag for parts with the given file ids.
176    /// Used when merging fails or is cancelled.
177    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
178        if merge_encoded {
179            for wrapper in &mut self.encoded_parts {
180                if file_ids.contains(&wrapper.file_id) {
181                    wrapper.merging = false;
182                }
183            }
184        } else {
185            for wrapper in &mut self.parts {
186                if file_ids.contains(&wrapper.file_id) {
187                    wrapper.merging = false;
188                }
189            }
190        }
191    }
192}
193
/// RAII guard for managing merging flags.
/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
struct MergingFlagsGuard<'a> {
    /// Parts whose `merging` flags this guard manages.
    bulk_parts: &'a RwLock<BulkParts>,
    /// File ids of the parts participating in the merge.
    file_ids: &'a HashSet<FileId>,
    /// True when guarding encoded parts, false when guarding raw parts.
    merge_encoded: bool,
    /// Set via `mark_success()`; suppresses the flag reset on drop.
    success: bool,
}
202
203impl<'a> MergingFlagsGuard<'a> {
204    /// Creates a new guard for the given file ids.
205    fn new(
206        bulk_parts: &'a RwLock<BulkParts>,
207        file_ids: &'a HashSet<FileId>,
208        merge_encoded: bool,
209    ) -> Self {
210        Self {
211            bulk_parts,
212            file_ids,
213            merge_encoded,
214            success: false,
215        }
216    }
217
218    /// Marks the merge operation as successful.
219    /// When this is called, the guard will not reset the flags on drop.
220    fn mark_success(&mut self) {
221        self.success = true;
222    }
223}
224
225impl<'a> Drop for MergingFlagsGuard<'a> {
226    fn drop(&mut self) {
227        if !self.success
228            && let Ok(mut parts) = self.bulk_parts.write()
229        {
230            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
231        }
232    }
233}
234
/// Memtable that ingests and scans parts directly.
pub struct BulkMemtable {
    /// Id of this memtable.
    id: MemtableId,
    parts: Arc<RwLock<BulkParts>>,
    /// Metadata of the region this memtable belongs to.
    metadata: RegionMetadataRef,
    /// Tracks bytes allocated by this memtable.
    alloc_tracker: AllocTracker,
    /// Max timestamp over all written rows (i64::MIN when empty).
    max_timestamp: AtomicI64,
    /// Min timestamp over all written rows (i64::MAX when empty).
    min_timestamp: AtomicI64,
    /// Max sequence over all written parts.
    max_sequence: AtomicU64,
    /// Total number of rows written.
    num_rows: AtomicUsize,
    /// Cached flat SST arrow schema for memtable compaction.
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for scheduling compaction tasks
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}
256
257impl std::fmt::Debug for BulkMemtable {
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        f.debug_struct("BulkMemtable")
260            .field("id", &self.id)
261            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
262            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
263            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
264            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
265            .finish()
266    }
267}
268
impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    // Row-level writes are rejected: this memtable only accepts whole parts
    // through `write_bulk()`.
    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    // Appends the part to the raw part list, updates stats, and schedules a
    // compaction once enough unmerged parts have piled up.
    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();
            bulk_parts.parts.push(BulkPartWrapper {
                part: fragment,
                // Each part gets a unique in-memtable file id used to track it
                // across merges.
                file_id: FileId::random(),
                merging: false,
            });

            // Since this operation should be fast, we do it in parts lock scope.
            // This ensure the statistics in `ranges()` are correct. What's more,
            // it guarantees no rows are out of the time range so we don't need to
            // prune rows by time range again in the iterator of the MemtableRange.
            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    // Test-only batch iterator; not implemented for the bulk memtable.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceRange>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    // Builds one `MemtableRange` per non-empty part (raw and encoded), all
    // sharing one `BulkIterContext`.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        // TODO(yingwen): Filter ranges by sequence.
        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        // Adds ranges for regular parts and encoded parts
        {
            let bulk_parts = self.parts.read().unwrap();

            // Adds ranges for regular parts
            for part_wrapper in bulk_parts.parts.iter() {
                // Skips empty parts
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_wrapper.part.num_rows(),
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            // Adds ranges for encoded parts
            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
                // Skips empty parts
                if encoded_part_wrapper.part.metadata().num_rows == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: encoded_part_wrapper.file_id,
                            part: encoded_part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    encoded_part_wrapper.part.metadata().num_rows,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        let mut stats = self.stats();
        stats.num_ranges = ranges.len();

        // TODO(yingwen): Supports per range stats.
        Ok(MemtableRanges { ranges, stats })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        // Empty memtable: report zeroed stats without a time range.
        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    // Creates an empty memtable for the (possibly altered) metadata, keeping
    // the write buffer manager and dispatcher of this memtable.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Computes the new flat schema based on the new metadata.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        // Holding the compactor lock serializes concurrent compactions.
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            // In-memtable compaction is skipped on the flush path.
            return Ok(());
        }

        // Try to merge regular parts first
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                // Dedup unless the region runs in append mode.
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then try to merge encoded parts
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}
513
impl BulkMemtable {
    /// Creates a new BulkMemtable
    ///
    /// `compact_dispatcher` is optional: without it, compactions triggered by
    /// `write_bulk()` run synchronously on the writer thread
    /// (see `schedule_compact()`).
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        // Cache the flat SST schema once; it is reused for every merge.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Updates memtable stats.
    ///
    /// Please update this inside the write lock scope.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Returns the estimated time series count.
    ///
    /// NOTE(review): only raw parts contribute to the estimate; encoded parts
    /// are not counted — confirm this underestimate is intended.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }

    /// Returns whether the memtable should be compacted.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
    }

    /// Schedules a compaction task using the CompactDispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            // The task clones shared handles so it can run after this call returns.
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else {
            // Uses synchronous compaction if no dispatcher is available.
            if let Err(e) = self.compact(false) {
                common_telemetry::error!(e; "Failed to compact table");
            }
        }
    }
}
600
/// Iterator builder for bulk range
struct BulkRangeIterBuilder {
    /// Raw part this range reads from.
    part: BulkPart,
    /// Shared iteration context (projection, predicate, metadata).
    context: Arc<BulkIterContext>,
    /// Optional sequence range filter for reads.
    sequence: Option<SequenceRange>,
}
607
608impl IterBuilder for BulkRangeIterBuilder {
609    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
610        UnsupportedOperationSnafu {
611            err_msg: "BatchIterator is not supported for bulk memtable",
612        }
613        .fail()
614    }
615
616    fn is_record_batch(&self) -> bool {
617        true
618    }
619
620    fn build_record_batch(
621        &self,
622        _metrics: Option<MemScanMetrics>,
623    ) -> Result<BoxedRecordBatchIterator> {
624        let iter = BulkPartRecordBatchIter::new(
625            self.part.batch.clone(),
626            self.context.clone(),
627            self.sequence,
628        );
629
630        Ok(Box::new(iter))
631    }
632
633    fn encoded_range(&self) -> Option<EncodedRange> {
634        None
635    }
636}
637
/// Iterator builder for encoded bulk range
struct EncodedBulkRangeIterBuilder {
    /// In-memtable file id of the encoded part.
    file_id: FileId,
    /// Parquet-encoded part this range reads from.
    part: EncodedBulkPart,
    /// Shared iteration context (projection, predicate, metadata).
    context: Arc<BulkIterContext>,
    /// Optional sequence range filter for reads.
    sequence: Option<SequenceRange>,
}
645
646impl IterBuilder for EncodedBulkRangeIterBuilder {
647    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
648        UnsupportedOperationSnafu {
649            err_msg: "BatchIterator is not supported for encoded bulk memtable",
650        }
651        .fail()
652    }
653
654    fn is_record_batch(&self) -> bool {
655        true
656    }
657
658    fn build_record_batch(
659        &self,
660        _metrics: Option<MemScanMetrics>,
661    ) -> Result<BoxedRecordBatchIterator> {
662        if let Some(iter) = self.part.read(self.context.clone(), self.sequence)? {
663            Ok(iter)
664        } else {
665            // Return an empty iterator if no data to read
666            Ok(Box::new(std::iter::empty()))
667        }
668    }
669
670    fn encoded_range(&self) -> Option<EncodedRange> {
671        Some(EncodedRange {
672            data: self.part.data().clone(),
673            sst_info: self.part.to_sst_info(self.file_id),
674        })
675    }
676}
677
/// A raw part together with its merge-tracking state.
struct BulkPartWrapper {
    /// The wrapped raw part.
    part: BulkPart,
    /// The unique file id for this part in memtable.
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}
685
/// An encoded (parquet) part together with its merge-tracking state.
struct EncodedPartWrapper {
    /// The wrapped encoded part.
    part: EncodedBulkPart,
    /// The unique file id for this part in memtable.
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}
693
/// Enum to wrap different types of parts for unified merging.
///
/// Both variants carry the in-memtable file id of the original part so the
/// originals can be removed (or their flags reset) after merging.
#[derive(Clone)]
enum PartToMerge {
    /// Raw bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// Encoded bulk part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}
705
706impl PartToMerge {
707    /// Gets the file ID of this part.
708    fn file_id(&self) -> FileId {
709        match self {
710            PartToMerge::Bulk { file_id, .. } => *file_id,
711            PartToMerge::Encoded { file_id, .. } => *file_id,
712        }
713    }
714
715    /// Gets the minimum timestamp of this part.
716    fn min_timestamp(&self) -> i64 {
717        match self {
718            PartToMerge::Bulk { part, .. } => part.min_timestamp,
719            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
720        }
721    }
722
723    /// Gets the maximum timestamp of this part.
724    fn max_timestamp(&self) -> i64 {
725        match self {
726            PartToMerge::Bulk { part, .. } => part.max_timestamp,
727            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
728        }
729    }
730
731    /// Gets the number of rows in this part.
732    fn num_rows(&self) -> usize {
733        match self {
734            PartToMerge::Bulk { part, .. } => part.num_rows(),
735            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
736        }
737    }
738
739    /// Creates a record batch iterator for this part.
740    fn create_iterator(
741        self,
742        context: Arc<BulkIterContext>,
743    ) -> Result<Option<BoxedRecordBatchIterator>> {
744        match self {
745            PartToMerge::Bulk { part, .. } => {
746                let iter = BulkPartRecordBatchIter::new(
747                    part.batch, context, None, // No sequence filter for merging
748                );
749                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
750            }
751            PartToMerge::Encoded { part, .. } => part.read(context, None),
752        }
753    }
754}
755
/// Merges and re-encodes parts inside a [BulkMemtable].
struct MemtableCompactor {
    // Region the memtable belongs to; used for logging.
    region_id: RegionId,
    // Id of the memtable being compacted; used for logging.
    memtable_id: MemtableId,
}
760
761impl MemtableCompactor {
762    /// Creates a new MemtableCompactor.
763    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
764        Self {
765            region_id,
766            memtable_id,
767        }
768    }
769
    /// Merges bulk parts and then encodes the result to an [EncodedBulkPart].
    ///
    /// Collected parts are flagged as `merging` up front; the flags guard
    /// resets those flags if any later step fails, so the parts become
    /// mergeable again.
    fn merge_bulk_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects unmerged parts and mark them as being merged
        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);

        // Sorts parts by row count (ascending) to merge parts with similar row counts.
        let mut sorted_parts = parts_to_merge;
        sorted_parts.sort_unstable_by_key(|part| part.num_rows());

        // Groups parts into chunks for concurrent processing.
        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
        // Merges groups on the rayon thread pool; the first error
        // short-circuits the collect and triggers the guard's flag reset.
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs merged parts.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }
828
    /// Merges encoded parts and then encodes the result to an [EncodedBulkPart].
    ///
    /// Like `merge_bulk_parts()`, but operates on the already-encoded parts
    /// selected by `collect_encoded_parts_to_merge()`.
    fn merge_encoded_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects unmerged encoded parts within size threshold and mark them as being merged.
        let parts_to_merge = {
            let mut parts = bulk_parts.write().unwrap();
            parts.collect_encoded_parts_to_merge()
        };

        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);

        if parts_to_merge.len() == 1 {
            // Only 1 part, don't have to merge - the guard will automatically reset the flag
            return Ok(());
        }

        // Groups parts into chunks for concurrent processing.
        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();

        // Merges groups on the rayon thread pool; the first error
        // short-circuits the collect and triggers the guard's flag reset.
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs merged parts using iterator and get total output rows
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
        };

        // Marks the operation as successful to prevent flag reset
        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }
894
895    /// Merges a group of parts into a single encoded part.
896    fn merge_parts_group(
897        parts_to_merge: Vec<PartToMerge>,
898        arrow_schema: &SchemaRef,
899        metadata: &RegionMetadataRef,
900        dedup: bool,
901        merge_mode: MergeMode,
902    ) -> Result<Option<EncodedBulkPart>> {
903        if parts_to_merge.is_empty() {
904            return Ok(None);
905        }
906
907        // Calculates timestamp bounds for merged data
908        let min_timestamp = parts_to_merge
909            .iter()
910            .map(|p| p.min_timestamp())
911            .min()
912            .unwrap_or(i64::MAX);
913        let max_timestamp = parts_to_merge
914            .iter()
915            .map(|p| p.max_timestamp())
916            .max()
917            .unwrap_or(i64::MIN);
918
919        let context = Arc::new(BulkIterContext::new(
920            metadata.clone(),
921            None, // No column projection for merging
922            None, // No predicate for merging
923            true,
924        )?);
925
926        // Creates iterators for all parts to merge.
927        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
928            .into_iter()
929            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
930            .collect();
931
932        if iterators.is_empty() {
933            return Ok(None);
934        }
935
936        let merged_iter =
937            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;
938
939        let boxed_iter: BoxedRecordBatchIterator = if dedup {
940            // Applies deduplication based on merge mode
941            match merge_mode {
942                MergeMode::LastRow => {
943                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
944                    Box::new(dedup_iter)
945                }
946                MergeMode::LastNonNull => {
947                    // Calculates field column start: total columns - fixed columns - field columns
948                    // Field column count = total metadata columns - time index column - primary key columns
949                    let field_column_count =
950                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
951                    let total_columns = arrow_schema.fields().len();
952                    let field_column_start =
953                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;
954
955                    let dedup_iter = FlatDedupIterator::new(
956                        merged_iter,
957                        FlatLastNonNull::new(field_column_start, false),
958                    );
959                    Box::new(dedup_iter)
960                }
961            }
962        } else {
963            Box::new(merged_iter)
964        };
965
966        // Encodes the merged iterator
967        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
968        let mut metrics = BulkPartEncodeMetrics::default();
969        let encoded_part = encoder.encode_record_batch_iter(
970            boxed_iter,
971            arrow_schema.clone(),
972            min_timestamp,
973            max_timestamp,
974            &mut metrics,
975        )?;
976
977        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);
978
979        Ok(encoded_part)
980    }
981}
982
/// A memtable compact task to run in background.
struct MemCompactTask {
    /// Metadata of the region this memtable belongs to (also used for
    /// error logging by region id).
    metadata: RegionMetadataRef,
    /// Shared parts of the memtable to compact.
    parts: Arc<RwLock<BulkParts>>,

    /// Cached flat SST arrow schema
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}
997
998impl MemCompactTask {
999    fn compact(&self) -> Result<()> {
1000        let mut compactor = self.compactor.lock().unwrap();
1001
1002        // Tries to merge regular parts first
1003        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
1004        if should_merge {
1005            compactor.merge_bulk_parts(
1006                &self.flat_arrow_schema,
1007                &self.parts,
1008                &self.metadata,
1009                !self.append_mode,
1010                self.merge_mode,
1011            )?;
1012        }
1013
1014        // Then tries to merge encoded parts
1015        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
1016        if should_merge {
1017            compactor.merge_encoded_parts(
1018                &self.flat_arrow_schema,
1019                &self.parts,
1020                &self.metadata,
1021                !self.append_mode,
1022                self.merge_mode,
1023            )?;
1024        }
1025
1026        Ok(())
1027    }
1028}
1029
/// Scheduler to run compact tasks in background.
#[derive(Debug)]
pub struct CompactDispatcher {
    /// Bounds the number of compact tasks allowed to run concurrently.
    semaphore: Arc<Semaphore>,
}
1035
1036impl CompactDispatcher {
1037    /// Creates a new dispatcher with the given number of max concurrent tasks.
1038    pub fn new(permits: usize) -> Self {
1039        Self {
1040            semaphore: Arc::new(Semaphore::new(permits)),
1041        }
1042    }
1043
1044    /// Dispatches a compact task to run in background.
1045    fn dispatch_compact(&self, task: MemCompactTask) {
1046        let semaphore = self.semaphore.clone();
1047        common_runtime::spawn_global(async move {
1048            let Ok(_permit) = semaphore.acquire().await else {
1049                return;
1050            };
1051
1052            common_runtime::spawn_blocking_global(move || {
1053                if let Err(e) = task.compact() {
1054                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
1055                }
1056            });
1057        });
1058    }
1059}
1060
/// Builder to build a [BulkMemtable].
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    /// Optional manager that tracks the write buffer usage of built memtables.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Optional dispatcher for background compaction of built memtables.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled for built memtables.
    append_mode: bool,
    /// Mode to handle duplicate rows while merging.
    merge_mode: MergeMode,
}
1069
1070impl BulkMemtableBuilder {
1071    /// Creates a new builder with specific `write_buffer_manager`.
1072    pub fn new(
1073        write_buffer_manager: Option<WriteBufferManagerRef>,
1074        append_mode: bool,
1075        merge_mode: MergeMode,
1076    ) -> Self {
1077        Self {
1078            write_buffer_manager,
1079            compact_dispatcher: None,
1080            append_mode,
1081            merge_mode,
1082        }
1083    }
1084
1085    /// Sets the compact dispatcher.
1086    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
1087        self.compact_dispatcher = Some(compact_dispatcher);
1088        self
1089    }
1090}
1091
1092impl MemtableBuilder for BulkMemtableBuilder {
1093    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
1094        Arc::new(BulkMemtable::new(
1095            id,
1096            metadata.clone(),
1097            self.write_buffer_manager.clone(),
1098            self.compact_dispatcher.clone(),
1099            self.append_mode,
1100            self.merge_mode,
1101        ))
1102    }
1103
1104    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
1105        true
1106    }
1107}
1108
#[cfg(test)]
mod tests {

    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    /// Builds a [BulkPart] holding one primary key `(k0, k1)` with the given
    /// timestamps, values and sequence number, using the test region metadata.
    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    /// Writes three bulk parts and verifies stats, ranges and batch contents.
    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        // (k0, k1, timestamps, values, sequence) per part; 5 rows total.
        let test_data = [
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        // One range per written part; stats aggregate all parts.
        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);

        // Each range must yield record batches whose row total matches its stats.
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                // Full flat schema width without projection.
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    /// Verifies that a column projection is applied to range record batches.
    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        // Projects a single user column (id 4).
        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                Some(&projection),
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            // Projected column plus the fixed internal columns.
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    /// Row-by-row writes are unsupported; only bulk writes are allowed.
    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    /// Freezing must keep already-written rows readable.
    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    /// Forking produces an empty memtable and leaves the original untouched.
    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable =
            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    /// Each written part is exposed as its own range.
    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);
        assert_eq!(3, ranges.stats.num_ranges);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    /// A sequence filter below all written sequences yields an empty iterator.
    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default()
                    .with_predicate(predicate_group)
                    .with_sequence(sequence_filter),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        // All rows have sequence 500 > 400, so nothing is visible.
        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    /// Compaction merges many small parts but keeps all rows readable.
    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        // Adds enough bulk parts to trigger encoding
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // Should have ranges for both bulk parts and encoded parts
        assert_eq!(3, ranges.ranges.len());
        assert_eq!(10, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }
}