mito2/memtable/bulk.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation for bulk load

#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Mutex, RwLock};
use std::time::Instant;

/// Reads an environment variable as usize, returning default if not set or invalid.
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{
    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
};
use crate::memtable::bulk::part_reader::BulkPartBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// Default merge threshold for triggering compaction.
const DEFAULT_MERGE_THRESHOLD: usize = 16;

/// Threshold for triggering merge of parts. Configurable via `GREPTIME_BULK_MERGE_THRESHOLD`.
static MERGE_THRESHOLD: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));

/// Default maximum number of groups for parallel merging.
const DEFAULT_MAX_MERGE_GROUPS: usize = 32;

/// Maximum merge groups. Configurable via `GREPTIME_BULK_MAX_MERGE_GROUPS`.
static MAX_MERGE_GROUPS: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));

/// Row threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_ROW_THRESHOLD`.
/// When estimated rows exceed this threshold, parts are encoded as EncodedBulkPart.
pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
        10 * DEFAULT_ROW_GROUP_SIZE,
    )
});

/// Default bytes threshold for encoding.
const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;

/// Bytes threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_BYTES_THRESHOLD`.
/// When estimated bytes exceed this threshold, parts are encoded as EncodedBulkPart.
static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
        DEFAULT_ENCODE_BYTES_THRESHOLD,
    )
});
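
// Example (added for illustration): overriding a default at deployment time. Setting
// `GREPTIME_BULK_MERGE_THRESHOLD=8` in the environment lowers the per-kind merge trigger
// from 16 parts to 8; unset or unparseable values fall back to the defaults via `env_usize`.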

/// Configuration for bulk memtable.
#[derive(Debug, Clone)]
pub struct BulkMemtableConfig {
    /// Threshold for triggering merge of parts.
    pub merge_threshold: usize,
    /// Row threshold for encoding parts.
    pub encode_row_threshold: usize,
    /// Bytes threshold for encoding parts.
    pub encode_bytes_threshold: usize,
    /// Maximum number of groups for parallel merging.
    pub max_merge_groups: usize,
}

impl Default for BulkMemtableConfig {
    fn default() -> Self {
        Self {
            merge_threshold: *MERGE_THRESHOLD,
            encode_row_threshold: *ENCODE_ROW_THRESHOLD,
            encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
            max_merge_groups: *MAX_MERGE_GROUPS,
        }
    }
}
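
/// A minimal sketch (added for illustration, not part of the original module): building a
/// custom configuration in code instead of relying on the `GREPTIME_*` environment
/// variables. The concrete values below are hypothetical.
#[allow(dead_code)]
fn example_custom_config() -> BulkMemtableConfig {
    BulkMemtableConfig {
        // Merge once 8 unmerged parts of the same kind have accumulated.
        merge_threshold: 8,
        // Encode merged output as an EncodedBulkPart once it spans roughly 5 row groups...
        encode_row_threshold: 5 * DEFAULT_ROW_GROUP_SIZE,
        // ...or once the estimated input exceeds 32 MiB.
        encode_bytes_threshold: 32 * 1024 * 1024,
        // Merge at most 16 groups in parallel per compaction pass.
        max_merge_groups: 16,
    }
}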

/// Result of merging parts: either a MultiBulkPart or an EncodedBulkPart.
enum MergedPart {
    /// Merged part stored as MultiBulkPart (when the merged data stays below the encode thresholds).
    Multi(MultiBulkPart),
    /// Merged part stored as EncodedBulkPart (when estimated rows or bytes exceed the encode thresholds).
    Encoded(EncodedBulkPart),
}

/// Result of collecting parts to merge.
struct CollectedParts {
    /// Groups of parts ready for merging (each group has up to `merge_threshold` parts).
    groups: Vec<Vec<PartToMerge>>,
}
136
137/// All parts in a bulk memtable.
138#[derive(Default)]
139struct BulkParts {
140    /// Unordered small parts (< 1024 rows).
141    unordered_part: UnorderedPart,
142    /// All parts (raw and encoded).
143    parts: Vec<BulkPartWrapper>,
144}
145
146impl BulkParts {
147    /// Total number of parts (including unordered).
148    fn num_parts(&self) -> usize {
149        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
150        self.parts.len() + unordered_count
151    }
152
153    /// Returns true if there is no part.
154    fn is_empty(&self) -> bool {
155        self.unordered_part.is_empty() && self.parts.is_empty()
156    }
157
158    /// Returns true if bulk parts or encoded parts should be merged.
159    /// Uses short-circuit counting to stop early once threshold is reached.
160    fn should_merge_parts(&self, merge_threshold: usize) -> bool {
161        let mut bulk_count = 0;
162        let mut encoded_count = 0;
163
164        for wrapper in &self.parts {
165            if wrapper.merging {
166                continue;
167            }
168
169            if wrapper.part.is_encoded() {
170                encoded_count += 1;
171            } else {
172                bulk_count += 1;
173            }
174
175            // Short-circuit: stop counting if either threshold is reached
176            if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
177                return true;
178            }
179        }
180
181        false
182    }
183
184    /// Returns true if the unordered_part should be compacted into a BulkPart.
185    fn should_compact_unordered_part(&self) -> bool {
186        self.unordered_part.should_compact()
187    }
188
189    /// Collects unmerged parts and marks them as being merged.
190    /// Only collects parts of types that meet the threshold.
191    /// Parts are pre-grouped into chunks for parallel processing.
192    fn collect_parts_to_merge(
193        &mut self,
194        merge_threshold: usize,
195        max_merge_groups: usize,
196    ) -> CollectedParts {
197        // First pass: collect indices and row counts for each type
198        let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
199        let mut encoded_indices: Vec<(usize, usize)> = Vec::new();
200
201        for (idx, wrapper) in self.parts.iter().enumerate() {
202            if wrapper.merging {
203                continue;
204            }
205            let num_rows = wrapper.part.num_rows();
206            if wrapper.part.is_encoded() {
207                encoded_indices.push((idx, num_rows));
208            } else {
209                bulk_indices.push((idx, num_rows));
210            }
211        }
212
213        let mut groups = Vec::new();
214
215        // Process bulk parts if threshold met
216        if bulk_indices.len() >= merge_threshold {
217            groups.extend(self.collect_and_group_parts(
218                bulk_indices,
219                merge_threshold,
220                max_merge_groups,
221            ));
222        }
223
224        // Process encoded parts if threshold met
225        if encoded_indices.len() >= merge_threshold {
226            groups.extend(self.collect_and_group_parts(
227                encoded_indices,
228                merge_threshold,
229                max_merge_groups,
230            ));
231        }
232
233        CollectedParts { groups }
234    }
235
236    /// Sorts indices by row count, groups into chunks, marks as merging, and returns groups.
237    fn collect_and_group_parts(
238        &mut self,
239        mut indices: Vec<(usize, usize)>,
240        merge_threshold: usize,
241        max_merge_groups: usize,
242    ) -> Vec<Vec<PartToMerge>> {
243        if indices.is_empty() {
244            return Vec::new();
245        }
246
247        // Sort by row count for better grouping
248        indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);
249
250        // Group into chunks of merge_threshold size, limit to max_merge_groups
251        indices
252            .chunks(merge_threshold)
253            .take(max_merge_groups)
254            .map(|chunk| {
255                chunk
256                    .iter()
257                    .map(|(idx, _)| {
258                        let wrapper = &mut self.parts[*idx];
259                        wrapper.merging = true;
260                        wrapper.part.clone()
261                    })
262                    .collect()
263            })
264            .collect()
265    }
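
    // Worked example (illustrative, hypothetical numbers): with `merge_threshold = 4` and
    // `max_merge_groups = 2`, six unmerged bulk parts of 10, 500, 20, 80, 40 and 300 rows
    // are sorted by row count and split into two groups, [10, 20, 40, 80] and [300, 500];
    // all six wrappers are flagged as `merging` until the merge is installed or cancelled.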

    /// Installs merged parts and removes the original parts by file ids.
    /// Returns the total number of rows in the merged parts.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
    ) -> usize
    where
        I: IntoIterator<Item = MergedPart>,
    {
        let mut total_output_rows = 0;

        for merged_part in merged_parts {
            match merged_part {
                MergedPart::Encoded(encoded_part) => {
                    total_output_rows += encoded_part.metadata().num_rows;
                    self.parts.push(BulkPartWrapper {
                        part: PartToMerge::Encoded {
                            part: encoded_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                }
                MergedPart::Multi(multi_part) => {
                    total_output_rows += multi_part.num_rows();
                    self.parts.push(BulkPartWrapper {
                        part: PartToMerge::Multi {
                            part: multi_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                }
            }
        }

        self.parts
            .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));

        total_output_rows
    }

    /// Resets merging flag for parts with the given file ids.
    /// Used when merging fails or is cancelled.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
        for wrapper in &mut self.parts {
            if file_ids.contains(&wrapper.file_id()) {
                wrapper.merging = false;
            }
        }
    }
}

/// RAII guard for managing merging flags.
/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    /// Creates a new guard for the given file ids.
    fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
        Self {
            bulk_parts,
            file_ids,
            success: false,
        }
    }

    /// Marks the merge operation as successful.
    /// When this is called, the guard will not reset the flags on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids);
        }
    }
}
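
// Usage pattern (see `MemtableCompactor::merge_parts` below): the guard is created right
// after the parts are collected and flagged, and `mark_success()` is called only once the
// merged output has been installed, so an early `?` return clears the `merging` flags
// instead of leaving the original parts stuck in a merging state.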

/// Memtable that ingests and scans parts directly.
pub struct BulkMemtable {
    id: MemtableId,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Cached flat SST arrow schema for memtable compaction.
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for scheduling compaction tasks
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();

            // Routes small parts to unordered_part based on threshold
            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
                bulk_parts.unordered_part.push(fragment);

                // Compacts unordered_part if threshold is reached
                if bulk_parts.should_compact_unordered_part()
                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
                {
                    bulk_parts.parts.push(BulkPartWrapper {
                        part: PartToMerge::Bulk {
                            part: bulk_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                    bulk_parts.unordered_part.clear();
                }
            } else {
                bulk_parts.parts.push(BulkPartWrapper {
                    part: PartToMerge::Bulk {
                        part: fragment,
                        file_id: FileId::random(),
                    },
                    merging: false,
                });
            }

            // Since this operation should be fast, we do it within the parts lock scope.
            // This ensures the statistics in `ranges()` are correct. It also
            // guarantees no rows are out of the time range, so we don't need to
            // prune rows by time range again in the iterator of the MemtableRange.
            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceRange>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        // TODO(yingwen): Filter ranges by sequence.
        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        // Adds ranges for regular parts and encoded parts
        {
            let bulk_parts = self.parts.read().unwrap();

            // Adds range for unordered part if not empty
            if !bulk_parts.unordered_part.is_empty()
                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
            {
                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: unordered_bulk_part,
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            // Adds ranges for all parts
            for part_wrapper in bulk_parts.parts.iter() {
                // Skips empty parts
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
                let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
                    PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Encoded { part, file_id } => {
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: *file_id,
                            part: part.clone(),
                            context: context.clone(),
                            sequence,
                        })
                    }
                };

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        iter_builder,
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        Ok(MemtableRanges { ranges })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Computes the new flat schema based on the new metadata.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            config: self.config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(
                metadata.region_id,
                id,
                self.config.clone(),
            ))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        // Unified merge for all parts
        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new BulkMemtable
    pub fn new(
        id: MemtableId,
        config: BulkMemtableConfig,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            config: config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the unordered part threshold (for testing).
    #[cfg(test)]
    pub fn set_unordered_part_threshold(&self, threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_threshold(threshold);
    }

    /// Sets the unordered part compact threshold (for testing).
    #[cfg(test)]
    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_compact_threshold(compact_threshold);
    }

    /// Updates memtable stats.
    ///
    /// Please update this inside the write lock scope.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Returns the estimated time series count.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.series_count())
            .sum()
    }

    /// Returns whether the memtable should be compacted.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_parts(self.config.merge_threshold)
    }

    /// Schedules a compaction task using the CompactDispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                config: self.config.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else {
            // Uses synchronous compaction if no dispatcher is available.
            if let Err(e) = self.compact(false) {
                common_telemetry::error!(e; "Failed to compact memtable");
            }
        }
    }
}

/// Iterator builder for bulk range
pub struct BulkRangeIterBuilder {
    pub part: BulkPart,
    pub context: Arc<BulkIterContext>,
    pub sequence: Option<SequenceRange>,
}

/// Iterator builder for multi bulk range
struct MultiBulkRangeIterBuilder {
    part: MultiBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let series_count = self.part.estimated_series_count();
        let iter = BulkPartBatchIter::from_single(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
            series_count,
            metrics,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

impl IterBuilder for MultiBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for multi bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.part
            .read(self.context.clone(), self.sequence, metrics)?
            .ok_or_else(|| {
                UnsupportedOperationSnafu {
                    err_msg: "Failed to create iterator for multi bulk part",
                }
                .build()
            })
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Iterator builder for encoded bulk range
struct EncodedBulkRangeIterBuilder {
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self
            .part
            .read(self.context.clone(), self.sequence, metrics)?
        {
            Ok(iter)
        } else {
            // Return an empty iterator if no data to read
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

struct BulkPartWrapper {
    /// The part to store. It already contains the file id.
    part: PartToMerge,
    /// Whether this part is currently being merged.
    merging: bool,
}

impl BulkPartWrapper {
    /// Returns the file id of this part.
    fn file_id(&self) -> FileId {
        self.part.file_id()
    }
}

/// Enum to wrap different types of parts for unified merging.
#[derive(Clone)]
enum PartToMerge {
    /// Raw bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// Multiple bulk parts.
    Multi {
        part: MultiBulkPart,
        file_id: FileId,
    },
    /// Encoded bulk part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    /// Gets the file ID of this part.
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Multi { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    /// Gets the minimum timestamp of this part.
    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Multi { part, .. } => part.min_timestamp(),
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    /// Gets the maximum timestamp of this part.
    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Multi { part, .. } => part.max_timestamp(),
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    /// Gets the number of rows in this part.
    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Multi { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Gets the maximum sequence number of this part.
    fn max_sequence(&self) -> u64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.sequence,
            PartToMerge::Multi { part, .. } => part.max_sequence(),
            PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
        }
    }

    /// Gets the estimated series count in this part.
    fn series_count(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
            PartToMerge::Multi { part, .. } => part.series_count(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
        }
    }

    /// Returns true if this is an encoded part.
    fn is_encoded(&self) -> bool {
        matches!(self, PartToMerge::Encoded { .. })
    }

    /// Gets the estimated size in bytes of this part.
    fn estimated_size(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.estimated_size(),
            PartToMerge::Multi { part, .. } => part.estimated_size(),
            PartToMerge::Encoded { part, .. } => part.size_bytes(),
        }
    }

    /// Converts this part to `MemtableStats`.
    fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        match self {
            PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
            PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
            PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
        }
    }

    /// Creates a record batch iterator for this part.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let series_count = part.estimated_series_count();
                let iter = BulkPartBatchIter::from_single(
                    part.batch,
                    context,
                    None, // No sequence filter for merging
                    series_count,
                    None, // No metrics for merging
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Multi { part, .. } => part.read(context, None, None),
            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
        }
    }
}

struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
}

impl MemtableCompactor {
    /// Creates a new MemtableCompactor.
    fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
        Self {
            region_id,
            memtable_id,
            config,
        }
    }

    /// Merges parts (bulk and encoded) and then encodes the result.
    fn merge_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collect pre-grouped parts
        let collected = bulk_parts
            .write()
            .unwrap()
            .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);

        if collected.groups.is_empty() {
            return Ok(());
        }

        // Collect all file IDs for tracking
        let merged_file_ids: HashSet<FileId> = collected
            .groups
            .iter()
            .flatten()
            .map(|part| part.file_id())
            .collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);

        let num_groups = collected.groups.len();
        let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();

        let encode_row_threshold = self.config.encode_row_threshold;
        let encode_bytes_threshold = self.config.encode_bytes_threshold;

        // Merge all groups in parallel
        let merged_parts = collected
            .groups
            .into_par_iter()
            .map(|group| {
                Self::merge_parts_group(
                    group,
                    arrow_schema,
                    metadata,
                    dedup,
                    merge_mode,
                    encode_row_threshold,
                    encode_bytes_threshold,
                )
            })
            .collect::<Result<Vec<Option<MergedPart>>>>()?;

        // Install all merged parts
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            num_groups,
            num_parts,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges a group of parts into a single part (either MultiBulkPart or EncodedBulkPart).
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
        encode_row_threshold: usize,
        encode_bytes_threshold: usize,
    ) -> Result<Option<MergedPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        // Calculates timestamp bounds and statistics for merged data
        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);
        let max_sequence = parts_to_merge
            .iter()
            .map(|p| p.max_sequence())
            .max()
            .unwrap_or(0);

        // Collects statistics from parts before creating iterators
        let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
        let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
        let estimated_series_count = parts_to_merge
            .iter()
            .map(|p| p.series_count())
            .max()
            .unwrap_or(0);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            None, // No column projection for merging
            None, // No predicate for merging
            true,
        )?);

        // Creates iterators for all parts to merge.
        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            // Applies deduplication based on merge mode
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    // Calculates field column start: total columns - fixed columns - field columns
                    // Field column count = total metadata columns - time index column - primary key columns
                    let field_column_count =
                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
                    let total_columns = arrow_schema.fields().len();
                    let field_column_start =
                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        // Encode as EncodedBulkPart if rows exceed row threshold OR bytes exceed bytes threshold
        if estimated_total_rows > encode_row_threshold
            || estimated_total_bytes > encode_bytes_threshold
        {
            let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
            let mut metrics = BulkPartEncodeMetrics::default();
            let encoded_part = encoder.encode_record_batch_iter(
                boxed_iter,
                arrow_schema.clone(),
                min_timestamp,
                max_timestamp,
                max_sequence,
                &mut metrics,
            )?;

            common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

            Ok(encoded_part.map(MergedPart::Encoded))
        } else {
            // Otherwise, collect into MultiBulkPart
            let mut batches = Vec::new();
            let mut actual_total_rows = 0;

            for batch_result in boxed_iter {
                let batch = batch_result?;
                actual_total_rows += batch.num_rows();
                batches.push(batch);
            }

            if actual_total_rows == 0 {
                return Ok(None);
            }

            let multi_part = MultiBulkPart::new(
                batches,
                min_timestamp,
                max_timestamp,
                max_sequence,
                estimated_series_count,
            );

            common_telemetry::trace!(
                "merge_parts_group created MultiBulkPart: rows={}, batches={}",
                actual_total_rows,
                multi_part.num_batches()
            );

            Ok(Some(MergedPart::Multi(multi_part)))
        }
    }
}

/// A memtable compact task to run in background.
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
    /// Cached flat SST arrow schema
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

/// Scheduler to run compact tasks in background.
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a new dispatcher with the given number of max concurrent tasks.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Dispatches a compact task to run in background.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(_permit) = semaphore.acquire().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

/// Builder to build a [BulkMemtable].
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            config: BulkMemtableConfig::default(),
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the compact dispatcher.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            self.config.clone(),
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}
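
/// A minimal sketch (added for illustration): wiring a shared `CompactDispatcher` into the
/// builder so background memtable merges are bounded by a fixed number of concurrent tasks.
/// The permit count is a hypothetical value.
#[allow(dead_code)]
fn example_builder_with_dispatcher(
    write_buffer_manager: Option<WriteBufferManagerRef>,
) -> BulkMemtableBuilder {
    // Allow at most 4 in-flight memtable compaction tasks across all memtables that
    // share this dispatcher.
    let dispatcher = Arc::new(CompactDispatcher::new(4));
    BulkMemtableBuilder::new(write_buffer_manager, false, MergeMode::LastRow)
        .with_compact_dispatcher(dispatcher)
}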
1381
1382#[cfg(test)]
1383mod tests {
1384    use mito_codec::row_converter::build_primary_key_codec;
1385
1386    use super::*;
1387    use crate::memtable::bulk::part::BulkPartConverter;
1388    use crate::read::scan_region::PredicateGroup;
1389    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1390    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1391
1392    fn create_bulk_part_with_converter(
1393        k0: &str,
1394        k1: u32,
1395        timestamps: Vec<i64>,
1396        values: Vec<Option<f64>>,
1397        sequence: u64,
1398    ) -> Result<BulkPart> {
1399        let metadata = metadata_for_test();
1400        let capacity = 100;
1401        let primary_key_codec = build_primary_key_codec(&metadata);
1402        let schema = to_flat_sst_arrow_schema(
1403            &metadata,
1404            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1405        );
1406
1407        let mut converter =
1408            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1409
1410        let key_values = build_key_values_with_ts_seq_values(
1411            &metadata,
1412            k0.to_string(),
1413            k1,
1414            timestamps.into_iter(),
1415            values.into_iter(),
1416            sequence,
1417        );
1418
1419        converter.append_key_values(&key_values)?;
1420        converter.convert()
1421    }
1422
1423    #[test]
1424    fn test_bulk_memtable_write_read() {
1425        let metadata = metadata_for_test();
1426        let memtable = BulkMemtable::new(
1427            999,
1428            BulkMemtableConfig::default(),
1429            metadata.clone(),
1430            None,
1431            None,
1432            false,
1433            MergeMode::LastRow,
1434        );
1435        // Disable unordered_part for this test
1436        memtable.set_unordered_part_threshold(0);
1437
1438        let test_data = [
1439            (
1440                "key_a",
1441                1u32,
1442                vec![1000i64, 2000i64],
1443                vec![Some(10.5), Some(20.5)],
1444                100u64,
1445            ),
1446            (
1447                "key_b",
1448                2u32,
1449                vec![1500i64, 2500i64],
1450                vec![Some(15.5), Some(25.5)],
1451                200u64,
1452            ),
1453            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
1454        ];
1455
1456        for (k0, k1, timestamps, values, seq) in test_data.iter() {
1457            let part =
1458                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
1459                    .unwrap();
1460            memtable.write_bulk(part).unwrap();
1461        }
1462
1463        let stats = memtable.stats();
1464        assert_eq!(5, stats.num_rows);
1465        assert_eq!(3, stats.num_ranges);
1466        assert_eq!(300, stats.max_sequence);
1467
1468        let (min_ts, max_ts) = stats.time_range.unwrap();
1469        assert_eq!(1000, min_ts.value());
1470        assert_eq!(3000, max_ts.value());
1471
1472        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1473        let ranges = memtable
1474            .ranges(
1475                None,
1476                RangesOptions::default().with_predicate(predicate_group),
1477            )
1478            .unwrap();
1479
1480        assert_eq!(3, ranges.ranges.len());
1481        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1482        assert_eq!(5, total_rows);
1483
1484        for (_range_id, range) in ranges.ranges.iter() {
1485            assert!(range.num_rows() > 0);
1486            assert!(range.is_record_batch());
1487
1488            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1489
1490            let mut total_rows = 0;
1491            for batch_result in record_batch_iter {
1492                let batch = batch_result.unwrap();
1493                total_rows += batch.num_rows();
1494                assert!(batch.num_rows() > 0);
1495                assert_eq!(8, batch.num_columns());
1496            }
1497            assert_eq!(total_rows, range.num_rows());
1498        }
1499    }
1500
1501    #[test]
1502    fn test_bulk_memtable_ranges_with_projection() {
1503        let metadata = metadata_for_test();
1504        let memtable = BulkMemtable::new(
1505            111,
1506            BulkMemtableConfig::default(),
1507            metadata.clone(),
1508            None,
1509            None,
1510            false,
1511            MergeMode::LastRow,
1512        );
1513
1514        let bulk_part = create_bulk_part_with_converter(
1515            "projection_test",
1516            5,
1517            vec![5000, 6000, 7000],
1518            vec![Some(50.0), Some(60.0), Some(70.0)],
1519            500,
1520        )
1521        .unwrap();
1522
1523        memtable.write_bulk(bulk_part).unwrap();
1524
1525        let projection = vec![4u32];
1526        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1527        let ranges = memtable
1528            .ranges(
1529                Some(&projection),
1530                RangesOptions::default().with_predicate(predicate_group),
1531            )
1532            .unwrap();
1533
1534        assert_eq!(1, ranges.ranges.len());
1535        let range = ranges.ranges.get(&0).unwrap();
1536
1537        assert!(range.is_record_batch());
1538        let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1539
1540        let mut total_rows = 0;
1541        for batch_result in record_batch_iter {
1542            let batch = batch_result.unwrap();
1543            assert!(batch.num_rows() > 0);
1544            assert_eq!(5, batch.num_columns());
1545            total_rows += batch.num_rows();
1546        }
1547        assert_eq!(3, total_rows);
1548    }
1549
1550    #[test]
1551    fn test_bulk_memtable_unsupported_operations() {
1552        let metadata = metadata_for_test();
1553        let memtable = BulkMemtable::new(
1554            111,
1555            BulkMemtableConfig::default(),
1556            metadata.clone(),
1557            None,
1558            None,
1559            false,
1560            MergeMode::LastRow,
1561        );
1562
1563        let key_values = build_key_values_with_ts_seq_values(
1564            &metadata,
1565            "test".to_string(),
1566            1,
1567            vec![1000].into_iter(),
1568            vec![Some(1.0)].into_iter(),
1569            1,
1570        );
1571
1572        let err = memtable.write(&key_values).unwrap_err();
1573        assert!(err.to_string().contains("not supported"));
1574
1575        let kv = key_values.iter().next().unwrap();
1576        let err = memtable.write_one(kv).unwrap_err();
1577        assert!(err.to_string().contains("not supported"));
1578    }
1579
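    // Freezing must not drop buffered data: the single written row is still
    // reported by stats() after freeze().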
1580    #[test]
1581    fn test_bulk_memtable_freeze() {
1582        let metadata = metadata_for_test();
1583        let memtable = BulkMemtable::new(
1584            222,
1585            BulkMemtableConfig::default(),
1586            metadata.clone(),
1587            None,
1588            None,
1589            false,
1590            MergeMode::LastRow,
1591        );
1592
1593        let bulk_part = create_bulk_part_with_converter(
1594            "freeze_test",
1595            10,
1596            vec![10000],
1597            vec![Some(100.0)],
1598            1000,
1599        )
1600        .unwrap();
1601
1602        memtable.write_bulk(bulk_part).unwrap();
1603        memtable.freeze().unwrap();
1604
1605        let stats_after_freeze = memtable.stats();
1606        assert_eq!(1, stats_after_freeze.num_rows);
1607    }
1608
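    // fork() yields an empty memtable with the new id; the original memtable
    // keeps its data untouched.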
1609    #[test]
1610    fn test_bulk_memtable_fork() {
1611        let metadata = metadata_for_test();
1612        let original_memtable = BulkMemtable::new(
1613            333,
1614            BulkMemtableConfig::default(),
1615            metadata.clone(),
1616            None,
1617            None,
1618            false,
1619            MergeMode::LastRow,
1620        );
1621
1622        let bulk_part =
1623            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
1624                .unwrap();
1625
1626        original_memtable.write_bulk(bulk_part).unwrap();
1627
1628        let forked_memtable = original_memtable.fork(444, &metadata);
1629
1630        assert_eq!(forked_memtable.id(), 444);
1631        assert!(forked_memtable.is_empty());
1632        assert_eq!(0, forked_memtable.stats().num_rows);
1633
1634        assert!(!original_memtable.is_empty());
1635        assert_eq!(1, original_memtable.stats().num_rows);
1636    }
1637
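    // With unordered_part disabled, each written bulk part is exposed as its own
    // range, so 3 parts produce 3 ranges covering 5 rows in total.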
1638    #[test]
1639    fn test_bulk_memtable_ranges_multiple_parts() {
1640        let metadata = metadata_for_test();
1641        let memtable = BulkMemtable::new(
1642            777,
1643            BulkMemtableConfig::default(),
1644            metadata.clone(),
1645            None,
1646            None,
1647            false,
1648            MergeMode::LastRow,
1649        );
1650        // Disable unordered_part for this test
1651        memtable.set_unordered_part_threshold(0);
1652
1653        let parts_data = vec![
1654            (
1655                "part1",
1656                1u32,
1657                vec![1000i64, 1100i64],
1658                vec![Some(10.0), Some(11.0)],
1659                100u64,
1660            ),
1661            (
1662                "part2",
1663                2u32,
1664                vec![2000i64, 2100i64],
1665                vec![Some(20.0), Some(21.0)],
1666                200u64,
1667            ),
1668            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
1669        ];
1670
1671        for (k0, k1, timestamps, values, seq) in parts_data {
1672            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
1673            memtable.write_bulk(part).unwrap();
1674        }
1675
1676        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1677        let ranges = memtable
1678            .ranges(
1679                None,
1680                RangesOptions::default().with_predicate(predicate_group),
1681            )
1682            .unwrap();
1683
1684        assert_eq!(3, ranges.ranges.len());
1685        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1686        assert_eq!(5, total_rows);
1688
1689        for (range_id, range) in ranges.ranges.iter() {
1690            assert!(*range_id < 3);
1691            assert!(range.num_rows() > 0);
1692            assert!(range.is_record_batch());
1693        }
1694    }
1695
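    // A sequence filter whose upper bound (400) is below the part's sequence (500)
    // should filter out every row, so the range iterator yields nothing.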
1696    #[test]
1697    fn test_bulk_memtable_ranges_with_sequence_filter() {
1698        let metadata = metadata_for_test();
1699        let memtable = BulkMemtable::new(
1700            888,
1701            BulkMemtableConfig::default(),
1702            metadata.clone(),
1703            None,
1704            None,
1705            false,
1706            MergeMode::LastRow,
1707        );
1708
1709        let part = create_bulk_part_with_converter(
1710            "seq_test",
1711            1,
1712            vec![1000, 2000, 3000],
1713            vec![Some(10.0), Some(20.0), Some(30.0)],
1714            500,
1715        )
1716        .unwrap();
1717
1718        memtable.write_bulk(part).unwrap();
1719
1720        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1721        let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
1722        let ranges = memtable
1723            .ranges(
1724                None,
1725                RangesOptions::default()
1726                    .with_predicate(predicate_group)
1727                    .with_sequence(sequence_filter),
1728            )
1729            .unwrap();
1730
1731        assert_eq!(1, ranges.ranges.len());
1732        let range = ranges.ranges.get(&0).unwrap();
1733
1734        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
1735        assert!(record_batch_iter.next().is_none());
1736    }
1737
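    // With a lowered merge_threshold, compact() merges/encodes the accumulated parts;
    // the resulting ranges must still cover all 10 written rows.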
1738    #[test]
1739    fn test_bulk_memtable_ranges_with_encoded_parts() {
1740        let metadata = metadata_for_test();
1741        let config = BulkMemtableConfig {
1742            merge_threshold: 8,
1743            ..Default::default()
1744        };
1745        let memtable = BulkMemtable::new(
1746            999,
1747            config,
1748            metadata.clone(),
1749            None,
1750            None,
1751            false,
1752            MergeMode::LastRow,
1753        );
1754        // Disable unordered_part for this test
1755        memtable.set_unordered_part_threshold(0);
1756
1757        // Adds enough bulk parts to trigger encoding
1758        for i in 0..10 {
1759            let part = create_bulk_part_with_converter(
1760                &format!("key_{}", i),
1761                i,
1762                vec![1000 + i as i64 * 100],
1763                vec![Some(i as f64 * 10.0)],
1764                100 + i as u64,
1765            )
1766            .unwrap();
1767            memtable.write_bulk(part).unwrap();
1768        }
1769
1770        memtable.compact(false).unwrap();
1771
1772        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1773        let ranges = memtable
1774            .ranges(
1775                None,
1776                RangesOptions::default().with_predicate(predicate_group),
1777            )
1778            .unwrap();
1779
1780        // Should have ranges for both bulk parts and encoded parts
1781        assert_eq!(3, ranges.ranges.len());
1782        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1783        assert_eq!(10, total_rows);
1784
1785        for range in ranges.ranges.values() {
1786            assert!(range.num_rows() > 0);
1787            assert!(range.is_record_batch());
1788
1789            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1790            let mut total_rows = 0;
1791            for batch_result in record_batch_iter {
1792                let batch = batch_result.unwrap();
1793                total_rows += batch.num_rows();
1794                assert!(batch.num_rows() > 0);
1795            }
1796            assert_eq!(total_rows, range.num_rows());
1797        }
1798    }
1799
1800    #[test]
1801    fn test_bulk_memtable_unordered_part() {
1802        let metadata = metadata_for_test();
1803        let memtable = BulkMemtable::new(
1804            1001,
1805            BulkMemtableConfig::default(),
1806            metadata.clone(),
1807            None,
1808            None,
1809            false,
1810            MergeMode::LastRow,
1811        );
1812
1813        // Set smaller thresholds for testing with smaller inputs
1814        // Accept parts with < 5 rows into unordered_part
1815        memtable.set_unordered_part_threshold(5);
1816        // Compact when total rows >= 10
1817        memtable.set_unordered_part_compact_threshold(10);
1818
1819        // Write 3 small parts (each has 2 rows), should be collected in unordered_part
1820        for i in 0..3 {
1821            let part = create_bulk_part_with_converter(
1822                &format!("key_{}", i),
1823                i,
1824                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1825                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1826                100 + i as u64,
1827            )
1828            .unwrap();
1829            assert_eq!(2, part.num_rows());
1830            memtable.write_bulk(part).unwrap();
1831        }
1832
1833        // Total rows = 6, not yet reaching compact threshold
1834        let stats = memtable.stats();
1835        assert_eq!(6, stats.num_rows);
1836
1837        // Write 2 more small parts (each has 2 rows)
1838        // This should trigger compaction when total >= 10
1839        for i in 3..5 {
1840            let part = create_bulk_part_with_converter(
1841                &format!("key_{}", i),
1842                i,
1843                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1844                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1845                100 + i as u64,
1846            )
1847            .unwrap();
1848            memtable.write_bulk(part).unwrap();
1849        }
1850
1851        // Total rows = 10, should have compacted unordered_part into a regular part
1852        let stats = memtable.stats();
1853        assert_eq!(10, stats.num_rows);
1854
1855        // Verify we can read all data correctly
1856        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1857        let ranges = memtable
1858            .ranges(
1859                None,
1860                RangesOptions::default().with_predicate(predicate_group),
1861            )
1862            .unwrap();
1863
1864        // Should have at least 1 range (the compacted part)
1865        assert!(!ranges.ranges.is_empty());
1866        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1867        assert_eq!(10, total_rows);
1868
1869        // Read all data and verify
1870        let mut total_rows_read = 0;
1871        for range in ranges.ranges.values() {
1872            assert!(range.is_record_batch());
1873            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1874
1875            for batch_result in record_batch_iter {
1876                let batch = batch_result.unwrap();
1877                total_rows_read += batch.num_rows();
1878            }
1879        }
1880        assert_eq!(10, total_rows_read);
1881    }
1882
1883    #[test]
1884    fn test_bulk_memtable_unordered_part_mixed_sizes() {
1885        let metadata = metadata_for_test();
1886        let memtable = BulkMemtable::new(
1887            1002,
1888            BulkMemtableConfig::default(),
1889            metadata.clone(),
1890            None,
1891            None,
1892            false,
1893            MergeMode::LastRow,
1894        );
1895
1896        // Set threshold to 4 rows - parts with < 4 rows go to unordered_part
1897        memtable.set_unordered_part_threshold(4);
1898        memtable.set_unordered_part_compact_threshold(8);
1899
1900        // Write small parts (3 rows each) - should go to unordered_part
1901        for i in 0..2 {
1902            let part = create_bulk_part_with_converter(
1903                &format!("small_{}", i),
1904                i,
1905                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
1906                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
1907                10 + i as u64,
1908            )
1909            .unwrap();
1910            assert_eq!(3, part.num_rows());
1911            memtable.write_bulk(part).unwrap();
1912        }
1913
1914        // Write a large part (5 rows) - should go directly to regular parts
1915        let large_part = create_bulk_part_with_converter(
1916            "large_key",
1917            100,
1918            vec![5000, 6000, 7000, 8000, 9000],
1919            vec![
1920                Some(100.0),
1921                Some(101.0),
1922                Some(102.0),
1923                Some(103.0),
1924                Some(104.0),
1925            ],
1926            50,
1927        )
1928        .unwrap();
1929        assert_eq!(5, large_part.num_rows());
1930        memtable.write_bulk(large_part).unwrap();
1931
1932        // Write another small part (2 rows) - should trigger compaction of unordered_part
1933        let part = create_bulk_part_with_converter(
1934            "small_2",
1935            2,
1936            vec![4000, 4100],
1937            vec![Some(20.0), Some(21.0)],
1938            30,
1939        )
1940        .unwrap();
1941        memtable.write_bulk(part).unwrap();
1942
1943        let stats = memtable.stats();
1944        assert_eq!(13, stats.num_rows); // 3 + 3 + 5 + 2 = 13
1945
1946        // Verify all data can be read
1947        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1948        let ranges = memtable
1949            .ranges(
1950                None,
1951                RangesOptions::default().with_predicate(predicate_group),
1952            )
1953            .unwrap();
1954
1955        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1956        assert_eq!(13, total_rows);
1957
1958        let mut total_rows_read = 0;
1959        for range in ranges.ranges.values() {
1960            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1961            for batch_result in record_batch_iter {
1962                let batch = batch_result.unwrap();
1963                total_rows_read += batch.num_rows();
1964            }
1965        }
1966        assert_eq!(13, total_rows_read);
1967    }
1968
1969    #[test]
1970    fn test_bulk_memtable_unordered_part_with_ranges() {
1971        let metadata = metadata_for_test();
1972        let memtable = BulkMemtable::new(
1973            1003,
1974            BulkMemtableConfig::default(),
1975            metadata.clone(),
1976            None,
1977            None,
1978            false,
1979            MergeMode::LastRow,
1980        );
1981
1982        // Set small thresholds
1983        memtable.set_unordered_part_threshold(3);
1984        memtable.set_unordered_part_compact_threshold(100); // High threshold to prevent auto-compaction
1985
1986        // Write several small parts that stay in unordered_part
1987        for i in 0..3 {
1988            let part = create_bulk_part_with_converter(
1989                &format!("key_{}", i),
1990                i,
1991                vec![1000 + i as i64 * 100],
1992                vec![Some(i as f64 * 10.0)],
1993                100 + i as u64,
1994            )
1995            .unwrap();
1996            assert_eq!(1, part.num_rows());
1997            memtable.write_bulk(part).unwrap();
1998        }
1999
2000        let stats = memtable.stats();
2001        assert_eq!(3, stats.num_rows);
2002
2003        // Test that ranges() can correctly read from unordered_part
2004        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2005        let ranges = memtable
2006            .ranges(
2007                None,
2008                RangesOptions::default().with_predicate(predicate_group),
2009            )
2010            .unwrap();
2011
2012        // Should have 1 range for the unordered_part
2013        assert_eq!(1, ranges.ranges.len());
2014        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2015        assert_eq!(3, total_rows);
2016
2017        // Read the single range back and count its rows
2018        let range = ranges.ranges.get(&0).unwrap();
2019        let record_batch_iter = range.build_record_batch_iter(None).unwrap();
2020
2021        let mut total_rows = 0;
2022        for batch_result in record_batch_iter {
2023            let batch = batch_result.unwrap();
2024            total_rows += batch.num_rows();
2025            // Each batch produced by the range must be non-empty
2026            assert!(batch.num_rows() > 0);
2027        }
2028        assert_eq!(3, total_rows);
2029    }
2030
2031    /// Helper to create a BulkPartWrapper from a BulkPart.
2032    fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
2033        BulkPartWrapper {
2034            part: PartToMerge::Bulk {
2035                part,
2036                file_id: FileId::random(),
2037            },
2038            merging: false,
2039        }
2040    }
2041
2042    #[test]
2043    fn test_should_merge_parts_below_threshold() {
2044        let mut bulk_parts = BulkParts::default();
2045
2046        // Add DEFAULT_MERGE_THRESHOLD - 1 bulk parts (one below the merge threshold of 16)
2047        for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
2048            let part = create_bulk_part_with_converter(
2049                &format!("key_{}", i),
2050                i as u32,
2051                vec![1000 + i as i64 * 100],
2052                vec![Some(i as f64 * 10.0)],
2053                100 + i as u64,
2054            )
2055            .unwrap();
2056            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2057        }
2058
2059        // Should not trigger merge since the part count is one below the threshold
2060        assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2061    }
2062
2063    #[test]
2064    fn test_should_merge_parts_at_threshold() {
2065        let mut bulk_parts = BulkParts::default();
2066        let merge_threshold = 8;
2067
2068        // Add 8 bulk parts (at merge_threshold)
2069        for i in 0..merge_threshold {
2070            let part = create_bulk_part_with_converter(
2071                &format!("key_{}", i),
2072                i as u32,
2073                vec![1000 + i as i64 * 100],
2074                vec![Some(i as f64 * 10.0)],
2075                100 + i as u64,
2076            )
2077            .unwrap();
2078            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2079        }
2080
2081        // Should trigger merge since we have 8 parts
2082        assert!(bulk_parts.should_merge_parts(merge_threshold));
2083    }
2084
2085    #[test]
2086    fn test_should_merge_parts_with_merging_flag() {
2087        let mut bulk_parts = BulkParts::default();
2088        let merge_threshold = 8;
2089
2090        // Add 10 bulk parts
2091        for i in 0..10 {
2092            let part = create_bulk_part_with_converter(
2093                &format!("key_{}", i),
2094                i as u32,
2095                vec![1000 + i as i64 * 100],
2096                vec![Some(i as f64 * 10.0)],
2097                100 + i as u64,
2098            )
2099            .unwrap();
2100            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2101        }
2102
2103        // Should trigger merge since we have 10 parts
2104        assert!(bulk_parts.should_merge_parts(merge_threshold));
2105
2106        // Mark first 3 parts as merging
2107        for wrapper in bulk_parts.parts.iter_mut().take(3) {
2108            wrapper.merging = true;
2109        }
2110
2111        // Now only 7 parts are available for merging, should not trigger
2112        assert!(!bulk_parts.should_merge_parts(merge_threshold));
2113    }
2114
2115    #[test]
2116    fn test_collect_parts_to_merge_grouping() {
2117        let mut bulk_parts = BulkParts::default();
2118
2119        // Add 16 bulk parts with different row counts
2120        for i in 0..16 {
2121            let num_rows = (i % 4) + 1; // 1 to 4 rows
2122            let timestamps: Vec<i64> = (0..num_rows)
2123                .map(|j| 1000 + i as i64 * 100 + j as i64)
2124                .collect();
2125            let values: Vec<Option<f64>> =
2126                (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
2127            let part = create_bulk_part_with_converter(
2128                &format!("key_{}", i),
2129                i as u32,
2130                timestamps,
2131                values,
2132                100 + i as u64,
2133            )
2134            .unwrap();
2135            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2136        }
2137
2138        // Should trigger merge since we have 16 parts
2139        assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2140
2141        // Collect parts to merge
2142        let collected =
2143            bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);
2144
2145        // Should have groups
2146        assert!(!collected.groups.is_empty());
2147
2148        // All groups should have parts
2149        for group in &collected.groups {
2150            assert!(!group.is_empty());
2151        }
2152
2153        // Total parts collected should be 16
2154        let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
2155        assert_eq!(16, total_parts);
2156    }
2157
2158    #[test]
2159    fn test_bulk_memtable_ranges_with_multi_bulk_part() {
2160        let metadata = metadata_for_test();
2161        let merge_threshold = 8;
2162        let config = BulkMemtableConfig {
2163            merge_threshold,
2164            ..Default::default()
2165        };
2166        let memtable = BulkMemtable::new(
2167            2005,
2168            config,
2169            metadata.clone(),
2170            None,
2171            None,
2172            false,
2173            MergeMode::LastRow,
2174        );
2175        // Disable unordered_part for this test
2176        memtable.set_unordered_part_threshold(0);
2177
2178        // Write enough bulk parts to trigger merge (merge_threshold = 8)
2179        // Each part has a small number of rows, so the total stays below DEFAULT_ROW_GROUP_SIZE
2180        // This will result in MultiBulkPart after compaction
2181        for i in 0..merge_threshold {
2182            let part = create_bulk_part_with_converter(
2183                &format!("key_{}", i),
2184                i as u32,
2185                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2186                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2187                100 + i as u64,
2188            )
2189            .unwrap();
2190            memtable.write_bulk(part).unwrap();
2191        }
2192
2193        // Compact to trigger MultiBulkPart creation (since total rows < DEFAULT_ROW_GROUP_SIZE)
2194        memtable.compact(false).unwrap();
2195
2196        // Verify we can read from the memtable
2197        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2198        let ranges = memtable
2199            .ranges(
2200                None,
2201                RangesOptions::default().with_predicate(predicate_group),
2202            )
2203            .unwrap();
2204
2205        assert_eq!(1, ranges.ranges.len());
2206        let expected_rows = merge_threshold * 2; // Each part has 2 rows
2207        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2208        assert_eq!(expected_rows, total_rows);
2209
2210        // Read all data
2211        let mut total_rows_read = 0;
2212        for range in ranges.ranges.values() {
2213            assert!(range.is_record_batch());
2214            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
2215
2216            for batch_result in record_batch_iter {
2217                let batch = batch_result.unwrap();
2218                total_rows_read += batch.num_rows();
2219            }
2220        }
2221        assert_eq!(expected_rows, total_rows_read);
2222    }
2223}