
mito2/memtable/bulk.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation for bulk load

pub(crate) mod chunk_reader;
pub mod context;
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Mutex, RwLock};
use std::time::Instant;

/// Reads an environment variable as usize, returning default if not set or invalid.
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}
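
// Illustrative usage (not part of the crate): the default wins whenever the
// variable is unset or fails to parse as a `usize`:
//
//     std::env::set_var("GREPTIME_BULK_MERGE_THRESHOLD", "8");
//     assert_eq!(env_usize("GREPTIME_BULK_MERGE_THRESHOLD", 16), 8);
//     std::env::set_var("GREPTIME_BULK_MERGE_THRESHOLD", "eight");
//     assert_eq!(env_usize("GREPTIME_BULK_MERGE_THRESHOLD", 16), 16);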

use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{
    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
    should_prune_bulk_part,
};
use crate::memtable::bulk::part_reader::BulkPartBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::flat_format::field_column_start;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// Default merge threshold for triggering compaction.
const DEFAULT_MERGE_THRESHOLD: usize = 16;

/// Threshold for triggering merge of parts. Configurable via `GREPTIME_BULK_MERGE_THRESHOLD`.
static MERGE_THRESHOLD: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));

/// Default maximum number of groups for parallel merging.
const DEFAULT_MAX_MERGE_GROUPS: usize = 32;

/// Maximum merge groups. Configurable via `GREPTIME_BULK_MAX_MERGE_GROUPS`.
static MAX_MERGE_GROUPS: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));

/// Row threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_ROW_THRESHOLD`.
/// When estimated rows exceed this threshold, parts are encoded as EncodedBulkPart.
pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
        10 * DEFAULT_ROW_GROUP_SIZE,
    )
});

/// Default bytes threshold for encoding.
const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;

/// Bytes threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_BYTES_THRESHOLD`.
/// When estimated bytes exceed this threshold, parts are encoded as EncodedBulkPart.
static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
        DEFAULT_ENCODE_BYTES_THRESHOLD,
    )
});
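
// Note (illustrative): the thresholds above are `LazyLock`s, so each
// environment variable is read once, on first access, and the value is cached
// for the lifetime of the process. Overrides must be in place early:
//
//     std::env::set_var("GREPTIME_BULK_MAX_MERGE_GROUPS", "16");
//     assert_eq!(*MAX_MERGE_GROUPS, 16); // first access initializes the cell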

/// Configuration for bulk memtable.
#[derive(Debug, Clone)]
pub struct BulkMemtableConfig {
    /// Threshold for triggering merge of parts.
    pub merge_threshold: usize,
    /// Row threshold for encoding parts.
    pub encode_row_threshold: usize,
    /// Bytes threshold for encoding parts.
    pub encode_bytes_threshold: usize,
    /// Maximum number of groups for parallel merging.
    pub max_merge_groups: usize,
}

impl Default for BulkMemtableConfig {
    fn default() -> Self {
        Self {
            merge_threshold: *MERGE_THRESHOLD,
            encode_row_threshold: *ENCODE_ROW_THRESHOLD,
            encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
            max_merge_groups: *MAX_MERGE_GROUPS,
        }
    }
}
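
// Construction sketch (assumed call site, not part of this module): individual
// knobs can be overridden while keeping the env-derived defaults for the rest:
//
//     let config = BulkMemtableConfig {
//         merge_threshold: 8,
//         ..BulkMemtableConfig::default()
//     };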

/// Result of merging parts: either a `MultiBulkPart` or an `EncodedBulkPart`.
enum MergedPart {
    /// Merged part kept in memory as a `MultiBulkPart` (when the merged data
    /// stays below the encode thresholds).
    Multi(MultiBulkPart),
    /// Merged part stored as an `EncodedBulkPart` (when the estimated rows or
    /// bytes exceed the encode thresholds).
    Encoded(EncodedBulkPart),
}

/// Result of collecting parts to merge.
struct CollectedParts {
    /// Groups of parts ready for merging (each group has up to `merge_threshold` parts).
    groups: Vec<Vec<PartToMerge>>,
}

/// All parts in a bulk memtable.
#[derive(Default)]
struct BulkParts {
    /// Unordered small parts.
    unordered_part: UnorderedPart,
    /// All parts (raw and encoded).
    parts: Vec<BulkPartWrapper>,
}

impl BulkParts {
    /// Total number of parts (including unordered).
    fn num_parts(&self) -> usize {
        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
        self.parts.len() + unordered_count
    }

    /// Returns true if there is no part.
    fn is_empty(&self) -> bool {
        self.unordered_part.is_empty() && self.parts.is_empty()
    }

    /// Returns true if bulk parts or encoded parts should be merged.
    /// Uses short-circuit counting to stop early once the threshold is reached.
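    /// For example (illustrative): with `merge_threshold = 2` and non-merging
    /// parts `[bulk, encoded, bulk]`, counting stops at the third part, where
    /// `bulk_count` reaches 2, without scanning any remaining parts.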
    fn should_merge_parts(&self, merge_threshold: usize) -> bool {
        let mut bulk_count = 0;
        let mut encoded_count = 0;

        for wrapper in &self.parts {
            if wrapper.merging {
                continue;
            }

            if wrapper.part.is_encoded() {
                encoded_count += 1;
            } else {
                bulk_count += 1;
            }

            // Short-circuit: stop counting if either threshold is reached
            if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
                return true;
            }
        }

        false
    }

    /// Returns true if the unordered_part should be compacted into a BulkPart.
    fn should_compact_unordered_part(&self) -> bool {
        self.unordered_part.should_compact()
    }

    /// Collects unmerged parts and marks them as being merged.
    /// Only collects parts of types that meet the threshold.
    /// Parts are pre-grouped into chunks for parallel processing.
    fn collect_parts_to_merge(
        &mut self,
        merge_threshold: usize,
        max_merge_groups: usize,
    ) -> CollectedParts {
        // First pass: collect indices and row counts for each type
        let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
        let mut encoded_indices: Vec<(usize, usize)> = Vec::new();

        for (idx, wrapper) in self.parts.iter().enumerate() {
            if wrapper.merging {
                continue;
            }
            let num_rows = wrapper.part.num_rows();
            if wrapper.part.is_encoded() {
                encoded_indices.push((idx, num_rows));
            } else {
                bulk_indices.push((idx, num_rows));
            }
        }

        let mut groups = Vec::new();

        // Process bulk parts if threshold met
        if bulk_indices.len() >= merge_threshold {
            groups.extend(self.collect_and_group_parts(
                bulk_indices,
                merge_threshold,
                max_merge_groups,
            ));
        }

        // Process encoded parts if threshold met
        if encoded_indices.len() >= merge_threshold {
            groups.extend(self.collect_and_group_parts(
                encoded_indices,
                merge_threshold,
                max_merge_groups,
            ));
        }

        CollectedParts { groups }
    }

    /// Sorts indices by row count, groups into chunks, marks as merging, and returns groups.
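    /// For example (illustrative): 40 eligible indices with `merge_threshold = 16`
    /// and `max_merge_groups = 2` are chunked into sizes `[16, 16, 8]`; only the
    /// first two chunks are taken, and the trailing 8 parts stay unmarked for a
    /// later round.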
    fn collect_and_group_parts(
        &mut self,
        mut indices: Vec<(usize, usize)>,
        merge_threshold: usize,
        max_merge_groups: usize,
    ) -> Vec<Vec<PartToMerge>> {
        if indices.is_empty() {
            return Vec::new();
        }

        // Sort by row count for better grouping
        indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);

        // Group into chunks of merge_threshold size, limit to max_merge_groups
        indices
            .chunks(merge_threshold)
            .take(max_merge_groups)
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|(idx, _)| {
                        let wrapper = &mut self.parts[*idx];
                        wrapper.merging = true;
                        wrapper.part.clone()
                    })
                    .collect()
            })
            .collect()
    }

    /// Installs merged parts and removes the original parts by file ids.
    /// Returns the total number of rows in the merged parts.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
    ) -> usize
    where
        I: IntoIterator<Item = MergedPart>,
    {
        let mut total_output_rows = 0;

        for merged_part in merged_parts {
            match merged_part {
                MergedPart::Encoded(encoded_part) => {
                    total_output_rows += encoded_part.metadata().num_rows;
                    self.parts.push(BulkPartWrapper {
                        part: PartToMerge::Encoded {
                            part: encoded_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                }
                MergedPart::Multi(multi_part) => {
                    total_output_rows += multi_part.num_rows();
                    self.parts.push(BulkPartWrapper {
                        part: PartToMerge::Multi {
                            part: multi_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                }
            }
        }

        self.parts
            .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));

        total_output_rows
    }

    /// Resets merging flag for parts with the given file ids.
    /// Used when merging fails or is cancelled.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
        for wrapper in &mut self.parts {
            if file_ids.contains(&wrapper.file_id()) {
                wrapper.merging = false;
            }
        }
    }
}

/// RAII guard for managing merging flags.
/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    /// Creates a new guard for the given file ids.
    fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
        Self {
            bulk_parts,
            file_ids,
            success: false,
        }
    }

    /// Marks the merge operation as successful.
    /// When this is called, the guard will not reset the flags on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids);
        }
    }
}
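
// Usage sketch (mirrors `MemtableCompactor::merge_parts` below; `do_merge_work`
// is a hypothetical stand-in): create the guard right after marking parts as
// merging, and call `mark_success()` only once the merged output is installed,
// so any error path resets the `merging` flags via `Drop`:
//
//     let mut guard = MergingFlagsGuard::new(&bulk_parts, &merged_file_ids);
//     do_merge_work()?; // on early return, Drop resets the flags
//     guard.mark_success();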

/// Memtable that ingests and scans parts directly.
pub struct BulkMemtable {
    id: MemtableId,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Cached flat SST arrow schema for memtable compaction.
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for scheduling compaction tasks
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();

            // Routes small parts to unordered_part based on threshold
            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
                bulk_parts.unordered_part.push(fragment);

                // Compacts unordered_part if threshold is reached
                if bulk_parts.should_compact_unordered_part()
                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
                {
                    bulk_parts.parts.push(BulkPartWrapper {
                        part: PartToMerge::Bulk {
                            part: bulk_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                    bulk_parts.unordered_part.clear();
                }
            } else {
                bulk_parts.parts.push(BulkPartWrapper {
                    part: PartToMerge::Bulk {
                        part: fragment,
                        file_id: FileId::random(),
                    },
                    merging: false,
                });
            }

            // Since this operation should be fast, we do it in the parts lock scope.
            // This ensures the statistics in `ranges()` are correct. What's more,
            // it guarantees no rows are out of the time range, so we don't need to
            // prune rows by time range again in the iterator of the MemtableRange.
            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        // TODO(yingwen): Filter ranges by sequence.
        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        // Adds ranges for regular parts and encoded parts
        {
            let bulk_parts = self.parts.read().unwrap();

            // Adds range for unordered part if not empty
            if !bulk_parts.unordered_part.is_empty()
                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
            {
                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: unordered_bulk_part,
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            // Adds ranges for all parts
            for part_wrapper in bulk_parts.parts.iter() {
                // Skips empty parts
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
                let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
                    PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Encoded { part, file_id } => {
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: *file_id,
                            part: part.clone(),
                            context: context.clone(),
                            sequence,
                        })
                    }
                };

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        iter_builder,
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        Ok(MemtableRanges { ranges })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Computes the new flat schema based on the new metadata.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            config: self.config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(
                metadata.region_id,
                id,
                self.config.clone(),
            ))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        // Unified merge for all parts
        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new BulkMemtable
    pub fn new(
        id: MemtableId,
        config: BulkMemtableConfig,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            config: config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the unordered part threshold (for testing).
    #[cfg(test)]
    pub fn set_unordered_part_threshold(&self, threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_threshold(threshold);
    }

    /// Sets the unordered part compact threshold (for testing).
    #[cfg(test)]
    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_compact_threshold(compact_threshold);
    }

    /// Updates memtable stats.
    ///
    /// Please update this inside the write lock scope.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Returns the estimated time series count.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.series_count())
            .sum()
    }

    /// Returns whether the memtable should be compacted.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_parts(self.config.merge_threshold)
    }

    /// Schedules a compaction task using the CompactDispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                config: self.config.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else {
            // Uses synchronous compaction if no dispatcher is available.
            if let Err(e) = self.compact(false) {
                common_telemetry::error!(e; "Failed to compact memtable");
            }
        }
    }
}

/// Iterator builder for bulk range
pub struct BulkRangeIterBuilder {
    pub part: BulkPart,
    pub context: Arc<BulkIterContext>,
    pub sequence: Option<SequenceRange>,
}

/// Iterator builder for multi bulk range
struct MultiBulkRangeIterBuilder {
    part: MultiBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let metadata = self.context.read_format().metadata();
        if should_prune_bulk_part(&self.part.batch, &self.context, metadata) {
            return Ok(Box::new(std::iter::empty()));
        }

        let series_count = self.part.estimated_series_count();
        let iter = BulkPartBatchIter::from_single(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
            series_count,
            metrics,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

impl IterBuilder for MultiBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for multi bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        match self
            .part
            .read(self.context.clone(), self.sequence, metrics)?
        {
            Some(iter) => Ok(iter),
            // All batches were pruned by the predicate. Return an empty iterator.
            None => Ok(Box::new(std::iter::empty())),
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Iterator builder for encoded bulk range
struct EncodedBulkRangeIterBuilder {
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self
            .part
            .read(self.context.clone(), self.sequence, metrics)?
        {
            Ok(iter)
        } else {
            // Return an empty iterator if no data to read
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

struct BulkPartWrapper {
    /// The part to store. It already contains the file id.
    part: PartToMerge,
    /// Whether this part is currently being merged.
    merging: bool,
}

impl BulkPartWrapper {
    /// Returns the file id of this part.
    fn file_id(&self) -> FileId {
        self.part.file_id()
    }
}

/// Enum to wrap different types of parts for unified merging.
#[derive(Clone)]
enum PartToMerge {
    /// Raw bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// Multiple bulk parts.
    Multi {
        part: MultiBulkPart,
        file_id: FileId,
    },
    /// Encoded bulk part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    /// Gets the file ID of this part.
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Multi { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    /// Gets the minimum timestamp of this part.
    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Multi { part, .. } => part.min_timestamp(),
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    /// Gets the maximum timestamp of this part.
    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Multi { part, .. } => part.max_timestamp(),
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    /// Gets the number of rows in this part.
    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Multi { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Gets the maximum sequence number of this part.
    fn max_sequence(&self) -> u64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.sequence,
            PartToMerge::Multi { part, .. } => part.max_sequence(),
            PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
        }
    }

    /// Gets the estimated series count in this part.
    fn series_count(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
            PartToMerge::Multi { part, .. } => part.series_count(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
        }
    }

    /// Returns true if this is an encoded part.
    fn is_encoded(&self) -> bool {
        matches!(self, PartToMerge::Encoded { .. })
    }

    /// Gets the estimated size in bytes of this part.
    fn estimated_size(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.estimated_size(),
            PartToMerge::Multi { part, .. } => part.estimated_size(),
            PartToMerge::Encoded { part, .. } => part.size_bytes(),
        }
    }

    /// Converts this part to `MemtableStats`.
    fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        match self {
            PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
            PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
            PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
        }
    }

    /// Creates a record batch iterator for this part.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let series_count = part.estimated_series_count();
                let iter = BulkPartBatchIter::from_single(
                    part.batch,
                    context,
                    None, // No sequence filter for merging
                    series_count,
                    None, // No metrics for merging
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Multi { part, .. } => part.read(context, None, None),
            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
        }
    }
}

struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
}

impl MemtableCompactor {
    /// Creates a new MemtableCompactor.
    fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
        Self {
            region_id,
            memtable_id,
            config,
        }
    }

    /// Merges parts (bulk and encoded); the merged output is encoded only when
    /// it exceeds the encode thresholds.
    fn merge_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collect pre-grouped parts
        let collected = bulk_parts
            .write()
            .unwrap()
            .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);

        if collected.groups.is_empty() {
            return Ok(());
        }

        // Collect all file IDs for tracking
        let merged_file_ids: HashSet<FileId> = collected
            .groups
            .iter()
            .flatten()
            .map(|part| part.file_id())
            .collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);

        let num_groups = collected.groups.len();
        let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();

        let encode_row_threshold = self.config.encode_row_threshold;
        let encode_bytes_threshold = self.config.encode_bytes_threshold;

        // Merge all groups in parallel
        let merged_parts = collected
            .groups
            .into_par_iter()
            .map(|group| {
                Self::merge_parts_group(
                    group,
                    arrow_schema,
                    metadata,
                    dedup,
                    merge_mode,
                    encode_row_threshold,
                    encode_bytes_threshold,
                )
            })
            .collect::<Result<Vec<Option<MergedPart>>>>()?;

        // Install all merged parts
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            num_groups,
            num_parts,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges a group of parts into a single part (either MultiBulkPart or EncodedBulkPart).
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
        encode_row_threshold: usize,
        encode_bytes_threshold: usize,
    ) -> Result<Option<MergedPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        // Calculates timestamp bounds and statistics for merged data
        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);
        let max_sequence = parts_to_merge
            .iter()
            .map(|p| p.max_sequence())
            .max()
            .unwrap_or(0);

        // Collects statistics from parts before creating iterators
        let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
        let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
        let estimated_series_count = parts_to_merge
            .iter()
            .map(|p| p.series_count())
            .max()
            .unwrap_or(0);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            None, // No column projection for merging
            None, // No predicate for merging
            true,
        )?);

        // Creates iterators for all parts to merge.
        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            // Applies deduplication based on merge mode
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    let field_column_start =
                        field_column_start(metadata, arrow_schema.fields().len());

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        // Encode as EncodedBulkPart if rows exceed row threshold OR bytes exceed bytes threshold
        if estimated_total_rows > encode_row_threshold
            || estimated_total_bytes > encode_bytes_threshold
        {
            let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
            let mut metrics = BulkPartEncodeMetrics::default();
            let encoded_part = encoder.encode_record_batch_iter(
                boxed_iter,
                arrow_schema.clone(),
                min_timestamp,
                max_timestamp,
                max_sequence,
                &mut metrics,
            )?;

            common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

            Ok(encoded_part.map(MergedPart::Encoded))
        } else {
            // Otherwise, collect into MultiBulkPart
            let mut batches = Vec::new();
            let mut actual_total_rows = 0;

            for batch_result in boxed_iter {
                let batch = batch_result?;
                actual_total_rows += batch.num_rows();
                batches.push(batch);
            }

            if actual_total_rows == 0 {
                return Ok(None);
            }

            let multi_part = MultiBulkPart::new(
                batches,
                min_timestamp,
                max_timestamp,
                max_sequence,
                estimated_series_count,
                metadata,
            );

            common_telemetry::trace!(
                "merge_parts_group created MultiBulkPart: rows={}, batches={}",
                actual_total_rows,
                multi_part.num_batches()
            );

            Ok(Some(MergedPart::Multi(multi_part)))
        }
    }
}

/// A memtable compact task to run in the background.
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
    /// Cached flat SST arrow schema.
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Whether the append mode is enabled.
    append_mode: bool,
    /// Mode to handle duplicate rows while merging.
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

/// Scheduler to run compact tasks in the background.
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a new dispatcher with the given number of max concurrent tasks.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Dispatches a compact task to run in the background.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(_permit) = semaphore.acquire().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}
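
// Sizing sketch (illustrative, not part of the crate): `permits` bounds how
// many compact tasks may be admitted concurrently across every memtable
// sharing this dispatcher. One permit per available core is a plausible
// upper bound:
//
//     let permits = std::thread::available_parallelism()
//         .map(|n| n.get())
//         .unwrap_or(1);
//     let dispatcher = Arc::new(CompactDispatcher::new(permits));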

/// Builder to build a [BulkMemtable].
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new builder with the specified `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            config: BulkMemtableConfig::default(),
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the compact dispatcher.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}
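
// Construction sketch (assumed caller; `metadata` is a `RegionMetadataRef`
// such as the one built by the tests below): share one dispatcher across
// builders so all background merges are throttled by a single semaphore:
//
//     let dispatcher = Arc::new(CompactDispatcher::new(4));
//     let builder = BulkMemtableBuilder::new(None, false, MergeMode::LastRow)
//         .with_compact_dispatcher(dispatcher);
//     let memtable = builder.build(1, &metadata);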
1358
1359impl MemtableBuilder for BulkMemtableBuilder {
1360    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
1361        Arc::new(BulkMemtable::new(
1362            id,
1363            self.config.clone(),
1364            metadata.clone(),
1365            self.write_buffer_manager.clone(),
1366            self.compact_dispatcher.clone(),
1367            self.append_mode,
1368            self.merge_mode,
1369        ))
1370    }
1371
1372    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
1373        true
1374    }
1375}
1376
1377#[cfg(test)]
1378mod tests {
1379    use mito_codec::row_converter::build_primary_key_codec;
1380
1381    use super::*;
1382    use crate::memtable::bulk::part::BulkPartConverter;
1383    use crate::read::scan_region::PredicateGroup;
1384    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1385    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1386
1387    fn create_bulk_part_with_converter(
1388        k0: &str,
1389        k1: u32,
1390        timestamps: Vec<i64>,
1391        values: Vec<Option<f64>>,
1392        sequence: u64,
1393    ) -> Result<BulkPart> {
1394        let metadata = metadata_for_test();
1395        let capacity = 100;
1396        let primary_key_codec = build_primary_key_codec(&metadata);
1397        let schema = to_flat_sst_arrow_schema(
1398            &metadata,
1399            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1400        );
1401
1402        let mut converter =
1403            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1404
1405        let key_values = build_key_values_with_ts_seq_values(
1406            &metadata,
1407            k0.to_string(),
1408            k1,
1409            timestamps.into_iter(),
1410            values.into_iter(),
1411            sequence,
1412        );
1413
1414        converter.append_key_values(&key_values)?;
1415        converter.convert()
1416    }
1417
1418    #[test]
1419    fn test_bulk_memtable_write_read() {
1420        let metadata = metadata_for_test();
1421        let memtable = BulkMemtable::new(
1422            999,
1423            BulkMemtableConfig::default(),
1424            metadata.clone(),
1425            None,
1426            None,
1427            false,
1428            MergeMode::LastRow,
1429        );
1430        // Disable unordered_part for this test
1431        memtable.set_unordered_part_threshold(0);
1432
1433        let test_data = [
1434            (
1435                "key_a",
1436                1u32,
1437                vec![1000i64, 2000i64],
1438                vec![Some(10.5), Some(20.5)],
1439                100u64,
1440            ),
1441            (
1442                "key_b",
1443                2u32,
1444                vec![1500i64, 2500i64],
1445                vec![Some(15.5), Some(25.5)],
1446                200u64,
1447            ),
1448            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
1449        ];
1450
1451        for (k0, k1, timestamps, values, seq) in test_data.iter() {
1452            let part =
1453                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
1454                    .unwrap();
1455            memtable.write_bulk(part).unwrap();
1456        }
1457
1458        let stats = memtable.stats();
1459        assert_eq!(5, stats.num_rows);
1460        assert_eq!(3, stats.num_ranges);
1461        assert_eq!(300, stats.max_sequence);
1462
1463        let (min_ts, max_ts) = stats.time_range.unwrap();
1464        assert_eq!(1000, min_ts.value());
1465        assert_eq!(3000, max_ts.value());
1466
1467        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1468        let ranges = memtable
1469            .ranges(
1470                None,
1471                RangesOptions::default().with_predicate(predicate_group),
1472            )
1473            .unwrap();
1474
1475        assert_eq!(3, ranges.ranges.len());
1476        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1477        assert_eq!(5, total_rows);
1478
1479        for (_range_id, range) in ranges.ranges.iter() {
1480            assert!(range.num_rows() > 0);
1481            assert!(range.is_record_batch());
1482
1483            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1484
1485            let mut total_rows = 0;
1486            for batch_result in record_batch_iter {
1487                let batch = batch_result.unwrap();
1488                total_rows += batch.num_rows();
1489                assert!(batch.num_rows() > 0);
1490                assert_eq!(8, batch.num_columns());
1491            }
1492            assert_eq!(total_rows, range.num_rows());
1493        }
1494    }
1495
1496    #[test]
1497    fn test_bulk_memtable_ranges_with_projection() {
1498        let metadata = metadata_for_test();
1499        let memtable = BulkMemtable::new(
1500            111,
1501            BulkMemtableConfig::default(),
1502            metadata.clone(),
1503            None,
1504            None,
1505            false,
1506            MergeMode::LastRow,
1507        );
1508
1509        let bulk_part = create_bulk_part_with_converter(
1510            "projection_test",
1511            5,
1512            vec![5000, 6000, 7000],
1513            vec![Some(50.0), Some(60.0), Some(70.0)],
1514            500,
1515        )
1516        .unwrap();
1517
1518        memtable.write_bulk(bulk_part).unwrap();
1519
1520        let projection = vec![4u32];
1521        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1522        let ranges = memtable
1523            .ranges(
1524                Some(&projection),
1525                RangesOptions::default().with_predicate(predicate_group),
1526            )
1527            .unwrap();
1528
1529        assert_eq!(1, ranges.ranges.len());
1530        let range = ranges.ranges.get(&0).unwrap();
1531
1532        assert!(range.is_record_batch());
1533        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1534
1535        let mut total_rows = 0;
1536        for batch_result in record_batch_iter {
1537            let batch = batch_result.unwrap();
1538            assert!(batch.num_rows() > 0);
1539            assert_eq!(5, batch.num_columns());
1540            total_rows += batch.num_rows();
1541        }
1542        assert_eq!(3, total_rows);
1543    }
1544
1545    #[test]
1546    fn test_bulk_memtable_unsupported_operations() {
1547        let metadata = metadata_for_test();
1548        let memtable = BulkMemtable::new(
1549            111,
1550            BulkMemtableConfig::default(),
1551            metadata.clone(),
1552            None,
1553            None,
1554            false,
1555            MergeMode::LastRow,
1556        );
1557
1558        let key_values = build_key_values_with_ts_seq_values(
1559            &metadata,
1560            "test".to_string(),
1561            1,
1562            vec![1000].into_iter(),
1563            vec![Some(1.0)].into_iter(),
1564            1,
1565        );
1566
1567        let err = memtable.write(&key_values).unwrap_err();
1568        assert!(err.to_string().contains("not supported"));
1569
1570        let kv = key_values.iter().next().unwrap();
1571        let err = memtable.write_one(kv).unwrap_err();
1572        assert!(err.to_string().contains("not supported"));
1573    }
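
    // For contrast with the rejected row-based writes above, a minimal sketch
    // of the supported bulk path (`create_bulk_part_with_converter` is the
    // test helper used throughout this module):
    //
    //     let part = create_bulk_part_with_converter(
    //         "test", 1, vec![1000], vec![Some(1.0)], 1,
    //     )
    //     .unwrap();
    //     memtable.write_bulk(part).unwrap();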
1574
1575    #[test]
1576    fn test_bulk_memtable_freeze() {
1577        let metadata = metadata_for_test();
1578        let memtable = BulkMemtable::new(
1579            222,
1580            BulkMemtableConfig::default(),
1581            metadata.clone(),
1582            None,
1583            None,
1584            false,
1585            MergeMode::LastRow,
1586        );
1587
1588        let bulk_part = create_bulk_part_with_converter(
1589            "freeze_test",
1590            10,
1591            vec![10000],
1592            vec![Some(100.0)],
1593            1000,
1594        )
1595        .unwrap();
1596
1597        memtable.write_bulk(bulk_part).unwrap();
1598        memtable.freeze().unwrap();
1599
1600        let stats_after_freeze = memtable.stats();
1601        assert_eq!(1, stats_after_freeze.num_rows);
1602    }
1603
1604    #[test]
1605    fn test_bulk_memtable_fork() {
1606        let metadata = metadata_for_test();
1607        let original_memtable = BulkMemtable::new(
1608            333,
1609            BulkMemtableConfig::default(),
1610            metadata.clone(),
1611            None,
1612            None,
1613            false,
1614            MergeMode::LastRow,
1615        );
1616
1617        let bulk_part =
1618            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
1619                .unwrap();
1620
1621        original_memtable.write_bulk(bulk_part).unwrap();
1622
1623        let forked_memtable = original_memtable.fork(444, &metadata);
1624
1625        assert_eq!(forked_memtable.id(), 444);
1626        assert!(forked_memtable.is_empty());
1627        assert_eq!(0, forked_memtable.stats().num_rows);
1628
1629        assert!(!original_memtable.is_empty());
1630        assert_eq!(1, original_memtable.stats().num_rows);
1631    }
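
    // A hedged follow-on sketch (assuming `write_bulk` is reachable through
    // the returned `MemtableRef`): the fork is independent, so writing to it
    // should leave the original memtable untouched.
    //
    //     let part = create_bulk_part_with_converter(
    //         "fork_test_2", 16, vec![16000], vec![Some(160.0)], 1600,
    //     )
    //     .unwrap();
    //     forked_memtable.write_bulk(part).unwrap();
    //     assert_eq!(1, forked_memtable.stats().num_rows);
    //     assert_eq!(1, original_memtable.stats().num_rows);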
1632
1633    #[test]
1634    fn test_bulk_memtable_ranges_multiple_parts() {
1635        let metadata = metadata_for_test();
1636        let memtable = BulkMemtable::new(
1637            777,
1638            BulkMemtableConfig::default(),
1639            metadata.clone(),
1640            None,
1641            None,
1642            false,
1643            MergeMode::LastRow,
1644        );
1645        // Disable unordered_part for this test
1646        memtable.set_unordered_part_threshold(0);
1647
1648        let parts_data = vec![
1649            (
1650                "part1",
1651                1u32,
1652                vec![1000i64, 1100i64],
1653                vec![Some(10.0), Some(11.0)],
1654                100u64,
1655            ),
1656            (
1657                "part2",
1658                2u32,
1659                vec![2000i64, 2100i64],
1660                vec![Some(20.0), Some(21.0)],
1661                200u64,
1662            ),
1663            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
1664        ];
1665
1666        for (k0, k1, timestamps, values, seq) in parts_data {
1667            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
1668            memtable.write_bulk(part).unwrap();
1669        }
1670
1671        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1672        let ranges = memtable
1673            .ranges(
1674                None,
1675                RangesOptions::default().with_predicate(predicate_group),
1676            )
1677            .unwrap();
1678
1679        assert_eq!(3, ranges.ranges.len());
1680        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1681        assert_eq!(5, total_rows);
1683
1684        for (range_id, range) in ranges.ranges.iter() {
1685            assert!(*range_id < 3);
1686            assert!(range.num_rows() > 0);
1687            assert!(range.is_record_batch());
1688        }
1689    }
1690
1691    #[test]
1692    fn test_bulk_memtable_ranges_with_sequence_filter() {
1693        let metadata = metadata_for_test();
1694        let memtable = BulkMemtable::new(
1695            888,
1696            BulkMemtableConfig::default(),
1697            metadata.clone(),
1698            None,
1699            None,
1700            false,
1701            MergeMode::LastRow,
1702        );
1703
1704        let part = create_bulk_part_with_converter(
1705            "seq_test",
1706            1,
1707            vec![1000, 2000, 3000],
1708            vec![Some(10.0), Some(20.0), Some(30.0)],
1709            500,
1710        )
1711        .unwrap();
1712
1713        memtable.write_bulk(part).unwrap();
1714
1715        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1716        let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
1717        let ranges = memtable
1718            .ranges(
1719                None,
1720                RangesOptions::default()
1721                    .with_predicate(predicate_group)
1722                    .with_sequence(sequence_filter),
1723            )
1724            .unwrap();
1725
1726        assert_eq!(1, ranges.ranges.len());
1727        let range = ranges.ranges.get(&0).unwrap();
1728
1729        let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1730        assert!(record_batch_iter.next().is_none());
1731    }
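
    // The complementary case, as a hedged sketch under the same semantics:
    // a bound that covers the written sequence (500) should keep all rows.
    //
    //     let ranges = memtable
    //         .ranges(
    //             None,
    //             RangesOptions::default()
    //                 .with_predicate(PredicateGroup::new(&metadata, &[]).unwrap())
    //                 .with_sequence(Some(SequenceRange::LtEq { max: 500 })),
    //         )
    //         .unwrap();
    //     // Expect all 3 rows to be readable again.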
1732
1733    #[test]
1734    fn test_bulk_memtable_ranges_with_encoded_parts() {
1735        let metadata = metadata_for_test();
1736        let config = BulkMemtableConfig {
1737            merge_threshold: 8,
1738            ..Default::default()
1739        };
1740        let memtable = BulkMemtable::new(
1741            999,
1742            config,
1743            metadata.clone(),
1744            None,
1745            None,
1746            false,
1747            MergeMode::LastRow,
1748        );
1749        // Disable unordered_part for this test
1750        memtable.set_unordered_part_threshold(0);
1751
1752        // Add enough bulk parts to trigger encoding
1753        for i in 0..10 {
1754            let part = create_bulk_part_with_converter(
1755                &format!("key_{}", i),
1756                i,
1757                vec![1000 + i as i64 * 100],
1758                vec![Some(i as f64 * 10.0)],
1759                100 + i as u64,
1760            )
1761            .unwrap();
1762            memtable.write_bulk(part).unwrap();
1763        }
1764
1765        memtable.compact(false).unwrap();
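        // Hedged sanity check (assumption: compaction preserves the logical
        // row count when all keys are distinct): all 10 rows stay visible.
        assert_eq!(10, memtable.stats().num_rows);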
1766
1767        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1768        let ranges = memtable
1769            .ranges(
1770                None,
1771                RangesOptions::default().with_predicate(predicate_group),
1772            )
1773            .unwrap();
1774
1775        // Should have ranges for both bulk parts and encoded parts
1776        assert_eq!(3, ranges.ranges.len());
1777        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1778        assert_eq!(10, total_rows);
1779
1780        for range in ranges.ranges.values() {
1781            assert!(range.num_rows() > 0);
1782            assert!(range.is_record_batch());
1783
1784            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1785            let mut total_rows = 0;
1786            for batch_result in record_batch_iter {
1787                let batch = batch_result.unwrap();
1788                total_rows += batch.num_rows();
1789                assert!(batch.num_rows() > 0);
1790            }
1791            assert_eq!(total_rows, range.num_rows());
1792        }
1793    }
1794
1795    #[test]
1796    fn test_bulk_memtable_unordered_part() {
1797        let metadata = metadata_for_test();
1798        let memtable = BulkMemtable::new(
1799            1001,
1800            BulkMemtableConfig::default(),
1801            metadata.clone(),
1802            None,
1803            None,
1804            false,
1805            MergeMode::LastRow,
1806        );
1807
1808        // Set smaller thresholds for testing with smaller inputs
1809        // Accept parts with < 5 rows into unordered_part
1810        memtable.set_unordered_part_threshold(5);
1811        // Compact when total rows >= 10
1812        memtable.set_unordered_part_compact_threshold(10);
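        // (A threshold of 0, as used by other tests in this module, disables
        // the unordered_part path entirely.)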
1813
1814        // Write 3 small parts (each has 2 rows), should be collected in unordered_part
1815        for i in 0..3 {
1816            let part = create_bulk_part_with_converter(
1817                &format!("key_{}", i),
1818                i,
1819                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1820                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1821                100 + i as u64,
1822            )
1823            .unwrap();
1824            assert_eq!(2, part.num_rows());
1825            memtable.write_bulk(part).unwrap();
1826        }
1827
1828        // Total rows = 6, not yet reaching compact threshold
1829        let stats = memtable.stats();
1830        assert_eq!(6, stats.num_rows);
1831
1832        // Write 2 more small parts (each has 2 rows)
1833        // This should trigger compaction when total >= 10
1834        for i in 3..5 {
1835            let part = create_bulk_part_with_converter(
1836                &format!("key_{}", i),
1837                i,
1838                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1839                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1840                100 + i as u64,
1841            )
1842            .unwrap();
1843            memtable.write_bulk(part).unwrap();
1844        }
1845
1846        // Total rows = 10, should have compacted unordered_part into a regular part
1847        let stats = memtable.stats();
1848        assert_eq!(10, stats.num_rows);
1849
1850        // Verify we can read all data correctly
1851        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1852        let ranges = memtable
1853            .ranges(
1854                None,
1855                RangesOptions::default().with_predicate(predicate_group),
1856            )
1857            .unwrap();
1858
1859        // Should have at least 1 range (the compacted part)
1860        assert!(!ranges.ranges.is_empty());
1861        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1862        assert_eq!(10, total_rows);
1863
1864        // Read all data and verify
1865        let mut total_rows_read = 0;
1866        for range in ranges.ranges.values() {
1867            assert!(range.is_record_batch());
1868            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1869
1870            for batch_result in record_batch_iter {
1871                let batch = batch_result.unwrap();
1872                total_rows_read += batch.num_rows();
1873            }
1874        }
1875        assert_eq!(10, total_rows_read);
1876    }
1877
1878    #[test]
1879    fn test_bulk_memtable_unordered_part_mixed_sizes() {
1880        let metadata = metadata_for_test();
1881        let memtable = BulkMemtable::new(
1882            1002,
1883            BulkMemtableConfig::default(),
1884            metadata.clone(),
1885            None,
1886            None,
1887            false,
1888            MergeMode::LastRow,
1889        );
1890
1891        // Set threshold to 4 rows - parts with < 4 rows go to unordered_part
1892        memtable.set_unordered_part_threshold(4);
1893        memtable.set_unordered_part_compact_threshold(8);
1894
1895        // Write small parts (3 rows each) - should go to unordered_part
1896        for i in 0..2 {
1897            let part = create_bulk_part_with_converter(
1898                &format!("small_{}", i),
1899                i,
1900                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
1901                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
1902                10 + i as u64,
1903            )
1904            .unwrap();
1905            assert_eq!(3, part.num_rows());
1906            memtable.write_bulk(part).unwrap();
1907        }
1908
1909        // Write a large part (5 rows) - should go directly to regular parts
1910        let large_part = create_bulk_part_with_converter(
1911            "large_key",
1912            100,
1913            vec![5000, 6000, 7000, 8000, 9000],
1914            vec![
1915                Some(100.0),
1916                Some(101.0),
1917                Some(102.0),
1918                Some(103.0),
1919                Some(104.0),
1920            ],
1921            50,
1922        )
1923        .unwrap();
1924        assert_eq!(5, large_part.num_rows());
1925        memtable.write_bulk(large_part).unwrap();
1926
1927        // Write another small part (2 rows) - should trigger compaction of unordered_part
1928        let part = create_bulk_part_with_converter(
1929            "small_2",
1930            2,
1931            vec![4000, 4100],
1932            vec![Some(20.0), Some(21.0)],
1933            30,
1934        )
1935        .unwrap();
1936        memtable.write_bulk(part).unwrap();
1937
1938        let stats = memtable.stats();
1939        assert_eq!(13, stats.num_rows); // 3 + 3 + 5 + 2 = 13
1940
1941        // Verify all data can be read
1942        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1943        let ranges = memtable
1944            .ranges(
1945                None,
1946                RangesOptions::default().with_predicate(predicate_group),
1947            )
1948            .unwrap();
1949
1950        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1951        assert_eq!(13, total_rows);
1952
1953        let mut total_rows_read = 0;
1954        for range in ranges.ranges.values() {
1955            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1956            for batch_result in record_batch_iter {
1957                let batch = batch_result.unwrap();
1958                total_rows_read += batch.num_rows();
1959            }
1960        }
1961        assert_eq!(13, total_rows_read);
1962    }
1963
1964    #[test]
1965    fn test_bulk_memtable_unordered_part_with_ranges() {
1966        let metadata = metadata_for_test();
1967        let memtable = BulkMemtable::new(
1968            1003,
1969            BulkMemtableConfig::default(),
1970            metadata.clone(),
1971            None,
1972            None,
1973            false,
1974            MergeMode::LastRow,
1975        );
1976
1977        // Set small thresholds
1978        memtable.set_unordered_part_threshold(3);
1979        memtable.set_unordered_part_compact_threshold(100); // High threshold to prevent auto-compaction
1980
1981        // Write several small parts that stay in unordered_part
1982        for i in 0..3 {
1983            let part = create_bulk_part_with_converter(
1984                &format!("key_{}", i),
1985                i,
1986                vec![1000 + i as i64 * 100],
1987                vec![Some(i as f64 * 10.0)],
1988                100 + i as u64,
1989            )
1990            .unwrap();
1991            assert_eq!(1, part.num_rows());
1992            memtable.write_bulk(part).unwrap();
1993        }
1994
1995        let stats = memtable.stats();
1996        assert_eq!(3, stats.num_rows);
1997
1998        // Test that ranges() can correctly read from unordered_part
1999        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2000        let ranges = memtable
2001            .ranges(
2002                None,
2003                RangesOptions::default().with_predicate(predicate_group),
2004            )
2005            .unwrap();
2006
2007        // Should have 1 range for the unordered_part
2008        assert_eq!(1, ranges.ranges.len());
2009        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2010        assert_eq!(3, total_rows);
2011
2012        // Read the range back; only row counts are verified here
2013        let range = ranges.ranges.get(&0).unwrap();
2014        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2015
2016        let mut total_rows = 0;
2017        for batch_result in record_batch_iter {
2018            let batch = batch_result.unwrap();
2019            total_rows += batch.num_rows();
2020            // Each returned batch must be non-empty
2021            assert!(batch.num_rows() > 0);
2022        }
2023        assert_eq!(3, total_rows);
2024    }
2025
2026    /// Helper to create a BulkPartWrapper from a BulkPart.
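    /// The wrapper starts with `merging == false`, i.e. it is not claimed by
    /// an in-flight merge and still counts toward `should_merge_parts`.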
2027    fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
2028        BulkPartWrapper {
2029            part: PartToMerge::Bulk {
2030                part,
2031                file_id: FileId::random(),
2032            },
2033            merging: false,
2034        }
2035    }
2036
2037    #[test]
2038    fn test_should_merge_parts_below_threshold() {
2039        let mut bulk_parts = BulkParts::default();
2040
2041        // Add DEFAULT_MERGE_THRESHOLD - 1 bulk parts (one below the default threshold of 16)
2042        for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
2043            let part = create_bulk_part_with_converter(
2044                &format!("key_{}", i),
2045                i as u32,
2046                vec![1000 + i as i64 * 100],
2047                vec![Some(i as f64 * 10.0)],
2048                100 + i as u64,
2049            )
2050            .unwrap();
2051            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2052        }
2053
2054        // Should not trigger merge since we are one part short of the threshold
2055        assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2056    }
2057
2058    #[test]
2059    fn test_should_merge_parts_at_threshold() {
2060        let mut bulk_parts = BulkParts::default();
2061        let merge_threshold = 8;
2062
2063        // Add 8 bulk parts (at merge_threshold)
2064        for i in 0..merge_threshold {
2065            let part = create_bulk_part_with_converter(
2066                &format!("key_{}", i),
2067                i as u32,
2068                vec![1000 + i as i64 * 100],
2069                vec![Some(i as f64 * 10.0)],
2070                100 + i as u64,
2071            )
2072            .unwrap();
2073            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2074        }
2075
2076        // Should trigger merge since we have 8 parts
2077        assert!(bulk_parts.should_merge_parts(merge_threshold));
2078    }
2079
2080    #[test]
2081    fn test_should_merge_parts_with_merging_flag() {
2082        let mut bulk_parts = BulkParts::default();
2083        let merge_threshold = 8;
2084
2085        // Add 10 bulk parts
2086        for i in 0..10 {
2087            let part = create_bulk_part_with_converter(
2088                &format!("key_{}", i),
2089                i as u32,
2090                vec![1000 + i as i64 * 100],
2091                vec![Some(i as f64 * 10.0)],
2092                100 + i as u64,
2093            )
2094            .unwrap();
2095            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2096        }
2097
2098        // Should trigger merge since we have 10 parts
2099        assert!(bulk_parts.should_merge_parts(merge_threshold));
2100
2101        // Mark first 3 parts as merging
2102        for wrapper in bulk_parts.parts.iter_mut().take(3) {
2103            wrapper.merging = true;
2104        }
2105
2106        // Now only 7 parts are available for merging, should not trigger
2107        assert!(!bulk_parts.should_merge_parts(merge_threshold));
2108    }
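
    // A hedged follow-on sketch, continuing the test above: clearing the
    // `merging` flags should make all 10 parts eligible again.
    //
    //     for wrapper in bulk_parts.parts.iter_mut() {
    //         wrapper.merging = false;
    //     }
    //     assert!(bulk_parts.should_merge_parts(merge_threshold));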
2109
2110    #[test]
2111    fn test_collect_parts_to_merge_grouping() {
2112        let mut bulk_parts = BulkParts::default();
2113
2114        // Add 16 bulk parts with different row counts
2115        for i in 0..16 {
2116            let num_rows = (i % 4) + 1; // 1 to 4 rows
2117            let timestamps: Vec<i64> = (0..num_rows)
2118                .map(|j| 1000 + i as i64 * 100 + j as i64)
2119                .collect();
2120            let values: Vec<Option<f64>> =
2121                (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
2122            let part = create_bulk_part_with_converter(
2123                &format!("key_{}", i),
2124                i as u32,
2125                timestamps,
2126                values,
2127                100 + i as u64,
2128            )
2129            .unwrap();
2130            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2131        }
2132
2133        // Should trigger merge since we have 16 parts
2134        assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2135
2136        // Collect parts to merge
2137        let collected =
2138            bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);
2139
2140        // Should have groups
2141        assert!(!collected.groups.is_empty());
2142
2143        // All groups should have parts
2144        for group in &collected.groups {
2145            assert!(!group.is_empty());
2146        }
2147
2148        // Total parts collected should be 16
2149        let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
2150        assert_eq!(16, total_parts);
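
        // Hedged upper-bound check (assumption: `max_merge_groups` caps the
        // number of groups returned); trivially satisfied here, since 16
        // non-empty groups is the most 16 parts can form.
        assert!(collected.groups.len() <= DEFAULT_MAX_MERGE_GROUPS);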
2151    }
2152
2153    #[test]
2154    fn test_bulk_memtable_ranges_with_multi_bulk_part() {
2155        let metadata = metadata_for_test();
2156        let merge_threshold = 8;
2157        let config = BulkMemtableConfig {
2158            merge_threshold,
2159            ..Default::default()
2160        };
2161        let memtable = BulkMemtable::new(
2162            2005,
2163            config,
2164            metadata.clone(),
2165            None,
2166            None,
2167            false,
2168            MergeMode::LastRow,
2169        );
2170        // Disable unordered_part for this test
2171        memtable.set_unordered_part_threshold(0);
2172
2173        // Write enough bulk parts to trigger merge (merge_threshold = 8).
2174        // Each part has a small number of rows, so the total stays below
2175        // DEFAULT_ROW_GROUP_SIZE and compaction produces a MultiBulkPart.
2176        for i in 0..merge_threshold {
2177            let part = create_bulk_part_with_converter(
2178                &format!("key_{}", i),
2179                i as u32,
2180                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2181                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2182                100 + i as u64,
2183            )
2184            .unwrap();
2185            memtable.write_bulk(part).unwrap();
2186        }
2187
2188        // Compact to trigger MultiBulkPart creation (since total rows < DEFAULT_ROW_GROUP_SIZE)
2189        memtable.compact(false).unwrap();
2190
2191        // Verify we can read from the memtable
2192        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2193        let ranges = memtable
2194            .ranges(
2195                None,
2196                RangesOptions::default().with_predicate(predicate_group),
2197            )
2198            .unwrap();
2199
2200        assert_eq!(1, ranges.ranges.len());
2201        let expected_rows = merge_threshold * 2; // Each part has 2 rows
2202        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2203        assert_eq!(expected_rows, total_rows);
2204
2205        // Read all data
2206        let mut total_rows_read = 0;
2207        for range in ranges.ranges.values() {
2208            assert!(range.is_record_batch());
2209            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2210
2211            for batch_result in record_batch_iter {
2212                let batch = batch_result.unwrap();
2213                total_rows_read += batch.num_rows();
2214            }
2215        }
2216        assert_eq!(expected_rows, total_rows_read);
2217    }
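
    // Hedged idempotency sketch (assumption: with distinct keys, LastRow
    // dedup removes nothing): a second compact() should leave the visible
    // row count unchanged.
    //
    //     memtable.compact(false).unwrap();
    //     assert_eq!(16, memtable.stats().num_rows); // merge_threshold * 2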
2218
2219    #[test]
2220    fn test_multi_bulk_range_iter_builder_all_pruned() {
2221        let metadata = metadata_for_test();
2222        let merge_threshold = 8;
2223        let config = BulkMemtableConfig {
2224            merge_threshold,
2225            ..Default::default()
2226        };
2227        let memtable = BulkMemtable::new(
2228            2006,
2229            config,
2230            metadata.clone(),
2231            None,
2232            None,
2233            false,
2234            MergeMode::LastRow,
2235        );
2236        memtable.set_unordered_part_threshold(0);
2237
2238        // Write enough bulk parts to trigger merge into MultiBulkPart.
2239        for i in 0..merge_threshold {
2240            let part = create_bulk_part_with_converter(
2241                &format!("key_{}", i),
2242                i as u32,
2243                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2244                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2245                100 + i as u64,
2246            )
2247            .unwrap();
2248            memtable.write_bulk(part).unwrap();
2249        }
2250        memtable.compact(false).unwrap();
2251
2252        // Use a predicate that matches no rows so all batches are pruned.
2253        let filter = datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent"));
2254        let predicate_group = PredicateGroup::new(&metadata, &[filter]).unwrap();
2255        let ranges = memtable
2256            .ranges(
2257                None,
2258                RangesOptions::default().with_predicate(predicate_group),
2259            )
2260            .unwrap();
2261
2262        // Should return ranges but each range should produce an empty iterator
2263        // instead of an error.
2264        for range in ranges.ranges.values() {
2265            assert!(range.is_record_batch());
2266            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2267            let total_rows: usize = record_batch_iter.map(|r| r.unwrap().num_rows()).sum();
2268            assert_eq!(0, total_rows);
2269        }
2270    }
2271}