mito2/memtable/bulk.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation for bulk load

pub(crate) mod chunk_reader;
pub mod context;
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Mutex, RwLock};
use std::time::Instant;

/// Reads an environment variable as usize, returning default if not set or invalid.
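///
/// # Example
///
/// A minimal sketch of the fallback behavior (illustrative, not a doctest):
///
/// ```ignore
/// // With the variable unset or unparsable, the provided default wins.
/// assert_eq!(env_usize("GREPTIME_BULK_MERGE_THRESHOLD", 16), 16);
/// ```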
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{
    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
};
use crate::memtable::bulk::part_reader::BulkPartBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::flat_format::field_column_start;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// Default merge threshold for triggering compaction.
const DEFAULT_MERGE_THRESHOLD: usize = 16;

/// Threshold for triggering merge of parts. Configurable via `GREPTIME_BULK_MERGE_THRESHOLD`.
static MERGE_THRESHOLD: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));

/// Default maximum number of groups for parallel merging.
const DEFAULT_MAX_MERGE_GROUPS: usize = 32;

/// Maximum merge groups. Configurable via `GREPTIME_BULK_MAX_MERGE_GROUPS`.
static MAX_MERGE_GROUPS: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));

/// Row threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_ROW_THRESHOLD`.
/// When estimated rows exceed this threshold, parts are encoded as EncodedBulkPart.
pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
        10 * DEFAULT_ROW_GROUP_SIZE,
    )
});

/// Default bytes threshold for encoding.
const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;

/// Bytes threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_BYTES_THRESHOLD`.
/// When estimated bytes exceed this threshold, parts are encoded as EncodedBulkPart.
static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
        DEFAULT_ENCODE_BYTES_THRESHOLD,
    )
});

/// Configuration for bulk memtable.
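///
/// # Example
///
/// A hedged sketch of overriding one threshold while keeping the
/// env-var-derived defaults for the rest (the value here is arbitrary):
///
/// ```ignore
/// let config = BulkMemtableConfig {
///     merge_threshold: 8,
///     ..BulkMemtableConfig::default()
/// };
/// ```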
#[derive(Debug, Clone)]
pub struct BulkMemtableConfig {
    /// Threshold for triggering merge of parts.
    pub merge_threshold: usize,
    /// Row threshold for encoding parts.
    pub encode_row_threshold: usize,
    /// Bytes threshold for encoding parts.
    pub encode_bytes_threshold: usize,
    /// Maximum number of groups for parallel merging.
    pub max_merge_groups: usize,
}

impl Default for BulkMemtableConfig {
    fn default() -> Self {
        Self {
            merge_threshold: *MERGE_THRESHOLD,
            encode_row_threshold: *ENCODE_ROW_THRESHOLD,
            encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
            max_merge_groups: *MAX_MERGE_GROUPS,
        }
    }
}

/// Result of merging parts: either a `MultiBulkPart` or an `EncodedBulkPart`.
enum MergedPart {
    /// Merged part kept as a `MultiBulkPart` (when the merged rows and bytes
    /// stay within the encode thresholds).
    Multi(MultiBulkPart),
    /// Merged part stored as an `EncodedBulkPart` (when the merged rows or
    /// bytes exceed the encode thresholds).
    Encoded(EncodedBulkPart),
}

/// Result of collecting parts to merge.
struct CollectedParts {
    /// Groups of parts ready for merging (each group has up to `merge_threshold` parts).
    groups: Vec<Vec<PartToMerge>>,
}

/// All parts in a bulk memtable.
#[derive(Default)]
struct BulkParts {
    /// Unordered small parts (< 1024 rows).
    unordered_part: UnorderedPart,
    /// All parts (raw and encoded).
    parts: Vec<BulkPartWrapper>,
}

impl BulkParts {
    /// Total number of parts (including unordered).
    fn num_parts(&self) -> usize {
        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
        self.parts.len() + unordered_count
    }

    /// Returns true if there is no part.
    fn is_empty(&self) -> bool {
        self.unordered_part.is_empty() && self.parts.is_empty()
    }

    /// Returns true if bulk parts or encoded parts should be merged.
    /// Uses short-circuit counting to stop early once threshold is reached.
    fn should_merge_parts(&self, merge_threshold: usize) -> bool {
        let mut bulk_count = 0;
        let mut encoded_count = 0;

        for wrapper in &self.parts {
            if wrapper.merging {
                continue;
            }

            if wrapper.part.is_encoded() {
                encoded_count += 1;
            } else {
                bulk_count += 1;
            }

            // Short-circuit: stop counting if either threshold is reached
            if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
                return true;
            }
        }

        false
    }

    /// Returns true if the unordered_part should be compacted into a BulkPart.
    fn should_compact_unordered_part(&self) -> bool {
        self.unordered_part.should_compact()
    }

    /// Collects unmerged parts and marks them as being merged.
    /// Only collects parts of types that meet the threshold.
    /// Parts are pre-grouped into chunks for parallel processing.
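    ///
    /// A worked example (illustrative): with 40 unmerged bulk parts,
    /// `merge_threshold = 16`, and `max_merge_groups = 32`, the indices are
    /// sorted by row count and chunked into groups of 16, 16, and 8 parts.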
    fn collect_parts_to_merge(
        &mut self,
        merge_threshold: usize,
        max_merge_groups: usize,
    ) -> CollectedParts {
        // First pass: collect indices and row counts for each type
        let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
        let mut encoded_indices: Vec<(usize, usize)> = Vec::new();

        for (idx, wrapper) in self.parts.iter().enumerate() {
            if wrapper.merging {
                continue;
            }
            let num_rows = wrapper.part.num_rows();
            if wrapper.part.is_encoded() {
                encoded_indices.push((idx, num_rows));
            } else {
                bulk_indices.push((idx, num_rows));
            }
        }

        let mut groups = Vec::new();

        // Process bulk parts if threshold met
        if bulk_indices.len() >= merge_threshold {
            groups.extend(self.collect_and_group_parts(
                bulk_indices,
                merge_threshold,
                max_merge_groups,
            ));
        }

        // Process encoded parts if threshold met
        if encoded_indices.len() >= merge_threshold {
            groups.extend(self.collect_and_group_parts(
                encoded_indices,
                merge_threshold,
                max_merge_groups,
            ));
        }

        CollectedParts { groups }
    }

    /// Sorts indices by row count, groups into chunks, marks as merging, and returns groups.
    fn collect_and_group_parts(
        &mut self,
        mut indices: Vec<(usize, usize)>,
        merge_threshold: usize,
        max_merge_groups: usize,
    ) -> Vec<Vec<PartToMerge>> {
        if indices.is_empty() {
            return Vec::new();
        }

        // Sort by row count for better grouping
        indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);

        // Group into chunks of merge_threshold size, limit to max_merge_groups
        indices
            .chunks(merge_threshold)
            .take(max_merge_groups)
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|(idx, _)| {
                        let wrapper = &mut self.parts[*idx];
                        wrapper.merging = true;
                        wrapper.part.clone()
                    })
                    .collect()
            })
            .collect()
    }

    /// Installs merged parts and removes the original parts by file ids.
    /// Returns the total number of rows in the merged parts.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
    ) -> usize
    where
        I: IntoIterator<Item = MergedPart>,
    {
        let mut total_output_rows = 0;

        for merged_part in merged_parts {
            match merged_part {
                MergedPart::Encoded(encoded_part) => {
                    total_output_rows += encoded_part.metadata().num_rows;
                    self.parts.push(BulkPartWrapper {
                        part: PartToMerge::Encoded {
                            part: encoded_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                }
                MergedPart::Multi(multi_part) => {
                    total_output_rows += multi_part.num_rows();
                    self.parts.push(BulkPartWrapper {
                        part: PartToMerge::Multi {
                            part: multi_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                }
            }
        }

        self.parts
            .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));

        total_output_rows
    }

    /// Resets merging flag for parts with the given file ids.
    /// Used when merging fails or is cancelled.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
        for wrapper in &mut self.parts {
            if file_ids.contains(&wrapper.file_id()) {
                wrapper.merging = false;
            }
        }
    }
}

/// RAII guard for managing merging flags.
/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    /// Creates a new guard for the given file ids.
    fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
        Self {
            bulk_parts,
            file_ids,
            success: false,
        }
    }

    /// Marks the merge operation as successful.
    /// When this is called, the guard will not reset the flags on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids);
        }
    }
}

/// Memtable that ingests and scans parts directly.
pub struct BulkMemtable {
    id: MemtableId,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Cached flat SST arrow schema for memtable compaction.
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for scheduling compaction tasks
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();

            // Routes small parts to unordered_part based on threshold
            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
                bulk_parts.unordered_part.push(fragment);

                // Compacts unordered_part if threshold is reached
                if bulk_parts.should_compact_unordered_part()
                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
                {
                    bulk_parts.parts.push(BulkPartWrapper {
                        part: PartToMerge::Bulk {
                            part: bulk_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                    bulk_parts.unordered_part.clear();
                }
            } else {
                bulk_parts.parts.push(BulkPartWrapper {
                    part: PartToMerge::Bulk {
                        part: fragment,
                        file_id: FileId::random(),
                    },
                    merging: false,
                });
            }

            // Since this operation should be fast, we do it within the parts lock
            // scope. This ensures the statistics in `ranges()` are correct. Moreover,
            // it guarantees no rows are out of the time range, so we don't need to
            // prune rows by time range again in the iterator of the MemtableRange.
            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        // TODO(yingwen): Filter ranges by sequence.
        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        // Adds ranges for regular parts and encoded parts
        {
            let bulk_parts = self.parts.read().unwrap();

            // Adds range for unordered part if not empty
            if !bulk_parts.unordered_part.is_empty()
                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
            {
                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: unordered_bulk_part,
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            // Adds ranges for all parts
            for part_wrapper in bulk_parts.parts.iter() {
                // Skips empty parts
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
                let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
                    PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Encoded { part, file_id } => {
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: *file_id,
                            part: part.clone(),
                            context: context.clone(),
                            sequence,
                        })
                    }
                };

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        iter_builder,
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        Ok(MemtableRanges { ranges })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Computes the new flat schema based on the new metadata.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            config: self.config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(
                metadata.region_id,
                id,
                self.config.clone(),
            ))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        // Unified merge for all parts
        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new BulkMemtable.
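    ///
    /// # Example
    ///
    /// An illustrative sketch that mirrors the unit tests below; `metadata`
    /// is assumed to be a prepared `RegionMetadataRef`:
    ///
    /// ```ignore
    /// let memtable = BulkMemtable::new(
    ///     1,
    ///     BulkMemtableConfig::default(),
    ///     metadata,
    ///     None,  // no write buffer manager
    ///     None,  // no dispatcher: compaction runs synchronously
    ///     false, // append mode disabled
    ///     MergeMode::LastRow,
    /// );
    /// ```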
    pub fn new(
        id: MemtableId,
        config: BulkMemtableConfig,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            config: config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the unordered part threshold (for testing).
    #[cfg(test)]
    pub fn set_unordered_part_threshold(&self, threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_threshold(threshold);
    }

    /// Sets the unordered part compact threshold (for testing).
    #[cfg(test)]
    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_compact_threshold(compact_threshold);
    }

    /// Updates memtable stats.
    ///
    /// Please update this inside the write lock scope.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Returns the estimated time series count.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.series_count())
            .sum()
    }

    /// Returns whether the memtable should be compacted.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_parts(self.config.merge_threshold)
    }

    /// Schedules a compaction task using the CompactDispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                config: self.config.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else {
            // Uses synchronous compaction if no dispatcher is available.
            if let Err(e) = self.compact(false) {
                common_telemetry::error!(e; "Failed to compact memtable");
            }
        }
    }
}

/// Iterator builder for bulk range
pub struct BulkRangeIterBuilder {
    pub part: BulkPart,
    pub context: Arc<BulkIterContext>,
    pub sequence: Option<SequenceRange>,
}

/// Iterator builder for multi bulk range
struct MultiBulkRangeIterBuilder {
    part: MultiBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let series_count = self.part.estimated_series_count();
        let iter = BulkPartBatchIter::from_single(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
            series_count,
            metrics,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

impl IterBuilder for MultiBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for multi bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.part
            .read(self.context.clone(), self.sequence, metrics)?
            .ok_or_else(|| {
                UnsupportedOperationSnafu {
                    err_msg: "Failed to create iterator for multi bulk part",
                }
                .build()
            })
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Iterator builder for encoded bulk range
struct EncodedBulkRangeIterBuilder {
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self
            .part
            .read(self.context.clone(), self.sequence, metrics)?
        {
            Ok(iter)
        } else {
            // Return an empty iterator if no data to read
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

struct BulkPartWrapper {
    /// The part to store. It already contains the file id.
    part: PartToMerge,
    /// Whether this part is currently being merged.
    merging: bool,
}

impl BulkPartWrapper {
    /// Returns the file id of this part.
    fn file_id(&self) -> FileId {
        self.part.file_id()
    }
}

/// Enum to wrap different types of parts for unified merging.
#[derive(Clone)]
enum PartToMerge {
    /// Raw bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// Multiple bulk parts.
    Multi {
        part: MultiBulkPart,
        file_id: FileId,
    },
    /// Encoded bulk part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    /// Gets the file ID of this part.
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Multi { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    /// Gets the minimum timestamp of this part.
    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Multi { part, .. } => part.min_timestamp(),
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    /// Gets the maximum timestamp of this part.
    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Multi { part, .. } => part.max_timestamp(),
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    /// Gets the number of rows in this part.
    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Multi { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Gets the maximum sequence number of this part.
    fn max_sequence(&self) -> u64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.sequence,
            PartToMerge::Multi { part, .. } => part.max_sequence(),
            PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
        }
    }

    /// Gets the estimated series count in this part.
    fn series_count(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
            PartToMerge::Multi { part, .. } => part.series_count(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
        }
    }

    /// Returns true if this is an encoded part.
    fn is_encoded(&self) -> bool {
        matches!(self, PartToMerge::Encoded { .. })
    }

    /// Gets the estimated size in bytes of this part.
    fn estimated_size(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.estimated_size(),
            PartToMerge::Multi { part, .. } => part.estimated_size(),
            PartToMerge::Encoded { part, .. } => part.size_bytes(),
        }
    }

    /// Converts this part to `MemtableStats`.
    fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        match self {
            PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
            PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
            PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
        }
    }

    /// Creates a record batch iterator for this part.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let series_count = part.estimated_series_count();
                let iter = BulkPartBatchIter::from_single(
                    part.batch,
                    context,
                    None, // No sequence filter for merging
                    series_count,
                    None, // No metrics for merging
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Multi { part, .. } => part.read(context, None, None),
            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
        }
    }
}

struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
}

impl MemtableCompactor {
    /// Creates a new MemtableCompactor.
    fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
        Self {
            region_id,
            memtable_id,
            config,
        }
    }

    /// Merges parts (bulk and encoded) and then encodes the result.
    fn merge_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collect pre-grouped parts
        let collected = bulk_parts
            .write()
            .unwrap()
            .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);

        if collected.groups.is_empty() {
            return Ok(());
        }

        // Collect all file IDs for tracking
        let merged_file_ids: HashSet<FileId> = collected
            .groups
            .iter()
            .flatten()
            .map(|part| part.file_id())
            .collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);

        let num_groups = collected.groups.len();
        let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();

        let encode_row_threshold = self.config.encode_row_threshold;
        let encode_bytes_threshold = self.config.encode_bytes_threshold;

        // Merge all groups in parallel
        let merged_parts = collected
            .groups
            .into_par_iter()
            .map(|group| {
                Self::merge_parts_group(
                    group,
                    arrow_schema,
                    metadata,
                    dedup,
                    merge_mode,
                    encode_row_threshold,
                    encode_bytes_threshold,
                )
            })
            .collect::<Result<Vec<Option<MergedPart>>>>()?;

        // Install all merged parts
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            num_groups,
            num_parts,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges a group of parts into a single part (either MultiBulkPart or EncodedBulkPart).
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
        encode_row_threshold: usize,
        encode_bytes_threshold: usize,
    ) -> Result<Option<MergedPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        // Calculates timestamp bounds and statistics for merged data
        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);
        let max_sequence = parts_to_merge
            .iter()
            .map(|p| p.max_sequence())
            .max()
            .unwrap_or(0);

        // Collects statistics from parts before creating iterators
        let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
        let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
        let estimated_series_count = parts_to_merge
            .iter()
            .map(|p| p.series_count())
            .max()
            .unwrap_or(0);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            None, // No column projection for merging
            None, // No predicate for merging
            true,
        )?);

        // Creates iterators for all parts to merge.
        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            // Applies deduplication based on merge mode
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    let field_column_start =
                        field_column_start(metadata, arrow_schema.fields().len());

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        // Encode as EncodedBulkPart if rows exceed row threshold OR bytes exceed bytes threshold
        if estimated_total_rows > encode_row_threshold
            || estimated_total_bytes > encode_bytes_threshold
        {
            let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
            let mut metrics = BulkPartEncodeMetrics::default();
            let encoded_part = encoder.encode_record_batch_iter(
                boxed_iter,
                arrow_schema.clone(),
                min_timestamp,
                max_timestamp,
                max_sequence,
                &mut metrics,
            )?;

            common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

            Ok(encoded_part.map(MergedPart::Encoded))
        } else {
            // Otherwise, collect into MultiBulkPart
            let mut batches = Vec::new();
            let mut actual_total_rows = 0;

            for batch_result in boxed_iter {
                let batch = batch_result?;
                actual_total_rows += batch.num_rows();
                batches.push(batch);
            }

            if actual_total_rows == 0 {
                return Ok(None);
            }

            let multi_part = MultiBulkPart::new(
                batches,
                min_timestamp,
                max_timestamp,
                max_sequence,
                estimated_series_count,
            );

            common_telemetry::trace!(
                "merge_parts_group created MultiBulkPart: rows={}, batches={}",
                actual_total_rows,
                multi_part.num_batches()
            );

            Ok(Some(MergedPart::Multi(multi_part)))
        }
    }
}

/// A memtable compact task to run in background.
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
    /// Cached flat SST arrow schema
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

/// Scheduler to run compact tasks in background.
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a new dispatcher with the given number of max concurrent tasks.
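    ///
    /// A short sketch (the permit count is arbitrary):
    ///
    /// ```ignore
    /// // Allow at most 4 concurrent memtable compaction tasks.
    /// let dispatcher = Arc::new(CompactDispatcher::new(4));
    /// ```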
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Dispatches a compact task to run in background.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(_permit) = semaphore.acquire().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

/// Builder to build a [BulkMemtable].
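///
/// # Example
///
/// A hedged sketch of wiring the builder up (the dispatcher is optional and
/// `metadata` is assumed to come from the embedding engine):
///
/// ```ignore
/// let builder = BulkMemtableBuilder::new(None, false, MergeMode::LastRow)
///     .with_compact_dispatcher(Arc::new(CompactDispatcher::new(4)));
/// let memtable = builder.build(1, &metadata);
/// ```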
1319#[derive(Debug, Default)]
1320pub struct BulkMemtableBuilder {
1321    /// Configuration for the bulk memtable.
1322    config: BulkMemtableConfig,
1323    write_buffer_manager: Option<WriteBufferManagerRef>,
1324    compact_dispatcher: Option<Arc<CompactDispatcher>>,
1325    append_mode: bool,
1326    merge_mode: MergeMode,
1327}
1328
1329impl BulkMemtableBuilder {
1330    /// Creates a new builder with specific `write_buffer_manager`.
1331    pub fn new(
1332        write_buffer_manager: Option<WriteBufferManagerRef>,
1333        append_mode: bool,
1334        merge_mode: MergeMode,
1335    ) -> Self {
1336        Self {
1337            config: BulkMemtableConfig::default(),
1338            write_buffer_manager,
1339            compact_dispatcher: None,
1340            append_mode,
1341            merge_mode,
1342        }
1343    }
1344
1345    /// Sets the compact dispatcher.
1346    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
1347        self.compact_dispatcher = Some(compact_dispatcher);
1348        self
1349    }
1350}
1351
1352impl MemtableBuilder for BulkMemtableBuilder {
1353    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
1354        Arc::new(BulkMemtable::new(
1355            id,
1356            self.config.clone(),
1357            metadata.clone(),
1358            self.write_buffer_manager.clone(),
1359            self.compact_dispatcher.clone(),
1360            self.append_mode,
1361            self.merge_mode,
1362        ))
1363    }
1364
1365    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
1366        true
1367    }
1368}
1369
1370#[cfg(test)]
1371mod tests {
1372    use mito_codec::row_converter::build_primary_key_codec;
1373
1374    use super::*;
1375    use crate::memtable::bulk::part::BulkPartConverter;
1376    use crate::read::scan_region::PredicateGroup;
1377    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1378    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1379
1380    fn create_bulk_part_with_converter(
1381        k0: &str,
1382        k1: u32,
1383        timestamps: Vec<i64>,
1384        values: Vec<Option<f64>>,
1385        sequence: u64,
1386    ) -> Result<BulkPart> {
1387        let metadata = metadata_for_test();
1388        let capacity = 100;
1389        let primary_key_codec = build_primary_key_codec(&metadata);
1390        let schema = to_flat_sst_arrow_schema(
1391            &metadata,
1392            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1393        );
1394
1395        let mut converter =
1396            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1397
1398        let key_values = build_key_values_with_ts_seq_values(
1399            &metadata,
1400            k0.to_string(),
1401            k1,
1402            timestamps.into_iter(),
1403            values.into_iter(),
1404            sequence,
1405        );
1406
1407        converter.append_key_values(&key_values)?;
1408        converter.convert()
1409    }
1410
1411    #[test]
1412    fn test_bulk_memtable_write_read() {
1413        let metadata = metadata_for_test();
1414        let memtable = BulkMemtable::new(
1415            999,
1416            BulkMemtableConfig::default(),
1417            metadata.clone(),
1418            None,
1419            None,
1420            false,
1421            MergeMode::LastRow,
1422        );
1423        // Disable unordered_part for this test
1424        memtable.set_unordered_part_threshold(0);
1425
1426        let test_data = [
1427            (
1428                "key_a",
1429                1u32,
1430                vec![1000i64, 2000i64],
1431                vec![Some(10.5), Some(20.5)],
1432                100u64,
1433            ),
1434            (
1435                "key_b",
1436                2u32,
1437                vec![1500i64, 2500i64],
1438                vec![Some(15.5), Some(25.5)],
1439                200u64,
1440            ),
1441            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
1442        ];
1443
1444        for (k0, k1, timestamps, values, seq) in test_data.iter() {
1445            let part =
1446                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
1447                    .unwrap();
1448            memtable.write_bulk(part).unwrap();
1449        }
1450
1451        let stats = memtable.stats();
1452        assert_eq!(5, stats.num_rows);
1453        assert_eq!(3, stats.num_ranges);
1454        assert_eq!(300, stats.max_sequence);
1455
1456        let (min_ts, max_ts) = stats.time_range.unwrap();
1457        assert_eq!(1000, min_ts.value());
1458        assert_eq!(3000, max_ts.value());
1459
1460        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1461        let ranges = memtable
1462            .ranges(
1463                None,
1464                RangesOptions::default().with_predicate(predicate_group),
1465            )
1466            .unwrap();
1467
1468        assert_eq!(3, ranges.ranges.len());
1469        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1470        assert_eq!(5, total_rows);
1471
1472        for (_range_id, range) in ranges.ranges.iter() {
1473            assert!(range.num_rows() > 0);
1474            assert!(range.is_record_batch());
1475
1476            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1477
1478            let mut total_rows = 0;
1479            for batch_result in record_batch_iter {
1480                let batch = batch_result.unwrap();
1481                total_rows += batch.num_rows();
1482                assert!(batch.num_rows() > 0);
1483                assert_eq!(8, batch.num_columns());
1484            }
1485            assert_eq!(total_rows, range.num_rows());
1486        }
1487    }
1488
1489    #[test]
1490    fn test_bulk_memtable_ranges_with_projection() {
1491        let metadata = metadata_for_test();
1492        let memtable = BulkMemtable::new(
1493            111,
1494            BulkMemtableConfig::default(),
1495            metadata.clone(),
1496            None,
1497            None,
1498            false,
1499            MergeMode::LastRow,
1500        );
1501
1502        let bulk_part = create_bulk_part_with_converter(
1503            "projection_test",
1504            5,
1505            vec![5000, 6000, 7000],
1506            vec![Some(50.0), Some(60.0), Some(70.0)],
1507            500,
1508        )
1509        .unwrap();
1510
1511        memtable.write_bulk(bulk_part).unwrap();
1512
1513        let projection = vec![4u32];
1514        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1515        let ranges = memtable
1516            .ranges(
1517                Some(&projection),
1518                RangesOptions::default().with_predicate(predicate_group),
1519            )
1520            .unwrap();
1521
1522        assert_eq!(1, ranges.ranges.len());
1523        let range = ranges.ranges.get(&0).unwrap();
1524
1525        assert!(range.is_record_batch());
1526        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1527
1528        let mut total_rows = 0;
1529        for batch_result in record_batch_iter {
1530            let batch = batch_result.unwrap();
1531            assert!(batch.num_rows() > 0);
1532            assert_eq!(5, batch.num_columns());
1533            total_rows += batch.num_rows();
1534        }
1535        assert_eq!(3, total_rows);
1536    }
1537
1538    #[test]
1539    fn test_bulk_memtable_unsupported_operations() {
1540        let metadata = metadata_for_test();
1541        let memtable = BulkMemtable::new(
1542            111,
1543            BulkMemtableConfig::default(),
1544            metadata.clone(),
1545            None,
1546            None,
1547            false,
1548            MergeMode::LastRow,
1549        );
1550
1551        let key_values = build_key_values_with_ts_seq_values(
1552            &metadata,
1553            "test".to_string(),
1554            1,
1555            vec![1000].into_iter(),
1556            vec![Some(1.0)].into_iter(),
1557            1,
1558        );
1559
1560        let err = memtable.write(&key_values).unwrap_err();
1561        assert!(err.to_string().contains("not supported"));
1562
1563        let kv = key_values.iter().next().unwrap();
1564        let err = memtable.write_one(kv).unwrap_err();
1565        assert!(err.to_string().contains("not supported"));
1566    }
1567
1568    #[test]
1569    fn test_bulk_memtable_freeze() {
1570        let metadata = metadata_for_test();
1571        let memtable = BulkMemtable::new(
1572            222,
1573            BulkMemtableConfig::default(),
1574            metadata.clone(),
1575            None,
1576            None,
1577            false,
1578            MergeMode::LastRow,
1579        );
1580
1581        let bulk_part = create_bulk_part_with_converter(
1582            "freeze_test",
1583            10,
1584            vec![10000],
1585            vec![Some(100.0)],
1586            1000,
1587        )
1588        .unwrap();
1589
1590        memtable.write_bulk(bulk_part).unwrap();
1591        memtable.freeze().unwrap();
1592
1593        let stats_after_freeze = memtable.stats();
1594        assert_eq!(1, stats_after_freeze.num_rows);
1595    }
1596
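    /// `fork` should create an empty memtable with the new id while leaving the
    /// original memtable and its data untouched.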
1597    #[test]
1598    fn test_bulk_memtable_fork() {
1599        let metadata = metadata_for_test();
1600        let original_memtable = BulkMemtable::new(
1601            333,
1602            BulkMemtableConfig::default(),
1603            metadata.clone(),
1604            None,
1605            None,
1606            false,
1607            MergeMode::LastRow,
1608        );
1609
1610        let bulk_part =
1611            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
1612                .unwrap();
1613
1614        original_memtable.write_bulk(bulk_part).unwrap();
1615
1616        let forked_memtable = original_memtable.fork(444, &metadata);
1617
1618        assert_eq!(forked_memtable.id(), 444);
1619        assert!(forked_memtable.is_empty());
1620        assert_eq!(0, forked_memtable.stats().num_rows);
1621
1622        assert!(!original_memtable.is_empty());
1623        assert_eq!(1, original_memtable.stats().num_rows);
1624    }
1625
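    /// With unordered_part disabled, each written bulk part is exposed as its own range.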
1626    #[test]
1627    fn test_bulk_memtable_ranges_multiple_parts() {
1628        let metadata = metadata_for_test();
1629        let memtable = BulkMemtable::new(
1630            777,
1631            BulkMemtableConfig::default(),
1632            metadata.clone(),
1633            None,
1634            None,
1635            false,
1636            MergeMode::LastRow,
1637        );
1638        // Disable unordered_part for this test
1639        memtable.set_unordered_part_threshold(0);
1640
1641        let parts_data = vec![
1642            (
1643                "part1",
1644                1u32,
1645                vec![1000i64, 1100i64],
1646                vec![Some(10.0), Some(11.0)],
1647                100u64,
1648            ),
1649            (
1650                "part2",
1651                2u32,
1652                vec![2000i64, 2100i64],
1653                vec![Some(20.0), Some(21.0)],
1654                200u64,
1655            ),
1656            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
1657        ];
1658
1659        for (k0, k1, timestamps, values, seq) in parts_data {
1660            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
1661            memtable.write_bulk(part).unwrap();
1662        }
1663
1664        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1665        let ranges = memtable
1666            .ranges(
1667                None,
1668                RangesOptions::default().with_predicate(predicate_group),
1669            )
1670            .unwrap();
1671
1672        assert_eq!(3, ranges.ranges.len());
1673        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1674        assert_eq!(5, total_rows);
1676
1677        for (range_id, range) in ranges.ranges.iter() {
1678            assert!(*range_id < 3);
1679            assert!(range.num_rows() > 0);
1680            assert!(range.is_record_batch());
1681        }
1682    }
1683
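    /// A sequence filter that excludes every written sequence must yield a range
    /// whose iterator returns no batches.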
1684    #[test]
1685    fn test_bulk_memtable_ranges_with_sequence_filter() {
1686        let metadata = metadata_for_test();
1687        let memtable = BulkMemtable::new(
1688            888,
1689            BulkMemtableConfig::default(),
1690            metadata.clone(),
1691            None,
1692            None,
1693            false,
1694            MergeMode::LastRow,
1695        );
1696
1697        let part = create_bulk_part_with_converter(
1698            "seq_test",
1699            1,
1700            vec![1000, 2000, 3000],
1701            vec![Some(10.0), Some(20.0), Some(30.0)],
1702            500,
1703        )
1704        .unwrap();
1705
1706        memtable.write_bulk(part).unwrap();
1707
1708        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1709        let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
1710        let ranges = memtable
1711            .ranges(
1712                None,
1713                RangesOptions::default()
1714                    .with_predicate(predicate_group)
1715                    .with_sequence(sequence_filter),
1716            )
1717            .unwrap();
1718
1719        assert_eq!(1, ranges.ranges.len());
1720        let range = ranges.ranges.get(&0).unwrap();
1721
1722        let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1723        assert!(record_batch_iter.next().is_none());
1724    }
1725
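    /// Writing more parts than the merge threshold and compacting should leave a mix
    /// of bulk and encoded parts, all readable through `ranges()` with no rows lost.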
1726    #[test]
1727    fn test_bulk_memtable_ranges_with_encoded_parts() {
1728        let metadata = metadata_for_test();
1729        let config = BulkMemtableConfig {
1730            merge_threshold: 8,
1731            ..Default::default()
1732        };
1733        let memtable = BulkMemtable::new(
1734            999,
1735            config,
1736            metadata.clone(),
1737            None,
1738            None,
1739            false,
1740            MergeMode::LastRow,
1741        );
1742        // Disable unordered_part for this test
1743        memtable.set_unordered_part_threshold(0);
1744
1745        // Adds enough bulk parts to trigger encoding
1746        for i in 0..10 {
1747            let part = create_bulk_part_with_converter(
1748                &format!("key_{}", i),
1749                i,
1750                vec![1000 + i as i64 * 100],
1751                vec![Some(i as f64 * 10.0)],
1752                100 + i as u64,
1753            )
1754            .unwrap();
1755            memtable.write_bulk(part).unwrap();
1756        }
1757
1758        memtable.compact(false).unwrap();
1759
1760        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1761        let ranges = memtable
1762            .ranges(
1763                None,
1764                RangesOptions::default().with_predicate(predicate_group),
1765            )
1766            .unwrap();
1767
1768        // Should have ranges for both bulk parts and encoded parts
1769        assert_eq!(3, ranges.ranges.len());
1770        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1771        assert_eq!(10, total_rows);
1772
1773        for (_range_id, range) in ranges.ranges.iter() {
1774            assert!(range.num_rows() > 0);
1775            assert!(range.is_record_batch());
1776
1777            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1778            let mut total_rows = 0;
1779            for batch_result in record_batch_iter {
1780                let batch = batch_result.unwrap();
1781                total_rows += batch.num_rows();
1782                assert!(batch.num_rows() > 0);
1783            }
1784            assert_eq!(total_rows, range.num_rows());
1785        }
1786    }
1787
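    /// Parts below the unordered-part threshold are buffered in unordered_part and
    /// compacted into a regular part once the compact threshold is reached.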
1788    #[test]
1789    fn test_bulk_memtable_unordered_part() {
1790        let metadata = metadata_for_test();
1791        let memtable = BulkMemtable::new(
1792            1001,
1793            BulkMemtableConfig::default(),
1794            metadata.clone(),
1795            None,
1796            None,
1797            false,
1798            MergeMode::LastRow,
1799        );
1800
1801        // Set smaller thresholds for testing with smaller inputs
1802        // Accept parts with < 5 rows into unordered_part
1803        memtable.set_unordered_part_threshold(5);
1804        // Compact when total rows >= 10
1805        memtable.set_unordered_part_compact_threshold(10);
1806
1807        // Write 3 small parts (2 rows each); they should be collected into unordered_part
1808        for i in 0..3 {
1809            let part = create_bulk_part_with_converter(
1810                &format!("key_{}", i),
1811                i,
1812                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1813                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1814                100 + i as u64,
1815            )
1816            .unwrap();
1817            assert_eq!(2, part.num_rows());
1818            memtable.write_bulk(part).unwrap();
1819        }
1820
1821        // Total rows = 6, not yet reaching compact threshold
1822        let stats = memtable.stats();
1823        assert_eq!(6, stats.num_rows);
1824
1825        // Write 2 more small parts (each has 2 rows)
1826        // This should trigger compaction when total >= 10
1827        for i in 3..5 {
1828            let part = create_bulk_part_with_converter(
1829                &format!("key_{}", i),
1830                i,
1831                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1832                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1833                100 + i as u64,
1834            )
1835            .unwrap();
1836            memtable.write_bulk(part).unwrap();
1837        }
1838
1839        // Total rows = 10, should have compacted unordered_part into a regular part
1840        let stats = memtable.stats();
1841        assert_eq!(10, stats.num_rows);
1842
1843        // Verify we can read all data correctly
1844        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1845        let ranges = memtable
1846            .ranges(
1847                None,
1848                RangesOptions::default().with_predicate(predicate_group),
1849            )
1850            .unwrap();
1851
1852        // Should have at least 1 range (the compacted part)
1853        assert!(!ranges.ranges.is_empty());
1854        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1855        assert_eq!(10, total_rows);
1856
1857        // Read all data and verify
1858        let mut total_rows_read = 0;
1859        for (_range_id, range) in ranges.ranges.iter() {
1860            assert!(range.is_record_batch());
1861            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1862
1863            for batch_result in record_batch_iter {
1864                let batch = batch_result.unwrap();
1865                total_rows_read += batch.num_rows();
1866            }
1867        }
1868        assert_eq!(10, total_rows_read);
1869    }
1870
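    /// Mixes part sizes: large parts bypass unordered_part while small parts are
    /// buffered there, and all rows stay readable either way.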
1871    #[test]
1872    fn test_bulk_memtable_unordered_part_mixed_sizes() {
1873        let metadata = metadata_for_test();
1874        let memtable = BulkMemtable::new(
1875            1002,
1876            BulkMemtableConfig::default(),
1877            metadata.clone(),
1878            None,
1879            None,
1880            false,
1881            MergeMode::LastRow,
1882        );
1883
1884        // Set threshold to 4 rows - parts with < 4 rows go to unordered_part
1885        memtable.set_unordered_part_threshold(4);
1886        memtable.set_unordered_part_compact_threshold(8);
1887
1888        // Write small parts (3 rows each) - should go to unordered_part
1889        for i in 0..2 {
1890            let part = create_bulk_part_with_converter(
1891                &format!("small_{}", i),
1892                i,
1893                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
1894                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
1895                10 + i as u64,
1896            )
1897            .unwrap();
1898            assert_eq!(3, part.num_rows());
1899            memtable.write_bulk(part).unwrap();
1900        }
1901
1902        // Write a large part (5 rows) - should go directly to regular parts
1903        let large_part = create_bulk_part_with_converter(
1904            "large_key",
1905            100,
1906            vec![5000, 6000, 7000, 8000, 9000],
1907            vec![
1908                Some(100.0),
1909                Some(101.0),
1910                Some(102.0),
1911                Some(103.0),
1912                Some(104.0),
1913            ],
1914            50,
1915        )
1916        .unwrap();
1917        assert_eq!(5, large_part.num_rows());
1918        memtable.write_bulk(large_part).unwrap();
1919
1920        // Write another small part (2 rows) - should trigger compaction of unordered_part
1921        let part = create_bulk_part_with_converter(
1922            "small_2",
1923            2,
1924            vec![4000, 4100],
1925            vec![Some(20.0), Some(21.0)],
1926            30,
1927        )
1928        .unwrap();
1929        memtable.write_bulk(part).unwrap();
1930
1931        let stats = memtable.stats();
1932        assert_eq!(13, stats.num_rows); // 3 + 3 + 5 + 2 = 13
1933
1934        // Verify all data can be read
1935        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1936        let ranges = memtable
1937            .ranges(
1938                None,
1939                RangesOptions::default().with_predicate(predicate_group),
1940            )
1941            .unwrap();
1942
1943        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1944        assert_eq!(13, total_rows);
1945
1946        let mut total_rows_read = 0;
1947        for (_range_id, range) in ranges.ranges.iter() {
1948            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1949            for batch_result in record_batch_iter {
1950                let batch = batch_result.unwrap();
1951                total_rows_read += batch.num_rows();
1952            }
1953        }
1954        assert_eq!(13, total_rows_read);
1955    }
1956
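    /// Rows still buffered in unordered_part (auto-compaction suppressed by a high
    /// threshold) must remain readable through `ranges()`.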
1957    #[test]
1958    fn test_bulk_memtable_unordered_part_with_ranges() {
1959        let metadata = metadata_for_test();
1960        let memtable = BulkMemtable::new(
1961            1003,
1962            BulkMemtableConfig::default(),
1963            metadata.clone(),
1964            None,
1965            None,
1966            false,
1967            MergeMode::LastRow,
1968        );
1969
1970        // Set small thresholds
1971        memtable.set_unordered_part_threshold(3);
1972        memtable.set_unordered_part_compact_threshold(100); // High threshold to prevent auto-compaction
1973
1974        // Write several small parts that stay in unordered_part
1975        for i in 0..3 {
1976            let part = create_bulk_part_with_converter(
1977                &format!("key_{}", i),
1978                i,
1979                vec![1000 + i as i64 * 100],
1980                vec![Some(i as f64 * 10.0)],
1981                100 + i as u64,
1982            )
1983            .unwrap();
1984            assert_eq!(1, part.num_rows());
1985            memtable.write_bulk(part).unwrap();
1986        }
1987
1988        let stats = memtable.stats();
1989        assert_eq!(3, stats.num_rows);
1990
1991        // Test that ranges() can correctly read from unordered_part
1992        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1993        let ranges = memtable
1994            .ranges(
1995                None,
1996                RangesOptions::default().with_predicate(predicate_group),
1997            )
1998            .unwrap();
1999
2000        // Should have 1 range for the unordered_part
2001        assert_eq!(1, ranges.ranges.len());
2002        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2003        assert_eq!(3, total_rows);
2004
2005        // Verify data is sorted correctly in the range
2006        let range = ranges.ranges.get(&0).unwrap();
2007        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2008
2009        let mut total_rows = 0;
2010        for batch_result in record_batch_iter {
2011            let batch = batch_result.unwrap();
2012            total_rows += batch.num_rows();
2013            // The range iterator is expected to emit data sorted by primary key;
2013            // here we only assert that each batch is non-empty
2014            assert!(batch.num_rows() > 0);
2015        }
2016        assert_eq!(3, total_rows);
2017    }
2018
2019    /// Helper to create a BulkPartWrapper from a BulkPart.
2020    fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
2021        BulkPartWrapper {
2022            part: PartToMerge::Bulk {
2023                part,
2024                file_id: FileId::random(),
2025            },
2026            merging: false,
2027        }
2028    }
2029
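    /// One part fewer than the merge threshold must not trigger a merge.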
2030    #[test]
2031    fn test_should_merge_parts_below_threshold() {
2032        let mut bulk_parts = BulkParts::default();
2033
2034        // Add DEFAULT_MERGE_THRESHOLD - 1 bulk parts (one below the threshold of 16)
2035        for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
2036            let part = create_bulk_part_with_converter(
2037                &format!("key_{}", i),
2038                i as u32,
2039                vec![1000 + i as i64 * 100],
2040                vec![Some(i as f64 * 10.0)],
2041                100 + i as u64,
2042            )
2043            .unwrap();
2044            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2045        }
2046
2047        // Should not trigger merge: we are one part short of the threshold
2048        assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2049    }
2050
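    /// Exactly `merge_threshold` parts should trigger a merge.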
2051    #[test]
2052    fn test_should_merge_parts_at_threshold() {
2053        let mut bulk_parts = BulkParts::default();
2054        let merge_threshold = 8;
2055
2056        // Add 8 bulk parts (at merge_threshold)
2057        for i in 0..merge_threshold {
2058            let part = create_bulk_part_with_converter(
2059                &format!("key_{}", i),
2060                i as u32,
2061                vec![1000 + i as i64 * 100],
2062                vec![Some(i as f64 * 10.0)],
2063                100 + i as u64,
2064            )
2065            .unwrap();
2066            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2067        }
2068
2069        // Should trigger merge since we have 8 parts
2070        assert!(bulk_parts.should_merge_parts(merge_threshold));
2071    }
2072
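    /// Parts already flagged as merging are excluded when counting parts toward
    /// the merge threshold.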
2073    #[test]
2074    fn test_should_merge_parts_with_merging_flag() {
2075        let mut bulk_parts = BulkParts::default();
2076        let merge_threshold = 8;
2077
2078        // Add 10 bulk parts
2079        for i in 0..10 {
2080            let part = create_bulk_part_with_converter(
2081                &format!("key_{}", i),
2082                i as u32,
2083                vec![1000 + i as i64 * 100],
2084                vec![Some(i as f64 * 10.0)],
2085                100 + i as u64,
2086            )
2087            .unwrap();
2088            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2089        }
2090
2091        // Should trigger merge since we have 10 parts
2092        assert!(bulk_parts.should_merge_parts(merge_threshold));
2093
2094        // Mark first 3 parts as merging
2095        for wrapper in bulk_parts.parts.iter_mut().take(3) {
2096            wrapper.merging = true;
2097        }
2098
2099        // Now only 7 parts are available for merging, should not trigger
2100        assert!(!bulk_parts.should_merge_parts(merge_threshold));
2101    }
2102
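    /// `collect_parts_to_merge` should distribute all eligible parts into non-empty
    /// groups that together cover every part exactly once.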
2103    #[test]
2104    fn test_collect_parts_to_merge_grouping() {
2105        let mut bulk_parts = BulkParts::default();
2106
2107        // Add 16 bulk parts with different row counts
2108        for i in 0..16 {
2109            let num_rows = (i % 4) + 1; // 1 to 4 rows
2110            let timestamps: Vec<i64> = (0..num_rows)
2111                .map(|j| 1000 + i as i64 * 100 + j as i64)
2112                .collect();
2113            let values: Vec<Option<f64>> =
2114                (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
2115            let part = create_bulk_part_with_converter(
2116                &format!("key_{}", i),
2117                i as u32,
2118                timestamps,
2119                values,
2120                100 + i as u64,
2121            )
2122            .unwrap();
2123            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2124        }
2125
2126        // Should trigger merge since we have 16 parts
2127        assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2128
2129        // Collect parts to merge
2130        let collected =
2131            bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);
2132
2133        // Should have groups
2134        assert!(!collected.groups.is_empty());
2135
2136        // All groups should have parts
2137        for group in &collected.groups {
2138            assert!(!group.is_empty());
2139        }
2140
2141        // Total parts collected should be 16
2142        let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
2143        assert_eq!(16, total_parts);
2144    }
2145
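    /// When the merged row count stays below DEFAULT_ROW_GROUP_SIZE, compaction
    /// should produce a MultiBulkPart exposed as a single range with all rows.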
2146    #[test]
2147    fn test_bulk_memtable_ranges_with_multi_bulk_part() {
2148        let metadata = metadata_for_test();
2149        let merge_threshold = 8;
2150        let config = BulkMemtableConfig {
2151            merge_threshold,
2152            ..Default::default()
2153        };
2154        let memtable = BulkMemtable::new(
2155            2005,
2156            config,
2157            metadata.clone(),
2158            None,
2159            None,
2160            false,
2161            MergeMode::LastRow,
2162        );
2163        // Disable unordered_part for this test
2164        memtable.set_unordered_part_threshold(0);
2165
2166        // Write enough bulk parts to trigger merge (merge_threshold = 8)
2168        // Each part has a small number of rows, so the total stays below DEFAULT_ROW_GROUP_SIZE
2168        // This will result in MultiBulkPart after compaction
2169        for i in 0..merge_threshold {
2170            let part = create_bulk_part_with_converter(
2171                &format!("key_{}", i),
2172                i as u32,
2173                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2174                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2175                100 + i as u64,
2176            )
2177            .unwrap();
2178            memtable.write_bulk(part).unwrap();
2179        }
2180
2181        // Compact to trigger MultiBulkPart creation (since total rows < DEFAULT_ROW_GROUP_SIZE)
2182        memtable.compact(false).unwrap();
2183
2184        // Verify we can read from the memtable
2185        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2186        let ranges = memtable
2187            .ranges(
2188                None,
2189                RangesOptions::default().with_predicate(predicate_group),
2190            )
2191            .unwrap();
2192
2193        assert_eq!(1, ranges.ranges.len());
2194        let expected_rows = merge_threshold * 2; // Each part has 2 rows
2195        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2196        assert_eq!(expected_rows, total_rows);
2197
2198        // Read all data
2199        let mut total_rows_read = 0;
2200        for (_range_id, range) in ranges.ranges.iter() {
2201            assert!(range.is_record_batch());
2202            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2203
2204            for batch_result in record_batch_iter {
2205                let batch = batch_result.unwrap();
2206                total_rows_read += batch.num_rows();
2207            }
2208        }
2209        assert_eq!(expected_rows, total_rows_read);
2210    }
2211}