Skip to main content

mito2/memtable/
bulk.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation for bulk load
16
17pub(crate) mod chunk_reader;
18pub mod context;
19pub mod part;
20pub mod part_reader;
21mod row_group_reader;
22
23use std::collections::{BTreeMap, HashSet};
24use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
25use std::sync::{Arc, LazyLock, Mutex, RwLock};
26use std::time::Instant;
27
/// Reads the environment variable `name` and parses it as a `usize`.
///
/// Leading and trailing whitespace in the value is ignored, so an override such
/// as `"32\n"` is honored instead of silently falling back.
///
/// Returns `default` when the variable is unset, is not valid Unicode, or does
/// not parse as a `usize` (for example a negative or non-numeric value).
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|value| value.trim().parse().ok())
        .unwrap_or(default)
}
35
36use common_time::Timestamp;
37use datatypes::arrow::datatypes::SchemaRef;
38use mito_codec::key_values::KeyValue;
39use rayon::prelude::*;
40use serde::{Deserialize, Serialize};
41use serde_with::{DisplayFromStr, serde_as};
42use store_api::metadata::RegionMetadataRef;
43use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
44use tokio::sync::Semaphore;
45
46use crate::error::{Result, UnsupportedOperationSnafu};
47use crate::flush::WriteBufferManagerRef;
48use crate::memtable::bulk::context::BulkIterContext;
49use crate::memtable::bulk::part::{
50    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
51    should_prune_bulk_part,
52};
53use crate::memtable::bulk::part_reader::BulkPartBatchIter;
54use crate::memtable::stats::WriteMetrics;
55use crate::memtable::{
56    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
57    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
58    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
59};
60use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
61use crate::read::flat_merge::FlatMergeIterator;
62use crate::region::options::MergeMode;
63use crate::sst::parquet::flat_format::field_column_start;
64use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
65use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
66
67/// Default merge threshold for triggering compaction.
68const DEFAULT_MERGE_THRESHOLD: usize = 16;
69
70/// Threshold for triggering merge of parts. Configurable via `GREPTIME_BULK_MERGE_THRESHOLD`.
71static MERGE_THRESHOLD: LazyLock<usize> =
72    LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));
73
74/// Default maximum number of groups for parallel merging.
75const DEFAULT_MAX_MERGE_GROUPS: usize = 32;
76
77/// Maximum merge groups. Configurable via `GREPTIME_BULK_MAX_MERGE_GROUPS`.
78static MAX_MERGE_GROUPS: LazyLock<usize> =
79    LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));
80
81/// Row threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_ROW_THRESHOLD`.
82/// When estimated rows exceed this threshold, parts are encoded as EncodedBulkPart.
83pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
84    env_usize(
85        "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
86        10 * DEFAULT_ROW_GROUP_SIZE,
87    )
88});
89
90/// Default bytes threshold for encoding.
91const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;
92
93/// Bytes threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_BYTES_THRESHOLD`.
94/// When estimated bytes exceed this threshold, parts are encoded as EncodedBulkPart.
95static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
96    env_usize(
97        "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
98        DEFAULT_ENCODE_BYTES_THRESHOLD,
99    )
100});
101
/// Configuration for bulk memtable.
///
/// Every field is (de)serialized through its string form (`DisplayFromStr`).
/// Because of `#[serde(default)]`, fields missing from the input take their
/// value from [`BulkMemtableConfig::default`].
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BulkMemtableConfig {
    /// Threshold for triggering merge of parts: a merge is due once this many
    /// unmerged raw parts, or this many unmerged encoded parts, exist. It is
    /// also the maximum number of parts placed in a single merge group.
    #[serde_as(as = "DisplayFromStr")]
    pub merge_threshold: usize,
    /// Row threshold for encoding parts.
    #[serde_as(as = "DisplayFromStr")]
    pub encode_row_threshold: usize,
    /// Bytes threshold for encoding parts.
    #[serde_as(as = "DisplayFromStr")]
    pub encode_bytes_threshold: usize,
    /// Maximum number of groups for parallel merging; groups beyond this
    /// limit are not collected in a merge round.
    #[serde_as(as = "DisplayFromStr")]
    pub max_merge_groups: usize,
}
120
121impl Default for BulkMemtableConfig {
122    fn default() -> Self {
123        Self {
124            merge_threshold: *MERGE_THRESHOLD,
125            encode_row_threshold: *ENCODE_ROW_THRESHOLD,
126            encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
127            max_merge_groups: *MAX_MERGE_GROUPS,
128        }
129        .sanitize()
130    }
131}
132
133impl BulkMemtableConfig {
134    fn sanitize(mut self) -> Self {
135        if self.merge_threshold == 0 {
136            self.merge_threshold = DEFAULT_MERGE_THRESHOLD;
137        }
138        self
139    }
140}
141
/// Result of merging parts - either a MultiBulkPart or an EncodedBulkPart.
///
/// NOTE(review): the code that picks the variant is not in this section;
/// presumably it follows the encode row/bytes thresholds of
/// `BulkMemtableConfig` - confirm against the merge implementation.
enum MergedPart {
    /// Merged part stored as MultiBulkPart; installed as `PartToMerge::Multi`.
    Multi(MultiBulkPart),
    /// Merged part stored as EncodedBulkPart; installed as `PartToMerge::Encoded`.
    Encoded(EncodedBulkPart),
}
149
/// Result of collecting parts to merge.
struct CollectedParts {
    /// Groups of parts ready for merging. Each group holds at most
    /// `merge_threshold` parts of the same kind (raw or encoded), and every
    /// part in a group has already been flagged as `merging`.
    groups: Vec<Vec<PartToMerge>>,
}
155
/// All parts in a bulk memtable.
#[derive(Default)]
struct BulkParts {
    /// Unordered small parts. Small writes are buffered here and converted
    /// into a regular bulk part once the part reports it should be compacted.
    unordered_part: UnorderedPart,
    /// All parts (raw, multi and encoded), each with its merging flag.
    parts: Vec<BulkPartWrapper>,
}
164
165impl BulkParts {
166    /// Total number of parts (including unordered).
167    fn num_parts(&self) -> usize {
168        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
169        self.parts.len() + unordered_count
170    }
171
172    /// Returns true if there is no part.
173    fn is_empty(&self) -> bool {
174        self.unordered_part.is_empty() && self.parts.is_empty()
175    }
176
177    /// Returns true if bulk parts or encoded parts should be merged.
178    /// Uses short-circuit counting to stop early once threshold is reached.
179    fn should_merge_parts(&self, merge_threshold: usize) -> bool {
180        let mut bulk_count = 0;
181        let mut encoded_count = 0;
182
183        for wrapper in &self.parts {
184            if wrapper.merging {
185                continue;
186            }
187
188            if wrapper.part.is_encoded() {
189                encoded_count += 1;
190            } else {
191                bulk_count += 1;
192            }
193
194            // Short-circuit: stop counting if either threshold is reached
195            if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
196                return true;
197            }
198        }
199
200        false
201    }
202
203    /// Returns true if the unordered_part should be compacted into a BulkPart.
204    fn should_compact_unordered_part(&self) -> bool {
205        self.unordered_part.should_compact()
206    }
207
208    /// Collects unmerged parts and marks them as being merged.
209    /// Only collects parts of types that meet the threshold.
210    /// Parts are pre-grouped into chunks for parallel processing.
211    fn collect_parts_to_merge(
212        &mut self,
213        merge_threshold: usize,
214        max_merge_groups: usize,
215    ) -> CollectedParts {
216        // First pass: collect indices and row counts for each type
217        let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
218        let mut encoded_indices: Vec<(usize, usize)> = Vec::new();
219
220        for (idx, wrapper) in self.parts.iter().enumerate() {
221            if wrapper.merging {
222                continue;
223            }
224            let num_rows = wrapper.part.num_rows();
225            if wrapper.part.is_encoded() {
226                encoded_indices.push((idx, num_rows));
227            } else {
228                bulk_indices.push((idx, num_rows));
229            }
230        }
231
232        let mut groups = Vec::new();
233
234        // Process bulk parts if threshold met
235        if bulk_indices.len() >= merge_threshold {
236            groups.extend(self.collect_and_group_parts(
237                bulk_indices,
238                merge_threshold,
239                max_merge_groups,
240            ));
241        }
242
243        // Process encoded parts if threshold met
244        if encoded_indices.len() >= merge_threshold {
245            groups.extend(self.collect_and_group_parts(
246                encoded_indices,
247                merge_threshold,
248                max_merge_groups,
249            ));
250        }
251
252        CollectedParts { groups }
253    }
254
255    /// Sorts indices by row count, groups into chunks, marks as merging, and returns groups.
256    fn collect_and_group_parts(
257        &mut self,
258        mut indices: Vec<(usize, usize)>,
259        merge_threshold: usize,
260        max_merge_groups: usize,
261    ) -> Vec<Vec<PartToMerge>> {
262        if indices.is_empty() {
263            return Vec::new();
264        }
265
266        // Sort by row count for better grouping
267        indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);
268
269        // Group into chunks of merge_threshold size, limit to max_merge_groups
270        indices
271            .chunks(merge_threshold)
272            .take(max_merge_groups)
273            .map(|chunk| {
274                chunk
275                    .iter()
276                    .map(|(idx, _)| {
277                        let wrapper = &mut self.parts[*idx];
278                        wrapper.merging = true;
279                        wrapper.part.clone()
280                    })
281                    .collect()
282            })
283            .collect()
284    }
285
286    /// Installs merged parts and removes the original parts by file ids.
287    /// Returns the total number of rows in the merged parts.
288    fn install_merged_parts<I>(
289        &mut self,
290        merged_parts: I,
291        merged_file_ids: &HashSet<FileId>,
292    ) -> usize
293    where
294        I: IntoIterator<Item = MergedPart>,
295    {
296        let mut total_output_rows = 0;
297
298        for merged_part in merged_parts {
299            match merged_part {
300                MergedPart::Encoded(encoded_part) => {
301                    total_output_rows += encoded_part.metadata().num_rows;
302                    self.parts.push(BulkPartWrapper {
303                        part: PartToMerge::Encoded {
304                            part: encoded_part,
305                            file_id: FileId::random(),
306                        },
307                        merging: false,
308                    });
309                }
310                MergedPart::Multi(multi_part) => {
311                    total_output_rows += multi_part.num_rows();
312                    self.parts.push(BulkPartWrapper {
313                        part: PartToMerge::Multi {
314                            part: multi_part,
315                            file_id: FileId::random(),
316                        },
317                        merging: false,
318                    });
319                }
320            }
321        }
322
323        self.parts
324            .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));
325
326        total_output_rows
327    }
328
329    /// Resets merging flag for parts with the given file ids.
330    /// Used when merging fails or is cancelled.
331    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
332        for wrapper in &mut self.parts {
333            if file_ids.contains(&wrapper.file_id()) {
334                wrapper.merging = false;
335            }
336        }
337    }
338}
339
/// RAII guard for managing merging flags.
/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
struct MergingFlagsGuard<'a> {
    /// Lock-protected parts whose merging flags are reset on an unsuccessful drop.
    bulk_parts: &'a RwLock<BulkParts>,
    /// File ids of the parts covered by this guard.
    file_ids: &'a HashSet<FileId>,
    /// Starts as `false`; set by `mark_success()` to disable the reset on drop.
    success: bool,
}
347
348impl<'a> MergingFlagsGuard<'a> {
349    /// Creates a new guard for the given file ids.
350    fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
351        Self {
352            bulk_parts,
353            file_ids,
354            success: false,
355        }
356    }
357
358    /// Marks the merge operation as successful.
359    /// When this is called, the guard will not reset the flags on drop.
360    fn mark_success(&mut self) {
361        self.success = true;
362    }
363}
364
365impl<'a> Drop for MergingFlagsGuard<'a> {
366    fn drop(&mut self) {
367        if !self.success
368            && let Ok(mut parts) = self.bulk_parts.write()
369        {
370            parts.reset_merging_flags(self.file_ids);
371        }
372    }
373}
374
/// Memtable that ingests and scans parts directly.
pub struct BulkMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
    /// All parts of the memtable; shared with scheduled compaction tasks.
    parts: Arc<RwLock<BulkParts>>,
    /// Metadata of the region this memtable belongs to.
    metadata: RegionMetadataRef,
    /// Tracks the bytes allocated by written parts.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far; starts at `i64::MIN`.
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far; starts at `i64::MAX`.
    min_timestamp: AtomicI64,
    /// Largest sequence written so far; starts at 0.
    max_sequence: AtomicU64,
    /// Total number of rows written so far.
    num_rows: AtomicUsize,
    /// Cached flat SST arrow schema for memtable compaction.
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for scheduling compaction tasks.
    /// When `None`, compaction runs synchronously on the writer.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled; merging deduplicates only when it is disabled.
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}
398
399impl std::fmt::Debug for BulkMemtable {
400    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401        f.debug_struct("BulkMemtable")
402            .field("id", &self.id)
403            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
404            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
405            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
406            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
407            .finish()
408    }
409}
410
411impl Memtable for BulkMemtable {
412    fn id(&self) -> MemtableId {
413        self.id
414    }
415
416    fn write(&self, _kvs: &KeyValues) -> Result<()> {
417        UnsupportedOperationSnafu {
418            err_msg: "write() is not supported for bulk memtable",
419        }
420        .fail()
421    }
422
423    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
424        UnsupportedOperationSnafu {
425            err_msg: "write_one() is not supported for bulk memtable",
426        }
427        .fail()
428    }
429
430    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
431        let local_metrics = WriteMetrics {
432            key_bytes: 0,
433            value_bytes: fragment.estimated_size(),
434            min_ts: fragment.min_timestamp,
435            max_ts: fragment.max_timestamp,
436            num_rows: fragment.num_rows(),
437            max_sequence: fragment.sequence,
438        };
439
440        {
441            let mut bulk_parts = self.parts.write().unwrap();
442
443            // Routes small parts to unordered_part based on threshold
444            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
445                bulk_parts.unordered_part.push(fragment);
446
447                // Compacts unordered_part if threshold is reached
448                if bulk_parts.should_compact_unordered_part()
449                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
450                {
451                    bulk_parts.parts.push(BulkPartWrapper {
452                        part: PartToMerge::Bulk {
453                            part: bulk_part,
454                            file_id: FileId::random(),
455                        },
456                        merging: false,
457                    });
458                    bulk_parts.unordered_part.clear();
459                }
460            } else {
461                bulk_parts.parts.push(BulkPartWrapper {
462                    part: PartToMerge::Bulk {
463                        part: fragment,
464                        file_id: FileId::random(),
465                    },
466                    merging: false,
467                });
468            }
469
470            // Since this operation should be fast, we do it in parts lock scope.
471            // This ensure the statistics in `ranges()` are correct. What's more,
472            // it guarantees no rows are out of the time range so we don't need to
473            // prune rows by time range again in the iterator of the MemtableRange.
474            self.update_stats(local_metrics);
475        }
476
477        if self.should_compact() {
478            self.schedule_compact();
479        }
480
481        Ok(())
482    }
483
484    fn ranges(
485        &self,
486        projection: Option<&[ColumnId]>,
487        options: RangesOptions,
488    ) -> Result<MemtableRanges> {
489        let predicate = options.predicate;
490        let sequence = options.sequence;
491        let mut ranges = BTreeMap::new();
492        let mut range_id = 0;
493
494        // TODO(yingwen): Filter ranges by sequence.
495        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
496            self.metadata.clone(),
497            projection,
498            predicate.predicate().cloned(),
499            options.for_flush,
500            options.pre_filter_mode,
501        )?);
502
503        // Adds ranges for regular parts and encoded parts
504        {
505            let bulk_parts = self.parts.read().unwrap();
506
507            // Adds range for unordered part if not empty
508            if !bulk_parts.unordered_part.is_empty()
509                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
510            {
511                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
512                let range = MemtableRange::new(
513                    Arc::new(MemtableRangeContext::new(
514                        self.id,
515                        Box::new(BulkRangeIterBuilder {
516                            part: unordered_bulk_part,
517                            context: context.clone(),
518                            sequence,
519                        }),
520                        predicate.clone(),
521                    )),
522                    part_stats,
523                );
524                ranges.insert(range_id, range);
525                range_id += 1;
526            }
527
528            // Adds ranges for all parts
529            for part_wrapper in bulk_parts.parts.iter() {
530                // Skips empty parts
531                if part_wrapper.part.num_rows() == 0 {
532                    continue;
533                }
534
535                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
536                let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
537                    PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
538                        part: part.clone(),
539                        context: context.clone(),
540                        sequence,
541                    }),
542                    PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
543                        part: part.clone(),
544                        context: context.clone(),
545                        sequence,
546                    }),
547                    PartToMerge::Encoded { part, file_id } => {
548                        Box::new(EncodedBulkRangeIterBuilder {
549                            file_id: *file_id,
550                            part: part.clone(),
551                            context: context.clone(),
552                            sequence,
553                        })
554                    }
555                };
556
557                let range = MemtableRange::new(
558                    Arc::new(MemtableRangeContext::new(
559                        self.id,
560                        iter_builder,
561                        predicate.clone(),
562                    )),
563                    part_stats,
564                );
565                ranges.insert(range_id, range);
566                range_id += 1;
567            }
568        }
569
570        Ok(MemtableRanges { ranges })
571    }
572
573    fn is_empty(&self) -> bool {
574        let bulk_parts = self.parts.read().unwrap();
575        bulk_parts.is_empty()
576    }
577
578    fn freeze(&self) -> Result<()> {
579        self.alloc_tracker.done_allocating();
580        Ok(())
581    }
582
583    fn stats(&self) -> MemtableStats {
584        let estimated_bytes = self.alloc_tracker.bytes_allocated();
585
586        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
587            return MemtableStats {
588                estimated_bytes,
589                time_range: None,
590                num_rows: 0,
591                num_ranges: 0,
592                max_sequence: 0,
593                series_count: 0,
594            };
595        }
596
597        let ts_type = self
598            .metadata
599            .time_index_column()
600            .column_schema
601            .data_type
602            .clone()
603            .as_timestamp()
604            .expect("Timestamp column must have timestamp type");
605        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
606        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
607
608        let num_ranges = self.parts.read().unwrap().num_parts();
609
610        MemtableStats {
611            estimated_bytes,
612            time_range: Some((min_timestamp, max_timestamp)),
613            num_rows: self.num_rows.load(Ordering::Relaxed),
614            num_ranges,
615            max_sequence: self.max_sequence.load(Ordering::Relaxed),
616            series_count: self.estimated_series_count(),
617        }
618    }
619
620    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
621        // Computes the new flat schema based on the new metadata.
622        let flat_arrow_schema = to_flat_sst_arrow_schema(
623            metadata,
624            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
625        );
626
627        Arc::new(Self {
628            id,
629            config: self.config.clone(),
630            parts: Arc::new(RwLock::new(BulkParts::default())),
631            metadata: metadata.clone(),
632            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
633            max_timestamp: AtomicI64::new(i64::MIN),
634            min_timestamp: AtomicI64::new(i64::MAX),
635            max_sequence: AtomicU64::new(0),
636            num_rows: AtomicUsize::new(0),
637            flat_arrow_schema,
638            compactor: Arc::new(Mutex::new(MemtableCompactor::new(
639                metadata.region_id,
640                id,
641                self.config.clone(),
642            ))),
643            compact_dispatcher: self.compact_dispatcher.clone(),
644            append_mode: self.append_mode,
645            merge_mode: self.merge_mode,
646        })
647    }
648
649    fn compact(&self, for_flush: bool) -> Result<()> {
650        let mut compactor = self.compactor.lock().unwrap();
651
652        if for_flush {
653            return Ok(());
654        }
655
656        // Unified merge for all parts
657        let should_merge = self
658            .parts
659            .read()
660            .unwrap()
661            .should_merge_parts(self.config.merge_threshold);
662        if should_merge {
663            compactor.merge_parts(
664                &self.flat_arrow_schema,
665                &self.parts,
666                &self.metadata,
667                !self.append_mode,
668                self.merge_mode,
669            )?;
670        }
671
672        Ok(())
673    }
674}
675
676impl BulkMemtable {
677    /// Creates a new BulkMemtable
678    pub fn new(
679        id: MemtableId,
680        config: BulkMemtableConfig,
681        metadata: RegionMetadataRef,
682        write_buffer_manager: Option<WriteBufferManagerRef>,
683        compact_dispatcher: Option<Arc<CompactDispatcher>>,
684        append_mode: bool,
685        merge_mode: MergeMode,
686    ) -> Self {
687        let config = config.sanitize();
688        let flat_arrow_schema = to_flat_sst_arrow_schema(
689            &metadata,
690            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
691        );
692
693        let region_id = metadata.region_id;
694        Self {
695            id,
696            config: config.clone(),
697            parts: Arc::new(RwLock::new(BulkParts::default())),
698            metadata,
699            alloc_tracker: AllocTracker::new(write_buffer_manager),
700            max_timestamp: AtomicI64::new(i64::MIN),
701            min_timestamp: AtomicI64::new(i64::MAX),
702            max_sequence: AtomicU64::new(0),
703            num_rows: AtomicUsize::new(0),
704            flat_arrow_schema,
705            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
706            compact_dispatcher,
707            append_mode,
708            merge_mode,
709        }
710    }
711
712    /// Sets the unordered part threshold (for testing).
713    #[cfg(test)]
714    pub fn set_unordered_part_threshold(&self, threshold: usize) {
715        self.parts
716            .write()
717            .unwrap()
718            .unordered_part
719            .set_threshold(threshold);
720    }
721
722    /// Sets the unordered part compact threshold (for testing).
723    #[cfg(test)]
724    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
725        self.parts
726            .write()
727            .unwrap()
728            .unordered_part
729            .set_compact_threshold(compact_threshold);
730    }
731
732    /// Updates memtable stats.
733    ///
734    /// Please update this inside the write lock scope.
735    fn update_stats(&self, stats: WriteMetrics) {
736        self.alloc_tracker
737            .on_allocation(stats.key_bytes + stats.value_bytes);
738
739        self.max_timestamp
740            .fetch_max(stats.max_ts, Ordering::Relaxed);
741        self.min_timestamp
742            .fetch_min(stats.min_ts, Ordering::Relaxed);
743        self.max_sequence
744            .fetch_max(stats.max_sequence, Ordering::Relaxed);
745        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
746    }
747
748    /// Returns the estimated time series count.
749    fn estimated_series_count(&self) -> usize {
750        let bulk_parts = self.parts.read().unwrap();
751        bulk_parts
752            .parts
753            .iter()
754            .map(|part_wrapper| part_wrapper.part.series_count())
755            .sum()
756    }
757
758    /// Returns whether the memtable should be compacted.
759    fn should_compact(&self) -> bool {
760        let parts = self.parts.read().unwrap();
761        parts.should_merge_parts(self.config.merge_threshold)
762    }
763
764    /// Schedules a compaction task using the CompactDispatcher.
765    fn schedule_compact(&self) {
766        if let Some(dispatcher) = &self.compact_dispatcher {
767            let task = MemCompactTask {
768                metadata: self.metadata.clone(),
769                parts: self.parts.clone(),
770                config: self.config.clone(),
771                flat_arrow_schema: self.flat_arrow_schema.clone(),
772                compactor: self.compactor.clone(),
773                append_mode: self.append_mode,
774                merge_mode: self.merge_mode,
775            };
776
777            dispatcher.dispatch_compact(task);
778        } else {
779            // Uses synchronous compaction if no dispatcher is available.
780            if let Err(e) = self.compact(false) {
781                common_telemetry::error!(e; "Failed to compact table");
782            }
783        }
784    }
785}
786
/// Iterator builder for bulk range.
///
/// Produces record batch iterators over a single raw `BulkPart`.
pub struct BulkRangeIterBuilder {
    /// The part to read.
    pub part: BulkPart,
    /// Read context shared by the ranges of one scan.
    pub context: Arc<BulkIterContext>,
    /// Optional sequence range handed to the part iterator.
    pub sequence: Option<SequenceRange>,
}
793
/// Iterator builder for multi bulk range.
///
/// Produces record batch iterators over a `MultiBulkPart`.
struct MultiBulkRangeIterBuilder {
    /// The part to read.
    part: MultiBulkPart,
    /// Read context shared by the ranges of one scan.
    context: Arc<BulkIterContext>,
    /// Optional sequence range handed to the part reader.
    sequence: Option<SequenceRange>,
}
800
801impl IterBuilder for BulkRangeIterBuilder {
802    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
803        UnsupportedOperationSnafu {
804            err_msg: "BatchIterator is not supported for bulk memtable",
805        }
806        .fail()
807    }
808
809    fn is_record_batch(&self) -> bool {
810        true
811    }
812
813    fn build_record_batch(
814        &self,
815        _time_range: Option<(Timestamp, Timestamp)>,
816        metrics: Option<MemScanMetrics>,
817    ) -> Result<BoxedRecordBatchIterator> {
818        let metadata = self.context.read_format().metadata();
819        if should_prune_bulk_part(&self.part.batch, &self.context, metadata) {
820            return Ok(Box::new(std::iter::empty()));
821        }
822
823        let series_count = self.part.estimated_series_count();
824        let iter = BulkPartBatchIter::from_single(
825            self.part.batch.clone(),
826            self.context.clone(),
827            self.sequence,
828            series_count,
829            metrics,
830        );
831
832        Ok(Box::new(iter))
833    }
834
835    fn encoded_range(&self) -> Option<EncodedRange> {
836        None
837    }
838}
839
840impl IterBuilder for MultiBulkRangeIterBuilder {
841    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
842        UnsupportedOperationSnafu {
843            err_msg: "BatchIterator is not supported for multi bulk memtable",
844        }
845        .fail()
846    }
847
848    fn is_record_batch(&self) -> bool {
849        true
850    }
851
852    fn build_record_batch(
853        &self,
854        _time_range: Option<(Timestamp, Timestamp)>,
855        metrics: Option<MemScanMetrics>,
856    ) -> Result<BoxedRecordBatchIterator> {
857        match self
858            .part
859            .read(self.context.clone(), self.sequence, metrics)?
860        {
861            Some(iter) => Ok(iter),
862            // All batches were pruned by the predicate. Return an empty iterator.
863            None => Ok(Box::new(std::iter::empty())),
864        }
865    }
866
867    fn encoded_range(&self) -> Option<EncodedRange> {
868        None
869    }
870}
871
/// Iterator builder for encoded bulk range.
///
/// Produces record batch iterators over an `EncodedBulkPart` and can expose
/// the encoded data itself through `encoded_range()`.
struct EncodedBulkRangeIterBuilder {
    /// File id of the part; used to build the SST info of the encoded range.
    file_id: FileId,
    /// The encoded part to read.
    part: EncodedBulkPart,
    /// Read context shared by the ranges of one scan.
    context: Arc<BulkIterContext>,
    /// Optional sequence range handed to the part reader.
    sequence: Option<SequenceRange>,
}
879
880impl IterBuilder for EncodedBulkRangeIterBuilder {
881    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
882        UnsupportedOperationSnafu {
883            err_msg: "BatchIterator is not supported for encoded bulk memtable",
884        }
885        .fail()
886    }
887
888    fn is_record_batch(&self) -> bool {
889        true
890    }
891
892    fn build_record_batch(
893        &self,
894        _time_range: Option<(Timestamp, Timestamp)>,
895        metrics: Option<MemScanMetrics>,
896    ) -> Result<BoxedRecordBatchIterator> {
897        if let Some(iter) = self
898            .part
899            .read(self.context.clone(), self.sequence, metrics)?
900        {
901            Ok(iter)
902        } else {
903            // Return an empty iterator if no data to read
904            Ok(Box::new(std::iter::empty()))
905        }
906    }
907
908    fn encoded_range(&self) -> Option<EncodedRange> {
909        Some(EncodedRange {
910            data: self.part.data().clone(),
911            sst_info: self.part.to_sst_info(self.file_id),
912        })
913    }
914}
915
916struct BulkPartWrapper {
917    /// The part to store. It already contains the file id.
918    part: PartToMerge,
919    /// Whether this part is currently being merged.
920    merging: bool,
921}
922
923impl BulkPartWrapper {
924    /// Returns the file id of this part.
925    fn file_id(&self) -> FileId {
926        self.part.file_id()
927    }
928}
929
930/// Enum to wrap different types of parts for unified merging.
931#[derive(Clone)]
932enum PartToMerge {
933    /// Raw bulk part.
934    Bulk { part: BulkPart, file_id: FileId },
935    /// Multiple bulk parts.
936    Multi {
937        part: MultiBulkPart,
938        file_id: FileId,
939    },
940    /// Encoded bulk part.
941    Encoded {
942        part: EncodedBulkPart,
943        file_id: FileId,
944    },
945}
946
947impl PartToMerge {
948    /// Gets the file ID of this part.
949    fn file_id(&self) -> FileId {
950        match self {
951            PartToMerge::Bulk { file_id, .. } => *file_id,
952            PartToMerge::Multi { file_id, .. } => *file_id,
953            PartToMerge::Encoded { file_id, .. } => *file_id,
954        }
955    }
956
957    /// Gets the minimum timestamp of this part.
958    fn min_timestamp(&self) -> i64 {
959        match self {
960            PartToMerge::Bulk { part, .. } => part.min_timestamp,
961            PartToMerge::Multi { part, .. } => part.min_timestamp(),
962            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
963        }
964    }
965
966    /// Gets the maximum timestamp of this part.
967    fn max_timestamp(&self) -> i64 {
968        match self {
969            PartToMerge::Bulk { part, .. } => part.max_timestamp,
970            PartToMerge::Multi { part, .. } => part.max_timestamp(),
971            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
972        }
973    }
974
975    /// Gets the number of rows in this part.
976    fn num_rows(&self) -> usize {
977        match self {
978            PartToMerge::Bulk { part, .. } => part.num_rows(),
979            PartToMerge::Multi { part, .. } => part.num_rows(),
980            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
981        }
982    }
983
984    /// Gets the maximum sequence number of this part.
985    fn max_sequence(&self) -> u64 {
986        match self {
987            PartToMerge::Bulk { part, .. } => part.sequence,
988            PartToMerge::Multi { part, .. } => part.max_sequence(),
989            PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
990        }
991    }
992
993    /// Gets the estimated series count in this part.
994    fn series_count(&self) -> usize {
995        match self {
996            PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
997            PartToMerge::Multi { part, .. } => part.series_count(),
998            PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
999        }
1000    }
1001
1002    /// Returns true if this is an encoded part.
1003    fn is_encoded(&self) -> bool {
1004        matches!(self, PartToMerge::Encoded { .. })
1005    }
1006
1007    /// Gets the estimated size in bytes of this part.
1008    fn estimated_size(&self) -> usize {
1009        match self {
1010            PartToMerge::Bulk { part, .. } => part.estimated_size(),
1011            PartToMerge::Multi { part, .. } => part.estimated_size(),
1012            PartToMerge::Encoded { part, .. } => part.size_bytes(),
1013        }
1014    }
1015
1016    /// Converts this part to `MemtableStats`.
1017    fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1018        match self {
1019            PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
1020            PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
1021            PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
1022        }
1023    }
1024
1025    /// Creates a record batch iterator for this part.
1026    fn create_iterator(
1027        self,
1028        context: Arc<BulkIterContext>,
1029    ) -> Result<Option<BoxedRecordBatchIterator>> {
1030        match self {
1031            PartToMerge::Bulk { part, .. } => {
1032                let series_count = part.estimated_series_count();
1033                let iter = BulkPartBatchIter::from_single(
1034                    part.batch,
1035                    context,
1036                    None, // No sequence filter for merging
1037                    series_count,
1038                    None, // No metrics for merging
1039                );
1040                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1041            }
1042            PartToMerge::Multi { part, .. } => part.read(context, None, None),
1043            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
1044        }
1045    }
1046}
1047
1048struct MemtableCompactor {
1049    region_id: RegionId,
1050    memtable_id: MemtableId,
1051    /// Configuration for the bulk memtable.
1052    config: BulkMemtableConfig,
1053}
1054
1055impl MemtableCompactor {
1056    /// Creates a new MemtableCompactor.
1057    fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
1058        Self {
1059            region_id,
1060            memtable_id,
1061            config,
1062        }
1063    }
1064
1065    /// Merges parts (bulk and encoded) and then encodes the result.
1066    fn merge_parts(
1067        &mut self,
1068        arrow_schema: &SchemaRef,
1069        bulk_parts: &RwLock<BulkParts>,
1070        metadata: &RegionMetadataRef,
1071        dedup: bool,
1072        merge_mode: MergeMode,
1073    ) -> Result<()> {
1074        let start = Instant::now();
1075
1076        // Collect pre-grouped parts
1077        let collected = bulk_parts
1078            .write()
1079            .unwrap()
1080            .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);
1081
1082        if collected.groups.is_empty() {
1083            return Ok(());
1084        }
1085
1086        // Collect all file IDs for tracking
1087        let merged_file_ids: HashSet<FileId> = collected
1088            .groups
1089            .iter()
1090            .flatten()
1091            .map(|part| part.file_id())
1092            .collect();
1093        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);
1094
1095        let num_groups = collected.groups.len();
1096        let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
1097
1098        let encode_row_threshold = self.config.encode_row_threshold;
1099        let encode_bytes_threshold = self.config.encode_bytes_threshold;
1100
1101        // Merge all groups in parallel
1102        let merged_parts = collected
1103            .groups
1104            .into_par_iter()
1105            .map(|group| {
1106                Self::merge_parts_group(
1107                    group,
1108                    arrow_schema,
1109                    metadata,
1110                    dedup,
1111                    merge_mode,
1112                    encode_row_threshold,
1113                    encode_bytes_threshold,
1114                )
1115            })
1116            .collect::<Result<Vec<Option<MergedPart>>>>()?;
1117
1118        // Install all merged parts
1119        let total_output_rows = {
1120            let mut parts = bulk_parts.write().unwrap();
1121            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
1122        };
1123
1124        guard.mark_success();
1125
1126        common_telemetry::debug!(
1127            "BulkMemtable {} {} concurrent compact {} groups, {} parts, {} rows, cost: {:?}",
1128            self.region_id,
1129            self.memtable_id,
1130            num_groups,
1131            num_parts,
1132            total_output_rows,
1133            start.elapsed()
1134        );
1135
1136        Ok(())
1137    }
1138
1139    /// Merges a group of parts into a single part (either MultiBulkPart or EncodedBulkPart).
1140    fn merge_parts_group(
1141        parts_to_merge: Vec<PartToMerge>,
1142        arrow_schema: &SchemaRef,
1143        metadata: &RegionMetadataRef,
1144        dedup: bool,
1145        merge_mode: MergeMode,
1146        encode_row_threshold: usize,
1147        encode_bytes_threshold: usize,
1148    ) -> Result<Option<MergedPart>> {
1149        if parts_to_merge.is_empty() {
1150            return Ok(None);
1151        }
1152
1153        // Calculates timestamp bounds and statistics for merged data
1154        let min_timestamp = parts_to_merge
1155            .iter()
1156            .map(|p| p.min_timestamp())
1157            .min()
1158            .unwrap_or(i64::MAX);
1159        let max_timestamp = parts_to_merge
1160            .iter()
1161            .map(|p| p.max_timestamp())
1162            .max()
1163            .unwrap_or(i64::MIN);
1164        let max_sequence = parts_to_merge
1165            .iter()
1166            .map(|p| p.max_sequence())
1167            .max()
1168            .unwrap_or(0);
1169
1170        // Collects statistics from parts before creating iterators
1171        let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
1172        let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
1173        let estimated_series_count = parts_to_merge
1174            .iter()
1175            .map(|p| p.series_count())
1176            .max()
1177            .unwrap_or(0);
1178
1179        let context = Arc::new(BulkIterContext::new(
1180            metadata.clone(),
1181            None, // No column projection for merging
1182            None, // No predicate for merging
1183            true,
1184        )?);
1185
1186        // Creates iterators for all parts to merge.
1187        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
1188            .into_iter()
1189            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
1190            .collect();
1191
1192        if iterators.is_empty() {
1193            return Ok(None);
1194        }
1195
1196        let merged_iter =
1197            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;
1198
1199        let boxed_iter: BoxedRecordBatchIterator = if dedup {
1200            // Applies deduplication based on merge mode
1201            match merge_mode {
1202                MergeMode::LastRow => {
1203                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
1204                    Box::new(dedup_iter)
1205                }
1206                MergeMode::LastNonNull => {
1207                    let field_column_start =
1208                        field_column_start(metadata, arrow_schema.fields().len());
1209
1210                    let dedup_iter = FlatDedupIterator::new(
1211                        merged_iter,
1212                        FlatLastNonNull::new(field_column_start, false),
1213                    );
1214                    Box::new(dedup_iter)
1215                }
1216            }
1217        } else {
1218            Box::new(merged_iter)
1219        };
1220
1221        // Encode as EncodedBulkPart if rows exceed row threshold OR bytes exceed bytes threshold
1222        if estimated_total_rows > encode_row_threshold
1223            || estimated_total_bytes > encode_bytes_threshold
1224        {
1225            let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
1226            let mut metrics = BulkPartEncodeMetrics::default();
1227            let encoded_part = encoder.encode_record_batch_iter(
1228                boxed_iter,
1229                arrow_schema.clone(),
1230                min_timestamp,
1231                max_timestamp,
1232                max_sequence,
1233                &mut metrics,
1234            )?;
1235
1236            common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);
1237
1238            Ok(encoded_part.map(MergedPart::Encoded))
1239        } else {
1240            // Otherwise, collect into MultiBulkPart
1241            let mut batches = Vec::new();
1242            let mut actual_total_rows = 0;
1243
1244            for batch_result in boxed_iter {
1245                let batch = batch_result?;
1246                actual_total_rows += batch.num_rows();
1247                batches.push(batch);
1248            }
1249
1250            if actual_total_rows == 0 {
1251                return Ok(None);
1252            }
1253
1254            let multi_part = MultiBulkPart::new(
1255                batches,
1256                min_timestamp,
1257                max_timestamp,
1258                max_sequence,
1259                estimated_series_count,
1260                metadata,
1261            );
1262
1263            common_telemetry::trace!(
1264                "merge_parts_group created MultiBulkPart: rows={}, batches={}",
1265                actual_total_rows,
1266                multi_part.num_batches()
1267            );
1268
1269            Ok(Some(MergedPart::Multi(multi_part)))
1270        }
1271    }
1272}
1273
1274/// A memtable compact task to run in background.
1275struct MemCompactTask {
1276    metadata: RegionMetadataRef,
1277    parts: Arc<RwLock<BulkParts>>,
1278    /// Configuration for the bulk memtable.
1279    config: BulkMemtableConfig,
1280    /// Cached flat SST arrow schema
1281    flat_arrow_schema: SchemaRef,
1282    /// Compactor for merging bulk parts
1283    compactor: Arc<Mutex<MemtableCompactor>>,
1284    /// Whether the append mode is enabled
1285    append_mode: bool,
1286    /// Mode to handle duplicate rows while merging
1287    merge_mode: MergeMode,
1288}
1289
1290impl MemCompactTask {
1291    fn compact(&self) -> Result<()> {
1292        let mut compactor = self.compactor.lock().unwrap();
1293
1294        let should_merge = self
1295            .parts
1296            .read()
1297            .unwrap()
1298            .should_merge_parts(self.config.merge_threshold);
1299        if should_merge {
1300            compactor.merge_parts(
1301                &self.flat_arrow_schema,
1302                &self.parts,
1303                &self.metadata,
1304                !self.append_mode,
1305                self.merge_mode,
1306            )?;
1307        }
1308
1309        Ok(())
1310    }
1311}
1312
1313/// Scheduler to run compact tasks in background.
1314#[derive(Debug)]
1315pub struct CompactDispatcher {
1316    semaphore: Arc<Semaphore>,
1317}
1318
1319impl CompactDispatcher {
1320    /// Creates a new dispatcher with the given number of max concurrent tasks.
1321    pub fn new(permits: usize) -> Self {
1322        Self {
1323            semaphore: Arc::new(Semaphore::new(permits)),
1324        }
1325    }
1326
1327    /// Dispatches a compact task to run in background.
1328    fn dispatch_compact(&self, task: MemCompactTask) {
1329        let semaphore = self.semaphore.clone();
1330        common_runtime::spawn_global(async move {
1331            let Ok(_permit) = semaphore.acquire().await else {
1332                return;
1333            };
1334
1335            common_runtime::spawn_blocking_global(move || {
1336                if let Err(e) = task.compact() {
1337                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
1338                }
1339            });
1340        });
1341    }
1342}
1343
1344/// Builder to build a [BulkMemtable].
1345#[derive(Debug, Default)]
1346pub struct BulkMemtableBuilder {
1347    /// Configuration for the bulk memtable.
1348    config: BulkMemtableConfig,
1349    write_buffer_manager: Option<WriteBufferManagerRef>,
1350    compact_dispatcher: Option<Arc<CompactDispatcher>>,
1351    append_mode: bool,
1352    merge_mode: MergeMode,
1353}
1354
1355impl BulkMemtableBuilder {
1356    /// Creates a new builder with specific `write_buffer_manager`.
1357    pub fn new(
1358        write_buffer_manager: Option<WriteBufferManagerRef>,
1359        append_mode: bool,
1360        merge_mode: MergeMode,
1361    ) -> Self {
1362        Self {
1363            config: BulkMemtableConfig::default(),
1364            write_buffer_manager,
1365            compact_dispatcher: None,
1366            append_mode,
1367            merge_mode,
1368        }
1369    }
1370
1371    /// Sets the bulk memtable config.
1372    pub fn with_config(mut self, config: BulkMemtableConfig) -> Self {
1373        self.config = config;
1374        self
1375    }
1376
1377    /// Sets the compact dispatcher.
1378    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
1379        self.compact_dispatcher = Some(compact_dispatcher);
1380        self
1381    }
1382
1383    #[cfg(test)]
1384    pub(crate) fn config(&self) -> &BulkMemtableConfig {
1385        &self.config
1386    }
1387}
1388
1389impl MemtableBuilder for BulkMemtableBuilder {
1390    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
1391        Arc::new(BulkMemtable::new(
1392            id,
1393            self.config.clone(),
1394            metadata.clone(),
1395            self.write_buffer_manager.clone(),
1396            self.compact_dispatcher.clone(),
1397            self.append_mode,
1398            self.merge_mode,
1399        ))
1400    }
1401
1402    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
1403        true
1404    }
1405}
1406
1407#[cfg(test)]
1408mod tests {
1409    use mito_codec::row_converter::build_primary_key_codec;
1410
1411    use super::*;
1412    use crate::memtable::bulk::part::BulkPartConverter;
1413    use crate::read::scan_region::PredicateGroup;
1414    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1415    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1416
1417    fn create_bulk_part_with_converter(
1418        k0: &str,
1419        k1: u32,
1420        timestamps: Vec<i64>,
1421        values: Vec<Option<f64>>,
1422        sequence: u64,
1423    ) -> Result<BulkPart> {
1424        let metadata = metadata_for_test();
1425        let capacity = 100;
1426        let primary_key_codec = build_primary_key_codec(&metadata);
1427        let schema = to_flat_sst_arrow_schema(
1428            &metadata,
1429            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1430        );
1431
1432        let mut converter =
1433            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1434
1435        let key_values = build_key_values_with_ts_seq_values(
1436            &metadata,
1437            k0.to_string(),
1438            k1,
1439            timestamps.into_iter(),
1440            values.into_iter(),
1441            sequence,
1442        );
1443
1444        converter.append_key_values(&key_values)?;
1445        converter.convert()
1446    }
1447
1448    #[test]
1449    fn test_bulk_memtable_sanitizes_zero_merge_threshold() {
1450        let metadata = metadata_for_test();
1451        let config = BulkMemtableConfig {
1452            merge_threshold: 0,
1453            ..Default::default()
1454        };
1455
1456        let memtable =
1457            BulkMemtable::new(999, config, metadata, None, None, false, MergeMode::LastRow);
1458
1459        assert_eq!(DEFAULT_MERGE_THRESHOLD, memtable.config.merge_threshold);
1460        assert_eq!(
1461            DEFAULT_MERGE_THRESHOLD,
1462            memtable.compactor.lock().unwrap().config.merge_threshold
1463        );
1464    }
1465
1466    #[test]
1467    fn test_bulk_memtable_write_read() {
1468        let metadata = metadata_for_test();
1469        let memtable = BulkMemtable::new(
1470            999,
1471            BulkMemtableConfig::default(),
1472            metadata.clone(),
1473            None,
1474            None,
1475            false,
1476            MergeMode::LastRow,
1477        );
1478        // Disable unordered_part for this test
1479        memtable.set_unordered_part_threshold(0);
1480
1481        let test_data = [
1482            (
1483                "key_a",
1484                1u32,
1485                vec![1000i64, 2000i64],
1486                vec![Some(10.5), Some(20.5)],
1487                100u64,
1488            ),
1489            (
1490                "key_b",
1491                2u32,
1492                vec![1500i64, 2500i64],
1493                vec![Some(15.5), Some(25.5)],
1494                200u64,
1495            ),
1496            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
1497        ];
1498
1499        for (k0, k1, timestamps, values, seq) in test_data.iter() {
1500            let part =
1501                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
1502                    .unwrap();
1503            memtable.write_bulk(part).unwrap();
1504        }
1505
1506        let stats = memtable.stats();
1507        assert_eq!(5, stats.num_rows);
1508        assert_eq!(3, stats.num_ranges);
1509        assert_eq!(300, stats.max_sequence);
1510
1511        let (min_ts, max_ts) = stats.time_range.unwrap();
1512        assert_eq!(1000, min_ts.value());
1513        assert_eq!(3000, max_ts.value());
1514
1515        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1516        let ranges = memtable
1517            .ranges(
1518                None,
1519                RangesOptions::default().with_predicate(predicate_group),
1520            )
1521            .unwrap();
1522
1523        assert_eq!(3, ranges.ranges.len());
1524        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1525        assert_eq!(5, total_rows);
1526
1527        for (_range_id, range) in ranges.ranges.iter() {
1528            assert!(range.num_rows() > 0);
1529            assert!(range.is_record_batch());
1530
1531            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1532
1533            let mut total_rows = 0;
1534            for batch_result in record_batch_iter {
1535                let batch = batch_result.unwrap();
1536                total_rows += batch.num_rows();
1537                assert!(batch.num_rows() > 0);
1538                assert_eq!(8, batch.num_columns());
1539            }
1540            assert_eq!(total_rows, range.num_rows());
1541        }
1542    }
1543
1544    #[test]
1545    fn test_bulk_memtable_ranges_with_projection() {
1546        let metadata = metadata_for_test();
1547        let memtable = BulkMemtable::new(
1548            111,
1549            BulkMemtableConfig::default(),
1550            metadata.clone(),
1551            None,
1552            None,
1553            false,
1554            MergeMode::LastRow,
1555        );
1556
1557        let bulk_part = create_bulk_part_with_converter(
1558            "projection_test",
1559            5,
1560            vec![5000, 6000, 7000],
1561            vec![Some(50.0), Some(60.0), Some(70.0)],
1562            500,
1563        )
1564        .unwrap();
1565
1566        memtable.write_bulk(bulk_part).unwrap();
1567
1568        let projection = vec![4u32];
1569        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1570        let ranges = memtable
1571            .ranges(
1572                Some(&projection),
1573                RangesOptions::default().with_predicate(predicate_group),
1574            )
1575            .unwrap();
1576
1577        assert_eq!(1, ranges.ranges.len());
1578        let range = ranges.ranges.get(&0).unwrap();
1579
1580        assert!(range.is_record_batch());
1581        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1582
1583        let mut total_rows = 0;
1584        for batch_result in record_batch_iter {
1585            let batch = batch_result.unwrap();
1586            assert!(batch.num_rows() > 0);
1587            assert_eq!(5, batch.num_columns());
1588            total_rows += batch.num_rows();
1589        }
1590        assert_eq!(3, total_rows);
1591    }
1592
1593    #[test]
1594    fn test_bulk_memtable_unsupported_operations() {
1595        let metadata = metadata_for_test();
1596        let memtable = BulkMemtable::new(
1597            111,
1598            BulkMemtableConfig::default(),
1599            metadata.clone(),
1600            None,
1601            None,
1602            false,
1603            MergeMode::LastRow,
1604        );
1605
1606        let key_values = build_key_values_with_ts_seq_values(
1607            &metadata,
1608            "test".to_string(),
1609            1,
1610            vec![1000].into_iter(),
1611            vec![Some(1.0)].into_iter(),
1612            1,
1613        );
1614
1615        let err = memtable.write(&key_values).unwrap_err();
1616        assert!(err.to_string().contains("not supported"));
1617
1618        let kv = key_values.iter().next().unwrap();
1619        let err = memtable.write_one(kv).unwrap_err();
1620        assert!(err.to_string().contains("not supported"));
1621    }
1622
1623    #[test]
1624    fn test_bulk_memtable_freeze() {
1625        let metadata = metadata_for_test();
1626        let memtable = BulkMemtable::new(
1627            222,
1628            BulkMemtableConfig::default(),
1629            metadata.clone(),
1630            None,
1631            None,
1632            false,
1633            MergeMode::LastRow,
1634        );
1635
1636        let bulk_part = create_bulk_part_with_converter(
1637            "freeze_test",
1638            10,
1639            vec![10000],
1640            vec![Some(100.0)],
1641            1000,
1642        )
1643        .unwrap();
1644
1645        memtable.write_bulk(bulk_part).unwrap();
1646        memtable.freeze().unwrap();
1647
1648        let stats_after_freeze = memtable.stats();
1649        assert_eq!(1, stats_after_freeze.num_rows);
1650    }
1651
1652    #[test]
1653    fn test_bulk_memtable_fork() {
1654        let metadata = metadata_for_test();
1655        let original_memtable = BulkMemtable::new(
1656            333,
1657            BulkMemtableConfig::default(),
1658            metadata.clone(),
1659            None,
1660            None,
1661            false,
1662            MergeMode::LastRow,
1663        );
1664
1665        let bulk_part =
1666            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
1667                .unwrap();
1668
1669        original_memtable.write_bulk(bulk_part).unwrap();
1670
1671        let forked_memtable = original_memtable.fork(444, &metadata);
1672
1673        assert_eq!(forked_memtable.id(), 444);
1674        assert!(forked_memtable.is_empty());
1675        assert_eq!(0, forked_memtable.stats().num_rows);
1676
1677        assert!(!original_memtable.is_empty());
1678        assert_eq!(1, original_memtable.stats().num_rows);
1679    }
1680
1681    #[test]
1682    fn test_bulk_memtable_ranges_multiple_parts() {
1683        let metadata = metadata_for_test();
1684        let memtable = BulkMemtable::new(
1685            777,
1686            BulkMemtableConfig::default(),
1687            metadata.clone(),
1688            None,
1689            None,
1690            false,
1691            MergeMode::LastRow,
1692        );
1693        // Disable unordered_part for this test
1694        memtable.set_unordered_part_threshold(0);
1695
1696        let parts_data = vec![
1697            (
1698                "part1",
1699                1u32,
1700                vec![1000i64, 1100i64],
1701                vec![Some(10.0), Some(11.0)],
1702                100u64,
1703            ),
1704            (
1705                "part2",
1706                2u32,
1707                vec![2000i64, 2100i64],
1708                vec![Some(20.0), Some(21.0)],
1709                200u64,
1710            ),
1711            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
1712        ];
1713
1714        for (k0, k1, timestamps, values, seq) in parts_data {
1715            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
1716            memtable.write_bulk(part).unwrap();
1717        }
1718
1719        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1720        let ranges = memtable
1721            .ranges(
1722                None,
1723                RangesOptions::default().with_predicate(predicate_group),
1724            )
1725            .unwrap();
1726
1727        assert_eq!(3, ranges.ranges.len());
1728        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1729        assert_eq!(5, total_rows);
1730        assert_eq!(3, ranges.ranges.len());
1731
1732        for (range_id, range) in ranges.ranges.iter() {
1733            assert!(*range_id < 3);
1734            assert!(range.num_rows() > 0);
1735            assert!(range.is_record_batch());
1736        }
1737    }
1738
1739    #[test]
1740    fn test_bulk_memtable_ranges_with_sequence_filter() {
1741        let metadata = metadata_for_test();
1742        let memtable = BulkMemtable::new(
1743            888,
1744            BulkMemtableConfig::default(),
1745            metadata.clone(),
1746            None,
1747            None,
1748            false,
1749            MergeMode::LastRow,
1750        );
1751
1752        let part = create_bulk_part_with_converter(
1753            "seq_test",
1754            1,
1755            vec![1000, 2000, 3000],
1756            vec![Some(10.0), Some(20.0), Some(30.0)],
1757            500,
1758        )
1759        .unwrap();
1760
1761        memtable.write_bulk(part).unwrap();
1762
1763        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1764        let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
1765        let ranges = memtable
1766            .ranges(
1767                None,
1768                RangesOptions::default()
1769                    .with_predicate(predicate_group)
1770                    .with_sequence(sequence_filter),
1771            )
1772            .unwrap();
1773
1774        assert_eq!(1, ranges.ranges.len());
1775        let range = ranges.ranges.get(&0).unwrap();
1776
1777        let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1778        assert!(record_batch_iter.next().is_none());
1779    }
1780
1781    #[test]
1782    fn test_bulk_memtable_ranges_with_encoded_parts() {
1783        let metadata = metadata_for_test();
1784        let config = BulkMemtableConfig {
1785            merge_threshold: 8,
1786            ..Default::default()
1787        };
1788        let memtable = BulkMemtable::new(
1789            999,
1790            config,
1791            metadata.clone(),
1792            None,
1793            None,
1794            false,
1795            MergeMode::LastRow,
1796        );
1797        // Disable unordered_part for this test
1798        memtable.set_unordered_part_threshold(0);
1799
1800        // Adds enough bulk parts to trigger encoding
1801        for i in 0..10 {
1802            let part = create_bulk_part_with_converter(
1803                &format!("key_{}", i),
1804                i,
1805                vec![1000 + i as i64 * 100],
1806                vec![Some(i as f64 * 10.0)],
1807                100 + i as u64,
1808            )
1809            .unwrap();
1810            memtable.write_bulk(part).unwrap();
1811        }
1812
1813        memtable.compact(false).unwrap();
1814
1815        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1816        let ranges = memtable
1817            .ranges(
1818                None,
1819                RangesOptions::default().with_predicate(predicate_group),
1820            )
1821            .unwrap();
1822
1823        // Should have ranges for both bulk parts and encoded parts
1824        assert_eq!(3, ranges.ranges.len());
1825        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1826        assert_eq!(10, total_rows);
1827
1828        for (_range_id, range) in ranges.ranges.iter() {
1829            assert!(range.num_rows() > 0);
1830            assert!(range.is_record_batch());
1831
1832            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1833            let mut total_rows = 0;
1834            for batch_result in record_batch_iter {
1835                let batch = batch_result.unwrap();
1836                total_rows += batch.num_rows();
1837                assert!(batch.num_rows() > 0);
1838            }
1839            assert_eq!(total_rows, range.num_rows());
1840        }
1841    }
1842
1843    #[test]
1844    fn test_bulk_memtable_unordered_part() {
1845        let metadata = metadata_for_test();
1846        let memtable = BulkMemtable::new(
1847            1001,
1848            BulkMemtableConfig::default(),
1849            metadata.clone(),
1850            None,
1851            None,
1852            false,
1853            MergeMode::LastRow,
1854        );
1855
1856        // Set smaller thresholds for testing with smaller inputs
1857        // Accept parts with < 5 rows into unordered_part
1858        memtable.set_unordered_part_threshold(5);
1859        // Compact when total rows >= 10
1860        memtable.set_unordered_part_compact_threshold(10);
1861
1862        // Write 3 small parts (each has 2 rows), should be collected in unordered_part
1863        for i in 0..3 {
1864            let part = create_bulk_part_with_converter(
1865                &format!("key_{}", i),
1866                i,
1867                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1868                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1869                100 + i as u64,
1870            )
1871            .unwrap();
1872            assert_eq!(2, part.num_rows());
1873            memtable.write_bulk(part).unwrap();
1874        }
1875
1876        // Total rows = 6, not yet reaching compact threshold
1877        let stats = memtable.stats();
1878        assert_eq!(6, stats.num_rows);
1879
1880        // Write 2 more small parts (each has 2 rows)
1881        // This should trigger compaction when total >= 10
1882        for i in 3..5 {
1883            let part = create_bulk_part_with_converter(
1884                &format!("key_{}", i),
1885                i,
1886                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1887                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1888                100 + i as u64,
1889            )
1890            .unwrap();
1891            memtable.write_bulk(part).unwrap();
1892        }
1893
1894        // Total rows = 10, should have compacted unordered_part into a regular part
1895        let stats = memtable.stats();
1896        assert_eq!(10, stats.num_rows);
1897
1898        // Verify we can read all data correctly
1899        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1900        let ranges = memtable
1901            .ranges(
1902                None,
1903                RangesOptions::default().with_predicate(predicate_group),
1904            )
1905            .unwrap();
1906
1907        // Should have at least 1 range (the compacted part)
1908        assert!(!ranges.ranges.is_empty());
1909        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1910        assert_eq!(10, total_rows);
1911
1912        // Read all data and verify
1913        let mut total_rows_read = 0;
1914        for (_range_id, range) in ranges.ranges.iter() {
1915            assert!(range.is_record_batch());
1916            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1917
1918            for batch_result in record_batch_iter {
1919                let batch = batch_result.unwrap();
1920                total_rows_read += batch.num_rows();
1921            }
1922        }
1923        assert_eq!(10, total_rows_read);
1924    }
1925
1926    #[test]
1927    fn test_bulk_memtable_unordered_part_mixed_sizes() {
1928        let metadata = metadata_for_test();
1929        let memtable = BulkMemtable::new(
1930            1002,
1931            BulkMemtableConfig::default(),
1932            metadata.clone(),
1933            None,
1934            None,
1935            false,
1936            MergeMode::LastRow,
1937        );
1938
1939        // Set threshold to 4 rows - parts with < 4 rows go to unordered_part
1940        memtable.set_unordered_part_threshold(4);
1941        memtable.set_unordered_part_compact_threshold(8);
1942
1943        // Write small parts (3 rows each) - should go to unordered_part
1944        for i in 0..2 {
1945            let part = create_bulk_part_with_converter(
1946                &format!("small_{}", i),
1947                i,
1948                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
1949                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
1950                10 + i as u64,
1951            )
1952            .unwrap();
1953            assert_eq!(3, part.num_rows());
1954            memtable.write_bulk(part).unwrap();
1955        }
1956
1957        // Write a large part (5 rows) - should go directly to regular parts
1958        let large_part = create_bulk_part_with_converter(
1959            "large_key",
1960            100,
1961            vec![5000, 6000, 7000, 8000, 9000],
1962            vec![
1963                Some(100.0),
1964                Some(101.0),
1965                Some(102.0),
1966                Some(103.0),
1967                Some(104.0),
1968            ],
1969            50,
1970        )
1971        .unwrap();
1972        assert_eq!(5, large_part.num_rows());
1973        memtable.write_bulk(large_part).unwrap();
1974
1975        // Write another small part (2 rows) - should trigger compaction of unordered_part
1976        let part = create_bulk_part_with_converter(
1977            "small_2",
1978            2,
1979            vec![4000, 4100],
1980            vec![Some(20.0), Some(21.0)],
1981            30,
1982        )
1983        .unwrap();
1984        memtable.write_bulk(part).unwrap();
1985
1986        let stats = memtable.stats();
1987        assert_eq!(13, stats.num_rows); // 3 + 3 + 5 + 2 = 13
1988
1989        // Verify all data can be read
1990        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1991        let ranges = memtable
1992            .ranges(
1993                None,
1994                RangesOptions::default().with_predicate(predicate_group),
1995            )
1996            .unwrap();
1997
1998        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1999        assert_eq!(13, total_rows);
2000
2001        let mut total_rows_read = 0;
2002        for (_range_id, range) in ranges.ranges.iter() {
2003            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2004            for batch_result in record_batch_iter {
2005                let batch = batch_result.unwrap();
2006                total_rows_read += batch.num_rows();
2007            }
2008        }
2009        assert_eq!(13, total_rows_read);
2010    }
2011
2012    #[test]
2013    fn test_bulk_memtable_unordered_part_with_ranges() {
2014        let metadata = metadata_for_test();
2015        let memtable = BulkMemtable::new(
2016            1003,
2017            BulkMemtableConfig::default(),
2018            metadata.clone(),
2019            None,
2020            None,
2021            false,
2022            MergeMode::LastRow,
2023        );
2024
2025        // Set small thresholds
2026        memtable.set_unordered_part_threshold(3);
2027        memtable.set_unordered_part_compact_threshold(100); // High threshold to prevent auto-compaction
2028
2029        // Write several small parts that stay in unordered_part
2030        for i in 0..3 {
2031            let part = create_bulk_part_with_converter(
2032                &format!("key_{}", i),
2033                i,
2034                vec![1000 + i as i64 * 100],
2035                vec![Some(i as f64 * 10.0)],
2036                100 + i as u64,
2037            )
2038            .unwrap();
2039            assert_eq!(1, part.num_rows());
2040            memtable.write_bulk(part).unwrap();
2041        }
2042
2043        let stats = memtable.stats();
2044        assert_eq!(3, stats.num_rows);
2045
2046        // Test that ranges() can correctly read from unordered_part
2047        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2048        let ranges = memtable
2049            .ranges(
2050                None,
2051                RangesOptions::default().with_predicate(predicate_group),
2052            )
2053            .unwrap();
2054
2055        // Should have 1 range for the unordered_part
2056        assert_eq!(1, ranges.ranges.len());
2057        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2058        assert_eq!(3, total_rows);
2059
2060        // Verify data is sorted correctly in the range
2061        let range = ranges.ranges.get(&0).unwrap();
2062        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2063
2064        let mut total_rows = 0;
2065        for batch_result in record_batch_iter {
2066            let batch = batch_result.unwrap();
2067            total_rows += batch.num_rows();
2068            // Verify data is properly sorted by primary key
2069            assert!(batch.num_rows() > 0);
2070        }
2071        assert_eq!(3, total_rows);
2072    }
2073
2074    /// Helper to create a BulkPartWrapper from a BulkPart.
2075    fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
2076        BulkPartWrapper {
2077            part: PartToMerge::Bulk {
2078                part,
2079                file_id: FileId::random(),
2080            },
2081            merging: false,
2082        }
2083    }
2084
2085    #[test]
2086    fn test_should_merge_parts_below_threshold() {
2087        let mut bulk_parts = BulkParts::default();
2088
2089        // Add 7 bulk parts (below DEFAULT_MERGE_THRESHOLD of 8)
2090        for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
2091            let part = create_bulk_part_with_converter(
2092                &format!("key_{}", i),
2093                i as u32,
2094                vec![1000 + i as i64 * 100],
2095                vec![Some(i as f64 * 10.0)],
2096                100 + i as u64,
2097            )
2098            .unwrap();
2099            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2100        }
2101
2102        // Should not trigger merge since we have only 7 parts
2103        assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2104    }
2105
2106    #[test]
2107    fn test_should_merge_parts_at_threshold() {
2108        let mut bulk_parts = BulkParts::default();
2109        let merge_threshold = 8;
2110
2111        // Add 8 bulk parts (at merge_threshold)
2112        for i in 0..merge_threshold {
2113            let part = create_bulk_part_with_converter(
2114                &format!("key_{}", i),
2115                i as u32,
2116                vec![1000 + i as i64 * 100],
2117                vec![Some(i as f64 * 10.0)],
2118                100 + i as u64,
2119            )
2120            .unwrap();
2121            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2122        }
2123
2124        // Should trigger merge since we have 8 parts
2125        assert!(bulk_parts.should_merge_parts(merge_threshold));
2126    }
2127
2128    #[test]
2129    fn test_should_merge_parts_with_merging_flag() {
2130        let mut bulk_parts = BulkParts::default();
2131        let merge_threshold = 8;
2132
2133        // Add 10 bulk parts
2134        for i in 0..10 {
2135            let part = create_bulk_part_with_converter(
2136                &format!("key_{}", i),
2137                i as u32,
2138                vec![1000 + i as i64 * 100],
2139                vec![Some(i as f64 * 10.0)],
2140                100 + i as u64,
2141            )
2142            .unwrap();
2143            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2144        }
2145
2146        // Should trigger merge since we have 10 parts
2147        assert!(bulk_parts.should_merge_parts(merge_threshold));
2148
2149        // Mark first 3 parts as merging
2150        for wrapper in bulk_parts.parts.iter_mut().take(3) {
2151            wrapper.merging = true;
2152        }
2153
2154        // Now only 7 parts are available for merging, should not trigger
2155        assert!(!bulk_parts.should_merge_parts(merge_threshold));
2156    }
2157
2158    #[test]
2159    fn test_collect_parts_to_merge_grouping() {
2160        let mut bulk_parts = BulkParts::default();
2161
2162        // Add 16 bulk parts with different row counts
2163        for i in 0..16 {
2164            let num_rows = (i % 4) + 1; // 1 to 4 rows
2165            let timestamps: Vec<i64> = (0..num_rows)
2166                .map(|j| 1000 + i as i64 * 100 + j as i64)
2167                .collect();
2168            let values: Vec<Option<f64>> =
2169                (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
2170            let part = create_bulk_part_with_converter(
2171                &format!("key_{}", i),
2172                i as u32,
2173                timestamps,
2174                values,
2175                100 + i as u64,
2176            )
2177            .unwrap();
2178            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2179        }
2180
2181        // Should trigger merge since we have 16 parts
2182        assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2183
2184        // Collect parts to merge
2185        let collected =
2186            bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);
2187
2188        // Should have groups
2189        assert!(!collected.groups.is_empty());
2190
2191        // All groups should have parts
2192        for group in &collected.groups {
2193            assert!(!group.is_empty());
2194        }
2195
2196        // Total parts collected should be 16
2197        let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
2198        assert_eq!(16, total_parts);
2199    }
2200
2201    #[test]
2202    fn test_bulk_memtable_ranges_with_multi_bulk_part() {
2203        let metadata = metadata_for_test();
2204        let merge_threshold = 8;
2205        let config = BulkMemtableConfig {
2206            merge_threshold,
2207            ..Default::default()
2208        };
2209        let memtable = BulkMemtable::new(
2210            2005,
2211            config,
2212            metadata.clone(),
2213            None,
2214            None,
2215            false,
2216            MergeMode::LastRow,
2217        );
2218        // Disable unordered_part for this test
2219        memtable.set_unordered_part_threshold(0);
2220
2221        // Write enough bulk parts to trigger merge (merge_threshold = 8)
2222        // Each part has small number of rows so total < DEFAULT_ROW_GROUP_SIZE
2223        // This will result in MultiBulkPart after compaction
2224        for i in 0..merge_threshold {
2225            let part = create_bulk_part_with_converter(
2226                &format!("key_{}", i),
2227                i as u32,
2228                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2229                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2230                100 + i as u64,
2231            )
2232            .unwrap();
2233            memtable.write_bulk(part).unwrap();
2234        }
2235
2236        // Compact to trigger MultiBulkPart creation (since total rows < DEFAULT_ROW_GROUP_SIZE)
2237        memtable.compact(false).unwrap();
2238
2239        // Verify we can read from the memtable
2240        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2241        let ranges = memtable
2242            .ranges(
2243                None,
2244                RangesOptions::default().with_predicate(predicate_group),
2245            )
2246            .unwrap();
2247
2248        assert_eq!(1, ranges.ranges.len());
2249        let expected_rows = merge_threshold * 2; // Each part has 2 rows
2250        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2251        assert_eq!(expected_rows, total_rows);
2252
2253        // Read all data
2254        let mut total_rows_read = 0;
2255        for (_range_id, range) in ranges.ranges.iter() {
2256            assert!(range.is_record_batch());
2257            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2258
2259            for batch_result in record_batch_iter {
2260                let batch = batch_result.unwrap();
2261                total_rows_read += batch.num_rows();
2262            }
2263        }
2264        assert_eq!(expected_rows, total_rows_read);
2265    }
2266
2267    #[test]
2268    fn test_multi_bulk_range_iter_builder_all_pruned() {
2269        let metadata = metadata_for_test();
2270        let merge_threshold = 8;
2271        let config = BulkMemtableConfig {
2272            merge_threshold,
2273            ..Default::default()
2274        };
2275        let memtable = BulkMemtable::new(
2276            2006,
2277            config,
2278            metadata.clone(),
2279            None,
2280            None,
2281            false,
2282            MergeMode::LastRow,
2283        );
2284        memtable.set_unordered_part_threshold(0);
2285
2286        // Write enough bulk parts to trigger merge into MultiBulkPart.
2287        for i in 0..merge_threshold {
2288            let part = create_bulk_part_with_converter(
2289                &format!("key_{}", i),
2290                i as u32,
2291                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2292                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2293                100 + i as u64,
2294            )
2295            .unwrap();
2296            memtable.write_bulk(part).unwrap();
2297        }
2298        memtable.compact(false).unwrap();
2299
2300        // Use a predicate that matches no rows so all batches are pruned.
2301        let filter = datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent"));
2302        let predicate_group = PredicateGroup::new(&metadata, &[filter]).unwrap();
2303        let ranges = memtable
2304            .ranges(
2305                None,
2306                RangesOptions::default().with_predicate(predicate_group),
2307            )
2308            .unwrap();
2309
2310        // Should return ranges but each range should produce an empty iterator
2311        // instead of an error.
2312        for (_range_id, range) in ranges.ranges.iter() {
2313            assert!(range.is_record_batch());
2314            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2315            let total_rows: usize = record_batch_iter.map(|r| r.unwrap().num_rows()).sum();
2316            assert_eq!(0, total_rows);
2317        }
2318    }
2319}