Skip to main content

mito2/memtable/
bulk.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation for bulk load
16
17pub(crate) mod chunk_reader;
18pub mod context;
19pub(crate) mod json_align;
20pub mod part;
21pub mod part_reader;
22mod row_group_reader;
23
24use std::collections::{BTreeMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
26use std::sync::{Arc, LazyLock, Mutex, RwLock};
27use std::time::Instant;
28
/// Looks up the environment variable `name` and parses its value as a `usize`.
///
/// Returns `default` when the variable is unset, cannot be read as Unicode,
/// or does not parse as a `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<usize>().unwrap_or(default),
        Err(_) => default,
    }
}
36
37use common_time::Timestamp;
38use datatypes::arrow::datatypes::SchemaRef;
39use mito_codec::key_values::KeyValue;
40use rayon::prelude::*;
41use serde::{Deserialize, Serialize};
42use serde_with::{DisplayFromStr, serde_as};
43use store_api::metadata::RegionMetadataRef;
44use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
45use tokio::sync::Semaphore;
46
47use crate::error::{Result, UnsupportedOperationSnafu};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::bulk::context::BulkIterContext;
50use crate::memtable::bulk::json_align::Json2Aligner;
51use crate::memtable::bulk::part::{
52    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
53    should_prune_bulk_part,
54};
55use crate::memtable::bulk::part_reader::BulkPartBatchIter;
56use crate::memtable::stats::WriteMetrics;
57use crate::memtable::{
58    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
59    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
60    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
61};
62use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
63use crate::read::flat_merge::FlatMergeIterator;
64use crate::region::options::MergeMode;
65use crate::sst::parquet::flat_format::field_column_start;
66use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
67
/// Default merge threshold for triggering compaction.
///
/// Used when `GREPTIME_BULK_MERGE_THRESHOLD` is unset or invalid.
const DEFAULT_MERGE_THRESHOLD: usize = 16;

/// Threshold for triggering merge of parts. Configurable via `GREPTIME_BULK_MERGE_THRESHOLD`.
///
/// The environment variable is read lazily, on first access of this static.
static MERGE_THRESHOLD: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));

/// Default maximum number of groups for parallel merging.
///
/// Used when `GREPTIME_BULK_MAX_MERGE_GROUPS` is unset or invalid.
const DEFAULT_MAX_MERGE_GROUPS: usize = 32;

/// Maximum merge groups. Configurable via `GREPTIME_BULK_MAX_MERGE_GROUPS`.
///
/// The environment variable is read lazily, on first access of this static.
static MAX_MERGE_GROUPS: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));

/// Row threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_ROW_THRESHOLD`.
/// When estimated rows exceed this threshold, parts are encoded as EncodedBulkPart.
///
/// Defaults to ten times `DEFAULT_ROW_GROUP_SIZE` when the variable is unset or invalid.
pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
        10 * DEFAULT_ROW_GROUP_SIZE,
    )
});

/// Default bytes threshold for encoding (64 MiB).
const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;

/// Bytes threshold for encoding parts. Configurable via `GREPTIME_BULK_ENCODE_BYTES_THRESHOLD`.
/// When estimated bytes exceed this threshold, parts are encoded as EncodedBulkPart.
///
/// The environment variable is read lazily, on first access of this static.
static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
        DEFAULT_ENCODE_BYTES_THRESHOLD,
    )
});
102
/// Configuration for bulk memtable.
///
/// Every field is (de)serialized through its `Display`/`FromStr` form
/// (`DisplayFromStr`). Because of `#[serde(default)]`, fields missing from the
/// input take the values of [`BulkMemtableConfig::default`], which are derived
/// from the `GREPTIME_BULK_*` environment variables.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BulkMemtableConfig {
    /// Threshold for triggering merge of parts.
    ///
    /// Compared against the number of unmerged raw parts and the number of
    /// unmerged encoded parts separately.
    #[serde_as(as = "DisplayFromStr")]
    pub merge_threshold: usize,
    /// Row threshold for encoding parts.
    #[serde_as(as = "DisplayFromStr")]
    pub encode_row_threshold: usize,
    /// Bytes threshold for encoding parts.
    #[serde_as(as = "DisplayFromStr")]
    pub encode_bytes_threshold: usize,
    /// Maximum number of groups for parallel merging.
    #[serde_as(as = "DisplayFromStr")]
    pub max_merge_groups: usize,
}
121
122impl Default for BulkMemtableConfig {
123    fn default() -> Self {
124        Self {
125            merge_threshold: *MERGE_THRESHOLD,
126            encode_row_threshold: *ENCODE_ROW_THRESHOLD,
127            encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
128            max_merge_groups: *MAX_MERGE_GROUPS,
129        }
130        .sanitize()
131    }
132}
133
134impl BulkMemtableConfig {
135    fn sanitize(mut self) -> Self {
136        if self.merge_threshold == 0 {
137            self.merge_threshold = DEFAULT_MERGE_THRESHOLD;
138        }
139        self
140    }
141}
142
/// Result of merging parts - either a MultiBulkPart or an EncodedBulkPart
///
/// NOTE(review): the variant comments below tie the choice to
/// `DEFAULT_ROW_GROUP_SIZE`, while this file also defines configurable encode
/// thresholds (`encode_row_threshold` / `encode_bytes_threshold`). The code that
/// picks the variant is not in this part of the file — confirm which condition
/// actually applies.
enum MergedPart {
    /// Merged part stored as MultiBulkPart (when rows < DEFAULT_ROW_GROUP_SIZE)
    Multi(MultiBulkPart),
    /// Merged part stored as EncodedBulkPart (when rows >= DEFAULT_ROW_GROUP_SIZE)
    Encoded(EncodedBulkPart),
}
150
/// Result of collecting parts to merge
struct CollectedParts {
    /// Groups of parts ready for merging. Each group holds at most
    /// `merge_threshold` parts (16 by default), and all parts in a group are of
    /// the same kind (raw or encoded).
    groups: Vec<Vec<PartToMerge>>,
}
156
/// All parts in a bulk memtable.
#[derive(Default)]
struct BulkParts {
    /// Unordered small parts.
    ///
    /// Incoming fragments that `UnorderedPart::should_accept` admits are buffered
    /// here and later converted into a regular bulk part.
    unordered_part: UnorderedPart,
    /// All parts (raw and encoded).
    ///
    /// Each entry carries a `merging` flag marking parts already handed to a merge.
    parts: Vec<BulkPartWrapper>,
}
165
166impl BulkParts {
167    /// Total number of parts (including unordered).
168    fn num_parts(&self) -> usize {
169        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
170        self.parts.len() + unordered_count
171    }
172
173    /// Returns true if there is no part.
174    fn is_empty(&self) -> bool {
175        self.unordered_part.is_empty() && self.parts.is_empty()
176    }
177
178    /// Returns true if bulk parts or encoded parts should be merged.
179    /// Uses short-circuit counting to stop early once threshold is reached.
180    fn should_merge_parts(&self, merge_threshold: usize) -> bool {
181        let mut bulk_count = 0;
182        let mut encoded_count = 0;
183
184        for wrapper in &self.parts {
185            if wrapper.merging {
186                continue;
187            }
188
189            if wrapper.part.is_encoded() {
190                encoded_count += 1;
191            } else {
192                bulk_count += 1;
193            }
194
195            // Short-circuit: stop counting if either threshold is reached
196            if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
197                return true;
198            }
199        }
200
201        false
202    }
203
204    /// Returns true if the unordered_part should be compacted into a BulkPart.
205    fn should_compact_unordered_part(&self) -> bool {
206        self.unordered_part.should_compact()
207    }
208
209    /// Collects unmerged parts and marks them as being merged.
210    /// Only collects parts of types that meet the threshold.
211    /// Parts are pre-grouped into chunks for parallel processing.
212    fn collect_parts_to_merge(
213        &mut self,
214        merge_threshold: usize,
215        max_merge_groups: usize,
216    ) -> CollectedParts {
217        // First pass: collect indices and row counts for each type
218        let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
219        let mut encoded_indices: Vec<(usize, usize)> = Vec::new();
220
221        for (idx, wrapper) in self.parts.iter().enumerate() {
222            if wrapper.merging {
223                continue;
224            }
225            let num_rows = wrapper.part.num_rows();
226            if wrapper.part.is_encoded() {
227                encoded_indices.push((idx, num_rows));
228            } else {
229                bulk_indices.push((idx, num_rows));
230            }
231        }
232
233        let mut groups = Vec::new();
234
235        // Process bulk parts if threshold met
236        if bulk_indices.len() >= merge_threshold {
237            groups.extend(self.collect_and_group_parts(
238                bulk_indices,
239                merge_threshold,
240                max_merge_groups,
241            ));
242        }
243
244        // Process encoded parts if threshold met
245        if encoded_indices.len() >= merge_threshold {
246            groups.extend(self.collect_and_group_parts(
247                encoded_indices,
248                merge_threshold,
249                max_merge_groups,
250            ));
251        }
252
253        CollectedParts { groups }
254    }
255
256    /// Sorts indices by row count, groups into chunks, marks as merging, and returns groups.
257    fn collect_and_group_parts(
258        &mut self,
259        mut indices: Vec<(usize, usize)>,
260        merge_threshold: usize,
261        max_merge_groups: usize,
262    ) -> Vec<Vec<PartToMerge>> {
263        if indices.is_empty() {
264            return Vec::new();
265        }
266
267        // Sort by row count for better grouping
268        indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);
269
270        // Group into chunks of merge_threshold size, limit to max_merge_groups
271        indices
272            .chunks(merge_threshold)
273            .take(max_merge_groups)
274            .map(|chunk| {
275                chunk
276                    .iter()
277                    .map(|(idx, _)| {
278                        let wrapper = &mut self.parts[*idx];
279                        wrapper.merging = true;
280                        wrapper.part.clone()
281                    })
282                    .collect()
283            })
284            .collect()
285    }
286
287    /// Installs merged parts and removes the original parts by file ids.
288    /// Returns the total number of rows in the merged parts.
289    fn install_merged_parts<I>(
290        &mut self,
291        merged_parts: I,
292        merged_file_ids: &HashSet<FileId>,
293    ) -> usize
294    where
295        I: IntoIterator<Item = MergedPart>,
296    {
297        let mut total_output_rows = 0;
298
299        for merged_part in merged_parts {
300            match merged_part {
301                MergedPart::Encoded(encoded_part) => {
302                    total_output_rows += encoded_part.metadata().num_rows;
303                    self.parts.push(BulkPartWrapper {
304                        part: PartToMerge::Encoded {
305                            part: encoded_part,
306                            file_id: FileId::random(),
307                        },
308                        merging: false,
309                    });
310                }
311                MergedPart::Multi(multi_part) => {
312                    total_output_rows += multi_part.num_rows();
313                    self.parts.push(BulkPartWrapper {
314                        part: PartToMerge::Multi {
315                            part: multi_part,
316                            file_id: FileId::random(),
317                        },
318                        merging: false,
319                    });
320                }
321            }
322        }
323
324        self.parts
325            .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));
326
327        total_output_rows
328    }
329
330    /// Resets merging flag for parts with the given file ids.
331    /// Used when merging fails or is cancelled.
332    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
333        for wrapper in &mut self.parts {
334            if file_ids.contains(&wrapper.file_id()) {
335                wrapper.merging = false;
336            }
337        }
338    }
339}
340
/// RAII guard for managing merging flags.
/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
struct MergingFlagsGuard<'a> {
    /// Parts whose `merging` flags are reset on drop.
    bulk_parts: &'a RwLock<BulkParts>,
    /// File ids of the parts taking part in the merge.
    file_ids: &'a HashSet<FileId>,
    /// Set by `mark_success`; while `false`, dropping the guard resets the flags.
    success: bool,
}
348
349impl<'a> MergingFlagsGuard<'a> {
350    /// Creates a new guard for the given file ids.
351    fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
352        Self {
353            bulk_parts,
354            file_ids,
355            success: false,
356        }
357    }
358
359    /// Marks the merge operation as successful.
360    /// When this is called, the guard will not reset the flags on drop.
361    fn mark_success(&mut self) {
362        self.success = true;
363    }
364}
365
366impl<'a> Drop for MergingFlagsGuard<'a> {
367    fn drop(&mut self) {
368        if !self.success
369            && let Ok(mut parts) = self.bulk_parts.write()
370        {
371            parts.reset_merging_flags(self.file_ids);
372        }
373    }
374}
375
/// Memtable that ingests and scans parts directly.
pub struct BulkMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Configuration for the bulk memtable.
    config: BulkMemtableConfig,
    /// All parts of the memtable; shared with scheduled compaction tasks.
    parts: Arc<RwLock<BulkParts>>,
    /// Metadata of the region this memtable belongs to.
    metadata: RegionMetadataRef,
    /// Tracks the estimated bytes of the written parts.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far (`i64::MIN` until the first write).
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far (`i64::MAX` until the first write).
    min_timestamp: AtomicI64,
    /// Largest sequence written so far.
    max_sequence: AtomicU64,
    /// Total number of rows written.
    num_rows: AtomicUsize,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for scheduling compaction tasks
    ///
    /// When `None`, compaction runs synchronously on the writing thread.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}
397
398impl std::fmt::Debug for BulkMemtable {
399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400        f.debug_struct("BulkMemtable")
401            .field("id", &self.id)
402            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
403            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
404            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
405            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
406            .finish()
407    }
408}
409
410impl Memtable for BulkMemtable {
411    fn id(&self) -> MemtableId {
412        self.id
413    }
414
415    fn write(&self, _kvs: &KeyValues) -> Result<()> {
416        UnsupportedOperationSnafu {
417            err_msg: "write() is not supported for bulk memtable",
418        }
419        .fail()
420    }
421
422    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
423        UnsupportedOperationSnafu {
424            err_msg: "write_one() is not supported for bulk memtable",
425        }
426        .fail()
427    }
428
429    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
430        let local_metrics = WriteMetrics {
431            key_bytes: 0,
432            value_bytes: fragment.estimated_size(),
433            min_ts: fragment.min_timestamp,
434            max_ts: fragment.max_timestamp,
435            num_rows: fragment.num_rows(),
436            max_sequence: fragment.sequence,
437        };
438
439        {
440            let mut bulk_parts = self.parts.write().unwrap();
441
442            // Routes small parts to unordered_part based on threshold
443            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
444                bulk_parts.unordered_part.push(fragment);
445
446                // Compacts unordered_part if threshold is reached
447                if bulk_parts.should_compact_unordered_part()
448                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
449                {
450                    bulk_parts.parts.push(BulkPartWrapper {
451                        part: PartToMerge::Bulk {
452                            part: bulk_part,
453                            file_id: FileId::random(),
454                        },
455                        merging: false,
456                    });
457                    bulk_parts.unordered_part.clear();
458                }
459            } else {
460                bulk_parts.parts.push(BulkPartWrapper {
461                    part: PartToMerge::Bulk {
462                        part: fragment,
463                        file_id: FileId::random(),
464                    },
465                    merging: false,
466                });
467            }
468
469            // Since this operation should be fast, we do it in parts lock scope.
470            // This ensure the statistics in `ranges()` are correct. What's more,
471            // it guarantees no rows are out of the time range so we don't need to
472            // prune rows by time range again in the iterator of the MemtableRange.
473            self.update_stats(local_metrics);
474        }
475
476        if self.should_compact() {
477            self.schedule_compact();
478        }
479
480        Ok(())
481    }
482
483    fn ranges(
484        &self,
485        projection: Option<&[ColumnId]>,
486        options: RangesOptions,
487    ) -> Result<MemtableRanges> {
488        let predicate = options.predicate;
489        let sequence = options.sequence;
490        let mut ranges = BTreeMap::new();
491        let mut range_id = 0;
492
493        // TODO(yingwen): Filter ranges by sequence.
494        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
495            self.metadata.clone(),
496            projection,
497            predicate.predicate().cloned(),
498            options.for_flush,
499            options.pre_filter_mode,
500        )?);
501
502        // Adds ranges for regular parts and encoded parts
503        {
504            let bulk_parts = self.parts.read().unwrap();
505
506            // Adds range for unordered part if not empty
507            if !bulk_parts.unordered_part.is_empty()
508                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
509            {
510                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
511                let range = MemtableRange::new(
512                    Arc::new(MemtableRangeContext::new(
513                        self.id,
514                        Box::new(BulkRangeIterBuilder {
515                            part: unordered_bulk_part,
516                            context: context.clone(),
517                            sequence,
518                        }),
519                        predicate.clone(),
520                    )),
521                    part_stats,
522                );
523                ranges.insert(range_id, range);
524                range_id += 1;
525            }
526
527            // Adds ranges for all parts
528            for part_wrapper in bulk_parts.parts.iter() {
529                // Skips empty parts
530                if part_wrapper.part.num_rows() == 0 {
531                    continue;
532                }
533
534                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
535                let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
536                    PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
537                        part: part.clone(),
538                        context: context.clone(),
539                        sequence,
540                    }),
541                    PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
542                        part: part.clone(),
543                        context: context.clone(),
544                        sequence,
545                    }),
546                    PartToMerge::Encoded { part, file_id } => {
547                        Box::new(EncodedBulkRangeIterBuilder {
548                            file_id: *file_id,
549                            part: part.clone(),
550                            context: context.clone(),
551                            sequence,
552                        })
553                    }
554                };
555
556                let range = MemtableRange::new(
557                    Arc::new(MemtableRangeContext::new(
558                        self.id,
559                        iter_builder,
560                        predicate.clone(),
561                    )),
562                    part_stats,
563                );
564                ranges.insert(range_id, range);
565                range_id += 1;
566            }
567        }
568
569        Ok(MemtableRanges { ranges })
570    }
571
572    fn is_empty(&self) -> bool {
573        let bulk_parts = self.parts.read().unwrap();
574        bulk_parts.is_empty()
575    }
576
577    fn freeze(&self) -> Result<()> {
578        self.alloc_tracker.done_allocating();
579        Ok(())
580    }
581
582    fn stats(&self) -> MemtableStats {
583        let estimated_bytes = self.alloc_tracker.bytes_allocated();
584
585        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
586            return MemtableStats {
587                estimated_bytes,
588                time_range: None,
589                num_rows: 0,
590                num_ranges: 0,
591                max_sequence: 0,
592                series_count: 0,
593            };
594        }
595
596        let ts_type = self
597            .metadata
598            .time_index_column()
599            .column_schema
600            .data_type
601            .clone()
602            .as_timestamp()
603            .expect("Timestamp column must have timestamp type");
604        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
605        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
606
607        let num_ranges = self.parts.read().unwrap().num_parts();
608
609        MemtableStats {
610            estimated_bytes,
611            time_range: Some((min_timestamp, max_timestamp)),
612            num_rows: self.num_rows.load(Ordering::Relaxed),
613            num_ranges,
614            max_sequence: self.max_sequence.load(Ordering::Relaxed),
615            series_count: self.estimated_series_count(),
616        }
617    }
618
619    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
620        Arc::new(Self {
621            id,
622            config: self.config.clone(),
623            parts: Arc::new(RwLock::new(BulkParts::default())),
624            metadata: metadata.clone(),
625            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
626            max_timestamp: AtomicI64::new(i64::MIN),
627            min_timestamp: AtomicI64::new(i64::MAX),
628            max_sequence: AtomicU64::new(0),
629            num_rows: AtomicUsize::new(0),
630            compactor: Arc::new(Mutex::new(MemtableCompactor::new(
631                metadata.region_id,
632                id,
633                self.config.clone(),
634            ))),
635            compact_dispatcher: self.compact_dispatcher.clone(),
636            append_mode: self.append_mode,
637            merge_mode: self.merge_mode,
638        })
639    }
640
641    fn compact(&self, for_flush: bool) -> Result<()> {
642        let mut compactor = self.compactor.lock().unwrap();
643
644        if for_flush {
645            return Ok(());
646        }
647
648        // Unified merge for all parts
649        let should_merge = self
650            .parts
651            .read()
652            .unwrap()
653            .should_merge_parts(self.config.merge_threshold);
654        if should_merge {
655            compactor.merge_parts(
656                &self.parts,
657                &self.metadata,
658                !self.append_mode,
659                self.merge_mode,
660            )?;
661        }
662
663        Ok(())
664    }
665}
666
667impl BulkMemtable {
668    /// Creates a new BulkMemtable
669    pub fn new(
670        id: MemtableId,
671        config: BulkMemtableConfig,
672        metadata: RegionMetadataRef,
673        write_buffer_manager: Option<WriteBufferManagerRef>,
674        compact_dispatcher: Option<Arc<CompactDispatcher>>,
675        append_mode: bool,
676        merge_mode: MergeMode,
677    ) -> Self {
678        let config = config.sanitize();
679        let region_id = metadata.region_id;
680        Self {
681            id,
682            config: config.clone(),
683            parts: Arc::new(RwLock::new(BulkParts::default())),
684            metadata,
685            alloc_tracker: AllocTracker::new(write_buffer_manager),
686            max_timestamp: AtomicI64::new(i64::MIN),
687            min_timestamp: AtomicI64::new(i64::MAX),
688            max_sequence: AtomicU64::new(0),
689            num_rows: AtomicUsize::new(0),
690            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
691            compact_dispatcher,
692            append_mode,
693            merge_mode,
694        }
695    }
696
697    /// Sets the unordered part threshold (for testing).
698    #[cfg(test)]
699    pub fn set_unordered_part_threshold(&self, threshold: usize) {
700        self.parts
701            .write()
702            .unwrap()
703            .unordered_part
704            .set_threshold(threshold);
705    }
706
707    /// Sets the unordered part compact threshold (for testing).
708    #[cfg(test)]
709    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
710        self.parts
711            .write()
712            .unwrap()
713            .unordered_part
714            .set_compact_threshold(compact_threshold);
715    }
716
717    /// Updates memtable stats.
718    ///
719    /// Please update this inside the write lock scope.
720    fn update_stats(&self, stats: WriteMetrics) {
721        self.alloc_tracker
722            .on_allocation(stats.key_bytes + stats.value_bytes);
723
724        self.max_timestamp
725            .fetch_max(stats.max_ts, Ordering::Relaxed);
726        self.min_timestamp
727            .fetch_min(stats.min_ts, Ordering::Relaxed);
728        self.max_sequence
729            .fetch_max(stats.max_sequence, Ordering::Relaxed);
730        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
731    }
732
733    /// Returns the estimated time series count.
734    fn estimated_series_count(&self) -> usize {
735        let bulk_parts = self.parts.read().unwrap();
736        bulk_parts
737            .parts
738            .iter()
739            .map(|part_wrapper| part_wrapper.part.series_count())
740            .sum()
741    }
742
743    /// Returns whether the memtable should be compacted.
744    fn should_compact(&self) -> bool {
745        let parts = self.parts.read().unwrap();
746        parts.should_merge_parts(self.config.merge_threshold)
747    }
748
749    /// Schedules a compaction task using the CompactDispatcher.
750    fn schedule_compact(&self) {
751        if let Some(dispatcher) = &self.compact_dispatcher {
752            let task = MemCompactTask {
753                metadata: self.metadata.clone(),
754                parts: self.parts.clone(),
755                config: self.config.clone(),
756                compactor: self.compactor.clone(),
757                append_mode: self.append_mode,
758                merge_mode: self.merge_mode,
759            };
760
761            dispatcher.dispatch_compact(task);
762        } else {
763            // Uses synchronous compaction if no dispatcher is available.
764            if let Err(e) = self.compact(false) {
765                common_telemetry::error!(e; "Failed to compact table");
766            }
767        }
768    }
769}
770
/// Iterator builder for bulk range
///
/// Produces record batch iterators over a single [`BulkPart`].
pub struct BulkRangeIterBuilder {
    /// The part to read.
    pub part: BulkPart,
    /// Shared read context used to build the iterator.
    pub context: Arc<BulkIterContext>,
    /// Optional sequence range passed to the part iterator.
    pub sequence: Option<SequenceRange>,
}
777
/// Iterator builder for multi bulk range
///
/// Produces record batch iterators over a [`MultiBulkPart`].
struct MultiBulkRangeIterBuilder {
    /// The part to read.
    part: MultiBulkPart,
    /// Shared read context used to build the iterator.
    context: Arc<BulkIterContext>,
    /// Optional sequence range passed to the part reader.
    sequence: Option<SequenceRange>,
}
784
785impl IterBuilder for BulkRangeIterBuilder {
786    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
787        UnsupportedOperationSnafu {
788            err_msg: "BatchIterator is not supported for bulk memtable",
789        }
790        .fail()
791    }
792
793    fn is_record_batch(&self) -> bool {
794        true
795    }
796
797    fn build_record_batch(
798        &self,
799        _time_range: Option<(Timestamp, Timestamp)>,
800        metrics: Option<MemScanMetrics>,
801    ) -> Result<BoxedRecordBatchIterator> {
802        let metadata = self.context.read_format().metadata();
803        if should_prune_bulk_part(&self.part.batch, &self.context, metadata) {
804            return Ok(Box::new(std::iter::empty()));
805        }
806
807        let series_count = self.part.estimated_series_count();
808        let iter = BulkPartBatchIter::from_single(
809            self.part.batch.clone(),
810            self.context.clone(),
811            self.sequence,
812            series_count,
813            metrics,
814        );
815
816        Ok(Box::new(iter))
817    }
818
819    fn record_batch_schema_hint(&self) -> Option<SchemaRef> {
820        Some(self.part.schema())
821    }
822
823    fn encoded_range(&self) -> Option<EncodedRange> {
824        None
825    }
826}
827
828impl IterBuilder for MultiBulkRangeIterBuilder {
829    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
830        UnsupportedOperationSnafu {
831            err_msg: "BatchIterator is not supported for multi bulk memtable",
832        }
833        .fail()
834    }
835
836    fn is_record_batch(&self) -> bool {
837        true
838    }
839
840    fn build_record_batch(
841        &self,
842        _time_range: Option<(Timestamp, Timestamp)>,
843        metrics: Option<MemScanMetrics>,
844    ) -> Result<BoxedRecordBatchIterator> {
845        match self
846            .part
847            .read(self.context.clone(), self.sequence, metrics)?
848        {
849            Some(iter) => Ok(iter),
850            // All batches were pruned by the predicate. Return an empty iterator.
851            None => Ok(Box::new(std::iter::empty())),
852        }
853    }
854
855    fn record_batch_schema_hint(&self) -> Option<SchemaRef> {
856        self.part.schemas().next()
857    }
858
859    fn encoded_range(&self) -> Option<EncodedRange> {
860        None
861    }
862}
863
/// Iterator builder for encoded bulk range
///
/// Produces record batch iterators over an [`EncodedBulkPart`] and can expose
/// the encoded data itself through `encoded_range()`.
struct EncodedBulkRangeIterBuilder {
    /// File id of the part; used to build the SST info in `encoded_range()`.
    file_id: FileId,
    /// The encoded part to read.
    part: EncodedBulkPart,
    /// Shared read context used to build the iterator.
    context: Arc<BulkIterContext>,
    /// Optional sequence range passed to the part reader.
    sequence: Option<SequenceRange>,
}
871
872impl IterBuilder for EncodedBulkRangeIterBuilder {
873    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
874        UnsupportedOperationSnafu {
875            err_msg: "BatchIterator is not supported for encoded bulk memtable",
876        }
877        .fail()
878    }
879
880    fn is_record_batch(&self) -> bool {
881        true
882    }
883
884    fn build_record_batch(
885        &self,
886        _time_range: Option<(Timestamp, Timestamp)>,
887        metrics: Option<MemScanMetrics>,
888    ) -> Result<BoxedRecordBatchIterator> {
889        if let Some(iter) = self
890            .part
891            .read(self.context.clone(), self.sequence, metrics)?
892        {
893            Ok(iter)
894        } else {
895            // Return an empty iterator if no data to read
896            Ok(Box::new(std::iter::empty()))
897        }
898    }
899
900    fn record_batch_schema_hint(&self) -> Option<SchemaRef> {
901        Some(self.part.schema())
902    }
903
904    fn encoded_range(&self) -> Option<EncodedRange> {
905        Some(EncodedRange {
906            data: self.part.data().clone(),
907            sst_info: self.part.to_sst_info(self.file_id),
908        })
909    }
910}
911
912struct BulkPartWrapper {
913    /// The part to store. It already contains the file id.
914    part: PartToMerge,
915    /// Whether this part is currently being merged.
916    merging: bool,
917}
918
919impl BulkPartWrapper {
920    /// Returns the file id of this part.
921    fn file_id(&self) -> FileId {
922        self.part.file_id()
923    }
924}
925
926/// Enum to wrap different types of parts for unified merging.
927#[derive(Clone)]
928enum PartToMerge {
929    /// Raw bulk part.
930    Bulk { part: BulkPart, file_id: FileId },
931    /// Multiple bulk parts.
932    Multi {
933        part: MultiBulkPart,
934        file_id: FileId,
935    },
936    /// Encoded bulk part.
937    Encoded {
938        part: EncodedBulkPart,
939        file_id: FileId,
940    },
941}
942
943impl PartToMerge {
944    /// Gets the file ID of this part.
945    fn file_id(&self) -> FileId {
946        match self {
947            PartToMerge::Bulk { file_id, .. } => *file_id,
948            PartToMerge::Multi { file_id, .. } => *file_id,
949            PartToMerge::Encoded { file_id, .. } => *file_id,
950        }
951    }
952
953    /// Gets the minimum timestamp of this part.
954    fn min_timestamp(&self) -> i64 {
955        match self {
956            PartToMerge::Bulk { part, .. } => part.min_timestamp,
957            PartToMerge::Multi { part, .. } => part.min_timestamp(),
958            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
959        }
960    }
961
962    /// Gets the maximum timestamp of this part.
963    fn max_timestamp(&self) -> i64 {
964        match self {
965            PartToMerge::Bulk { part, .. } => part.max_timestamp,
966            PartToMerge::Multi { part, .. } => part.max_timestamp(),
967            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
968        }
969    }
970
971    /// Gets the number of rows in this part.
972    fn num_rows(&self) -> usize {
973        match self {
974            PartToMerge::Bulk { part, .. } => part.num_rows(),
975            PartToMerge::Multi { part, .. } => part.num_rows(),
976            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
977        }
978    }
979
980    /// Gets the maximum sequence number of this part.
981    fn max_sequence(&self) -> u64 {
982        match self {
983            PartToMerge::Bulk { part, .. } => part.sequence,
984            PartToMerge::Multi { part, .. } => part.max_sequence(),
985            PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
986        }
987    }
988
989    /// Gets the estimated series count in this part.
990    fn series_count(&self) -> usize {
991        match self {
992            PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
993            PartToMerge::Multi { part, .. } => part.series_count(),
994            PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
995        }
996    }
997
998    /// Returns true if this is an encoded part.
999    fn is_encoded(&self) -> bool {
1000        matches!(self, PartToMerge::Encoded { .. })
1001    }
1002
1003    /// Gets the estimated size in bytes of this part.
1004    fn estimated_size(&self) -> usize {
1005        match self {
1006            PartToMerge::Bulk { part, .. } => part.estimated_size(),
1007            PartToMerge::Multi { part, .. } => part.estimated_size(),
1008            PartToMerge::Encoded { part, .. } => part.size_bytes(),
1009        }
1010    }
1011
1012    /// Converts this part to `MemtableStats`.
1013    fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1014        match self {
1015            PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
1016            PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
1017            PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
1018        }
1019    }
1020
1021    /// Returns the Arrow schema of the record batches contained in this [`PartToMerge`].
1022    fn arrow_schema(&self) -> SchemaRef {
1023        match self {
1024            PartToMerge::Bulk { part, .. } => part.schema(),
1025            // A MultiBulkPart is built from batches that have already been aligned, so
1026            // all contained batches are expected to share the same arrow schema.
1027            PartToMerge::Multi { part, .. } => part
1028                .schemas()
1029                .next()
1030                .expect("MultiBulkPart must contain at least one record batch"),
1031            PartToMerge::Encoded { part, .. } => part.schema(),
1032        }
1033    }
1034
1035    /// Creates a record batch iterator for this part.
1036    fn create_iterator(
1037        self,
1038        context: Arc<BulkIterContext>,
1039    ) -> Result<Option<BoxedRecordBatchIterator>> {
1040        match self {
1041            PartToMerge::Bulk { part, .. } => {
1042                let series_count = part.estimated_series_count();
1043                let iter = BulkPartBatchIter::from_single(
1044                    part.batch,
1045                    context,
1046                    None, // No sequence filter for merging
1047                    series_count,
1048                    None, // No metrics for merging
1049                );
1050                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1051            }
1052            PartToMerge::Multi { part, .. } => part.read(context, None, None),
1053            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
1054        }
1055    }
1056}
1057
1058struct MemtableCompactor {
1059    region_id: RegionId,
1060    memtable_id: MemtableId,
1061    /// Configuration for the bulk memtable.
1062    config: BulkMemtableConfig,
1063}
1064
1065impl MemtableCompactor {
1066    /// Creates a new MemtableCompactor.
1067    fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
1068        Self {
1069            region_id,
1070            memtable_id,
1071            config,
1072        }
1073    }
1074
1075    /// Merges parts (bulk and encoded) and then encodes the result.
1076    fn merge_parts(
1077        &mut self,
1078        bulk_parts: &RwLock<BulkParts>,
1079        metadata: &RegionMetadataRef,
1080        dedup: bool,
1081        merge_mode: MergeMode,
1082    ) -> Result<()> {
1083        let start = Instant::now();
1084
1085        // Collect pre-grouped parts
1086        let collected = bulk_parts
1087            .write()
1088            .unwrap()
1089            .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);
1090
1091        if collected.groups.is_empty() {
1092            return Ok(());
1093        }
1094
1095        // Collect all file IDs for tracking
1096        let merged_file_ids: HashSet<FileId> = collected
1097            .groups
1098            .iter()
1099            .flatten()
1100            .map(|part| part.file_id())
1101            .collect();
1102        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);
1103
1104        let num_groups = collected.groups.len();
1105        let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
1106
1107        let encode_row_threshold = self.config.encode_row_threshold;
1108        let encode_bytes_threshold = self.config.encode_bytes_threshold;
1109
1110        // Merge all groups in parallel
1111        let merged_parts = collected
1112            .groups
1113            .into_par_iter()
1114            .map(|group| {
1115                Self::merge_parts_group(
1116                    group,
1117                    metadata,
1118                    dedup,
1119                    merge_mode,
1120                    encode_row_threshold,
1121                    encode_bytes_threshold,
1122                )
1123            })
1124            .collect::<Result<Vec<Option<MergedPart>>>>()?;
1125
1126        // Install all merged parts
1127        let total_output_rows = {
1128            let mut parts = bulk_parts.write().unwrap();
1129            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
1130        };
1131
1132        guard.mark_success();
1133
1134        common_telemetry::debug!(
1135            "BulkMemtable {} {} concurrent compact {} groups, {} parts, {} rows, cost: {:?}",
1136            self.region_id,
1137            self.memtable_id,
1138            num_groups,
1139            num_parts,
1140            total_output_rows,
1141            start.elapsed()
1142        );
1143
1144        Ok(())
1145    }
1146
1147    /// Merges a group of parts into a single part (either MultiBulkPart or EncodedBulkPart).
1148    fn merge_parts_group(
1149        parts_to_merge: Vec<PartToMerge>,
1150        metadata: &RegionMetadataRef,
1151        dedup: bool,
1152        merge_mode: MergeMode,
1153        encode_row_threshold: usize,
1154        encode_bytes_threshold: usize,
1155    ) -> Result<Option<MergedPart>> {
1156        if parts_to_merge.is_empty() {
1157            return Ok(None);
1158        }
1159
1160        // Calculates timestamp bounds and statistics for merged data
1161        let min_timestamp = parts_to_merge
1162            .iter()
1163            .map(|p| p.min_timestamp())
1164            .min()
1165            .unwrap_or(i64::MAX);
1166        let max_timestamp = parts_to_merge
1167            .iter()
1168            .map(|p| p.max_timestamp())
1169            .max()
1170            .unwrap_or(i64::MIN);
1171        let max_sequence = parts_to_merge
1172            .iter()
1173            .map(|p| p.max_sequence())
1174            .max()
1175            .unwrap_or(0);
1176
1177        // Collects statistics from parts before creating iterators
1178        let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
1179        let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
1180        let estimated_series_count = parts_to_merge
1181            .iter()
1182            .map(|p| p.series_count())
1183            .max()
1184            .unwrap_or(0);
1185
1186        let context = Arc::new(BulkIterContext::new(
1187            metadata.clone(),
1188            None, // No column projection for merging
1189            None, // No predicate for merging
1190            true,
1191        )?);
1192
1193        let aligner = Json2Aligner::try_new(parts_to_merge.iter().map(PartToMerge::arrow_schema))?;
1194
1195        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
1196            .into_iter()
1197            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
1198            .map(|iter| aligner.wrap_iter(iter))
1199            .collect();
1200
1201        if iterators.is_empty() {
1202            return Ok(None);
1203        }
1204
1205        let merged_iter =
1206            FlatMergeIterator::new(aligner.schema().clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;
1207
1208        let boxed_iter: BoxedRecordBatchIterator = if dedup {
1209            match merge_mode {
1210                MergeMode::LastRow => {
1211                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
1212                    Box::new(dedup_iter)
1213                }
1214                MergeMode::LastNonNull => {
1215                    let field_column_start =
1216                        field_column_start(metadata, aligner.schema().fields().len());
1217
1218                    let dedup_iter = FlatDedupIterator::new(
1219                        merged_iter,
1220                        FlatLastNonNull::new(field_column_start, false),
1221                    );
1222                    Box::new(dedup_iter)
1223                }
1224            }
1225        } else {
1226            Box::new(merged_iter)
1227        };
1228
1229        // Encode as EncodedBulkPart if rows exceed row threshold OR bytes exceed bytes threshold
1230        if estimated_total_rows > encode_row_threshold
1231            || estimated_total_bytes > encode_bytes_threshold
1232        {
1233            let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
1234            let mut metrics = BulkPartEncodeMetrics::default();
1235            let encoded_part = encoder.encode_record_batch_iter(
1236                boxed_iter,
1237                aligner.schema().clone(),
1238                min_timestamp,
1239                max_timestamp,
1240                max_sequence,
1241                &mut metrics,
1242            )?;
1243
1244            common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);
1245
1246            Ok(encoded_part.map(MergedPart::Encoded))
1247        } else {
1248            // Otherwise, collect into MultiBulkPart
1249            let mut batches = Vec::new();
1250            let mut actual_total_rows = 0;
1251
1252            for batch_result in boxed_iter {
1253                let batch = batch_result?;
1254                actual_total_rows += batch.num_rows();
1255                batches.push(batch);
1256            }
1257
1258            if actual_total_rows == 0 {
1259                return Ok(None);
1260            }
1261
1262            let multi_part = MultiBulkPart::new(
1263                batches,
1264                min_timestamp,
1265                max_timestamp,
1266                max_sequence,
1267                estimated_series_count,
1268                metadata,
1269            );
1270
1271            common_telemetry::trace!(
1272                "merge_parts_group created MultiBulkPart: rows={}, batches={}",
1273                actual_total_rows,
1274                multi_part.num_batches()
1275            );
1276
1277            Ok(Some(MergedPart::Multi(multi_part)))
1278        }
1279    }
1280}
1281
1282/// A memtable compact task to run in background.
1283struct MemCompactTask {
1284    metadata: RegionMetadataRef,
1285    parts: Arc<RwLock<BulkParts>>,
1286    /// Configuration for the bulk memtable.
1287    config: BulkMemtableConfig,
1288    /// Compactor for merging bulk parts
1289    compactor: Arc<Mutex<MemtableCompactor>>,
1290    /// Whether the append mode is enabled
1291    append_mode: bool,
1292    /// Mode to handle duplicate rows while merging
1293    merge_mode: MergeMode,
1294}
1295
1296impl MemCompactTask {
1297    fn compact(&self) -> Result<()> {
1298        let mut compactor = self.compactor.lock().unwrap();
1299
1300        let should_merge = self
1301            .parts
1302            .read()
1303            .unwrap()
1304            .should_merge_parts(self.config.merge_threshold);
1305        if should_merge {
1306            compactor.merge_parts(
1307                &self.parts,
1308                &self.metadata,
1309                !self.append_mode,
1310                self.merge_mode,
1311            )?;
1312        }
1313
1314        Ok(())
1315    }
1316}
1317
1318/// Scheduler to run compact tasks in background.
1319#[derive(Debug)]
1320pub struct CompactDispatcher {
1321    semaphore: Arc<Semaphore>,
1322}
1323
1324impl CompactDispatcher {
1325    /// Creates a new dispatcher with the given number of max concurrent tasks.
1326    pub fn new(permits: usize) -> Self {
1327        Self {
1328            semaphore: Arc::new(Semaphore::new(permits)),
1329        }
1330    }
1331
1332    /// Dispatches a compact task to run in background.
1333    fn dispatch_compact(&self, task: MemCompactTask) {
1334        let semaphore = self.semaphore.clone();
1335        common_runtime::spawn_global(async move {
1336            let Ok(_permit) = semaphore.acquire().await else {
1337                return;
1338            };
1339
1340            common_runtime::spawn_blocking_global(move || {
1341                if let Err(e) = task.compact() {
1342                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
1343                }
1344            });
1345        });
1346    }
1347}
1348
1349/// Builder to build a [BulkMemtable].
1350#[derive(Debug, Default)]
1351pub struct BulkMemtableBuilder {
1352    /// Configuration for the bulk memtable.
1353    config: BulkMemtableConfig,
1354    write_buffer_manager: Option<WriteBufferManagerRef>,
1355    compact_dispatcher: Option<Arc<CompactDispatcher>>,
1356    append_mode: bool,
1357    merge_mode: MergeMode,
1358}
1359
1360impl BulkMemtableBuilder {
1361    /// Creates a new builder with specific `write_buffer_manager`.
1362    pub fn new(
1363        write_buffer_manager: Option<WriteBufferManagerRef>,
1364        append_mode: bool,
1365        merge_mode: MergeMode,
1366    ) -> Self {
1367        Self {
1368            config: BulkMemtableConfig::default(),
1369            write_buffer_manager,
1370            compact_dispatcher: None,
1371            append_mode,
1372            merge_mode,
1373        }
1374    }
1375
1376    /// Sets the bulk memtable config.
1377    pub fn with_config(mut self, config: BulkMemtableConfig) -> Self {
1378        self.config = config;
1379        self
1380    }
1381
1382    /// Sets the compact dispatcher.
1383    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
1384        self.compact_dispatcher = Some(compact_dispatcher);
1385        self
1386    }
1387
1388    #[cfg(test)]
1389    pub(crate) fn config(&self) -> &BulkMemtableConfig {
1390        &self.config
1391    }
1392}
1393
1394impl MemtableBuilder for BulkMemtableBuilder {
1395    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
1396        Arc::new(BulkMemtable::new(
1397            id,
1398            self.config.clone(),
1399            metadata.clone(),
1400            self.write_buffer_manager.clone(),
1401            self.compact_dispatcher.clone(),
1402            self.append_mode,
1403            self.merge_mode,
1404        ))
1405    }
1406
1407    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
1408        true
1409    }
1410}
1411
1412#[cfg(test)]
1413mod tests {
1414    use api::helper::encode_json_value;
1415    use api::v1::value::ValueData;
1416    use api::v1::{Mutation, Row, Rows, SemanticType};
1417    use datatypes::data_type::ConcreteDataType;
1418    use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
1419    use datatypes::json::value::JsonValue;
1420    use datatypes::schema::ColumnSchema;
1421    use datatypes::types::json_type::{JsonNativeType, JsonObjectType};
1422    use mito_codec::row_converter::build_primary_key_codec;
1423    use serde_json::json;
1424    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
1425
1426    use super::*;
1427    use crate::memtable::bulk::part::BulkPartConverter;
1428    use crate::read::scan_region::PredicateGroup;
1429    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1430    use crate::test_util::memtable_util::{
1431        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1432    };
1433
1434    fn create_bulk_part_with_converter(
1435        k0: &str,
1436        k1: u32,
1437        timestamps: Vec<i64>,
1438        values: Vec<Option<f64>>,
1439        sequence: u64,
1440    ) -> Result<BulkPart> {
1441        let metadata = metadata_for_test();
1442        let capacity = 100;
1443        let primary_key_codec = build_primary_key_codec(&metadata);
1444        let schema = to_flat_sst_arrow_schema(
1445            &metadata,
1446            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1447        );
1448
1449        let mut converter =
1450            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1451
1452        let key_values = build_key_values_with_ts_seq_values(
1453            &metadata,
1454            k0.to_string(),
1455            k1,
1456            timestamps.into_iter(),
1457            values.into_iter(),
1458            sequence,
1459        );
1460
1461        converter.append_key_values(&key_values)?;
1462        converter.convert()
1463    }
1464
1465    #[test]
1466    fn test_bulk_memtable_sanitizes_zero_merge_threshold() {
1467        let metadata = metadata_for_test();
1468        let config = BulkMemtableConfig {
1469            merge_threshold: 0,
1470            ..Default::default()
1471        };
1472
1473        let memtable =
1474            BulkMemtable::new(999, config, metadata, None, None, false, MergeMode::LastRow);
1475
1476        assert_eq!(DEFAULT_MERGE_THRESHOLD, memtable.config.merge_threshold);
1477        assert_eq!(
1478            DEFAULT_MERGE_THRESHOLD,
1479            memtable.compactor.lock().unwrap().config.merge_threshold
1480        );
1481    }
1482
1483    #[test]
1484    fn test_bulk_memtable_write_read() {
1485        let metadata = metadata_for_test();
1486        let memtable = BulkMemtable::new(
1487            999,
1488            BulkMemtableConfig::default(),
1489            metadata.clone(),
1490            None,
1491            None,
1492            false,
1493            MergeMode::LastRow,
1494        );
1495        // Disable unordered_part for this test
1496        memtable.set_unordered_part_threshold(0);
1497
1498        let test_data = [
1499            (
1500                "key_a",
1501                1u32,
1502                vec![1000i64, 2000i64],
1503                vec![Some(10.5), Some(20.5)],
1504                100u64,
1505            ),
1506            (
1507                "key_b",
1508                2u32,
1509                vec![1500i64, 2500i64],
1510                vec![Some(15.5), Some(25.5)],
1511                200u64,
1512            ),
1513            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
1514        ];
1515
1516        for (k0, k1, timestamps, values, seq) in test_data.iter() {
1517            let part =
1518                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
1519                    .unwrap();
1520            memtable.write_bulk(part).unwrap();
1521        }
1522
1523        let stats = memtable.stats();
1524        assert_eq!(5, stats.num_rows);
1525        assert_eq!(3, stats.num_ranges);
1526        assert_eq!(300, stats.max_sequence);
1527
1528        let (min_ts, max_ts) = stats.time_range.unwrap();
1529        assert_eq!(1000, min_ts.value());
1530        assert_eq!(3000, max_ts.value());
1531
1532        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1533        let ranges = memtable
1534            .ranges(
1535                None,
1536                RangesOptions::default().with_predicate(predicate_group),
1537            )
1538            .unwrap();
1539
1540        assert_eq!(3, ranges.ranges.len());
1541        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1542        assert_eq!(5, total_rows);
1543
1544        for (_range_id, range) in ranges.ranges.iter() {
1545            assert!(range.num_rows() > 0);
1546            assert!(range.is_record_batch());
1547
1548            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1549
1550            let mut total_rows = 0;
1551            for batch_result in record_batch_iter {
1552                let batch = batch_result.unwrap();
1553                total_rows += batch.num_rows();
1554                assert!(batch.num_rows() > 0);
1555                assert_eq!(8, batch.num_columns());
1556            }
1557            assert_eq!(total_rows, range.num_rows());
1558        }
1559    }
1560
1561    #[test]
1562    fn test_bulk_memtable_compact_parts_with_json2() {
1563        let metadata = mock_metadata_with_json2();
1564
1565        let config = BulkMemtableConfig {
1566            merge_threshold: 2,
1567            encode_row_threshold: 1,
1568            encode_bytes_threshold: 1,
1569            ..Default::default()
1570        };
1571        let memtable = BulkMemtable::new(
1572            999,
1573            config,
1574            metadata.clone(),
1575            None,
1576            None,
1577            true,
1578            MergeMode::LastRow,
1579        );
1580        memtable.set_unordered_part_threshold(0);
1581
1582        let part1 = mock_bulk_part_with_json2(&metadata, vec![1000, 2000], 100).unwrap();
1583        let part2 = mock_bulk_part_with_json2(&metadata, vec![3000, 4000], 200).unwrap();
1584
1585        memtable.write_bulk(part1).unwrap();
1586        memtable.write_bulk(part2).unwrap();
1587        memtable.compact(false).unwrap();
1588
1589        let stats = memtable.stats();
1590        assert_eq!(4, stats.num_rows);
1591        assert_eq!(201, stats.max_sequence);
1592
1593        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1594        let opts = RangesOptions::default().with_predicate(predicate_group);
1595        let ranges = memtable.ranges(None, opts).unwrap();
1596
1597        assert_eq!(1, ranges.ranges.len());
1598        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1599        assert_eq!(4, total_rows);
1600    }
1601
1602    fn mock_metadata_with_json2() -> RegionMetadataRef {
1603        let col_meta_1 = ColumnMetadata {
1604            column_schema: ColumnSchema::new(
1605                "ts",
1606                ConcreteDataType::timestamp_millisecond_datatype(),
1607                false,
1608            ),
1609            semantic_type: SemanticType::Timestamp,
1610            column_id: 0,
1611        };
1612
1613        let data_type = ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::new()));
1614        let mut col_schema = ColumnSchema::new("data", data_type, true);
1615        let extension = JsonExtensionType::new(Arc::new(JsonMetadata::default()));
1616        col_schema.with_extension_type(&extension).unwrap();
1617
1618        let col_meta_2 = ColumnMetadata {
1619            column_schema: col_schema,
1620            semantic_type: SemanticType::Field,
1621            column_id: 1,
1622        };
1623        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 789));
1624        builder
1625            .push_column_metadata(col_meta_1)
1626            .push_column_metadata(col_meta_2)
1627            .primary_key(vec![]);
1628        Arc::new(builder.build().unwrap())
1629    }
1630
1631    fn mock_bulk_part_with_json2(
1632        metadata: &RegionMetadataRef,
1633        timestamps: Vec<i64>,
1634        sequence: u64,
1635    ) -> Result<BulkPart> {
1636        let capacity = timestamps.len();
1637        let primary_key_codec = build_primary_key_codec(metadata);
1638        let json_type = JsonNativeType::Object(JsonObjectType::from([
1639            ("id".to_string(), JsonNativeType::i64()),
1640            (
1641                "payload".to_string(),
1642                JsonNativeType::Object(JsonObjectType::from([(
1643                    "message".to_string(),
1644                    JsonNativeType::String,
1645                )])),
1646            ),
1647        ]));
1648        let mut options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
1649        options
1650            .concretized_json_types
1651            .insert("data".to_string(), json_type.as_arrow_type());
1652        let schema = to_flat_sst_arrow_schema(metadata, &options);
1653
1654        let mut converter =
1655            BulkPartConverter::new(metadata, schema, capacity, primary_key_codec, true);
1656
1657        let rows = timestamps
1658            .into_iter()
1659            .map(|ts| {
1660                let val1 = api::v1::Value {
1661                    value_data: Some(ValueData::TimestampMillisecondValue(ts)),
1662                };
1663                let value_data = ValueData::JsonValue(encode_json_value(JsonValue::from(json!({
1664                    "id": ts,
1665                    "payload": {
1666                        "message": format!("row-{ts}"),
1667                    },
1668                }))));
1669                let val2 = api::v1::Value {
1670                    value_data: Some(value_data),
1671                };
1672                Row {
1673                    values: vec![val1, val2],
1674                }
1675            })
1676            .collect();
1677
1678        let mutation = Mutation {
1679            op_type: 1,
1680            sequence,
1681            rows: Some(Rows {
1682                schema: region_metadata_to_row_schema(metadata),
1683                rows,
1684            }),
1685            write_hint: None,
1686        };
1687        let key_values = KeyValues::new(metadata.as_ref(), mutation).unwrap();
1688
1689        converter.append_key_values(&key_values)?;
1690        converter.convert()
1691    }
1692
1693    #[test]
1694    fn test_bulk_memtable_ranges_with_projection() {
1695        let metadata = metadata_for_test();
1696        let memtable = BulkMemtable::new(
1697            111,
1698            BulkMemtableConfig::default(),
1699            metadata.clone(),
1700            None,
1701            None,
1702            false,
1703            MergeMode::LastRow,
1704        );
1705
1706        let bulk_part = create_bulk_part_with_converter(
1707            "projection_test",
1708            5,
1709            vec![5000, 6000, 7000],
1710            vec![Some(50.0), Some(60.0), Some(70.0)],
1711            500,
1712        )
1713        .unwrap();
1714
1715        memtable.write_bulk(bulk_part).unwrap();
1716
1717        let projection = vec![4u32];
1718        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1719        let ranges = memtable
1720            .ranges(
1721                Some(&projection),
1722                RangesOptions::default().with_predicate(predicate_group),
1723            )
1724            .unwrap();
1725
1726        assert_eq!(1, ranges.ranges.len());
1727        let range = ranges.ranges.get(&0).unwrap();
1728
1729        assert!(range.is_record_batch());
1730        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1731
1732        let mut total_rows = 0;
1733        for batch_result in record_batch_iter {
1734            let batch = batch_result.unwrap();
1735            assert!(batch.num_rows() > 0);
1736            assert_eq!(5, batch.num_columns());
1737            total_rows += batch.num_rows();
1738        }
1739        assert_eq!(3, total_rows);
1740    }
1741
1742    #[test]
1743    fn test_bulk_memtable_unsupported_operations() {
1744        let metadata = metadata_for_test();
1745        let memtable = BulkMemtable::new(
1746            111,
1747            BulkMemtableConfig::default(),
1748            metadata.clone(),
1749            None,
1750            None,
1751            false,
1752            MergeMode::LastRow,
1753        );
1754
1755        let key_values = build_key_values_with_ts_seq_values(
1756            &metadata,
1757            "test".to_string(),
1758            1,
1759            vec![1000].into_iter(),
1760            vec![Some(1.0)].into_iter(),
1761            1,
1762        );
1763
1764        let err = memtable.write(&key_values).unwrap_err();
1765        assert!(err.to_string().contains("not supported"));
1766
1767        let kv = key_values.iter().next().unwrap();
1768        let err = memtable.write_one(kv).unwrap_err();
1769        assert!(err.to_string().contains("not supported"));
1770    }
1771
1772    #[test]
1773    fn test_bulk_memtable_freeze() {
1774        let metadata = metadata_for_test();
1775        let memtable = BulkMemtable::new(
1776            222,
1777            BulkMemtableConfig::default(),
1778            metadata.clone(),
1779            None,
1780            None,
1781            false,
1782            MergeMode::LastRow,
1783        );
1784
1785        let bulk_part = create_bulk_part_with_converter(
1786            "freeze_test",
1787            10,
1788            vec![10000],
1789            vec![Some(100.0)],
1790            1000,
1791        )
1792        .unwrap();
1793
1794        memtable.write_bulk(bulk_part).unwrap();
1795        memtable.freeze().unwrap();
1796
1797        let stats_after_freeze = memtable.stats();
1798        assert_eq!(1, stats_after_freeze.num_rows);
1799    }
1800
1801    #[test]
1802    fn test_bulk_memtable_fork() {
1803        let metadata = metadata_for_test();
1804        let original_memtable = BulkMemtable::new(
1805            333,
1806            BulkMemtableConfig::default(),
1807            metadata.clone(),
1808            None,
1809            None,
1810            false,
1811            MergeMode::LastRow,
1812        );
1813
1814        let bulk_part =
1815            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
1816                .unwrap();
1817
1818        original_memtable.write_bulk(bulk_part).unwrap();
1819
1820        let forked_memtable = original_memtable.fork(444, &metadata);
1821
1822        assert_eq!(forked_memtable.id(), 444);
1823        assert!(forked_memtable.is_empty());
1824        assert_eq!(0, forked_memtable.stats().num_rows);
1825
1826        assert!(!original_memtable.is_empty());
1827        assert_eq!(1, original_memtable.stats().num_rows);
1828    }
1829
1830    #[test]
1831    fn test_bulk_memtable_ranges_multiple_parts() {
1832        let metadata = metadata_for_test();
1833        let memtable = BulkMemtable::new(
1834            777,
1835            BulkMemtableConfig::default(),
1836            metadata.clone(),
1837            None,
1838            None,
1839            false,
1840            MergeMode::LastRow,
1841        );
1842        // Disable unordered_part for this test
1843        memtable.set_unordered_part_threshold(0);
1844
1845        let parts_data = vec![
1846            (
1847                "part1",
1848                1u32,
1849                vec![1000i64, 1100i64],
1850                vec![Some(10.0), Some(11.0)],
1851                100u64,
1852            ),
1853            (
1854                "part2",
1855                2u32,
1856                vec![2000i64, 2100i64],
1857                vec![Some(20.0), Some(21.0)],
1858                200u64,
1859            ),
1860            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
1861        ];
1862
1863        for (k0, k1, timestamps, values, seq) in parts_data {
1864            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
1865            memtable.write_bulk(part).unwrap();
1866        }
1867
1868        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1869        let ranges = memtable
1870            .ranges(
1871                None,
1872                RangesOptions::default().with_predicate(predicate_group),
1873            )
1874            .unwrap();
1875
1876        assert_eq!(3, ranges.ranges.len());
1877        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1878        assert_eq!(5, total_rows);
1879        assert_eq!(3, ranges.ranges.len());
1880
1881        for (range_id, range) in ranges.ranges.iter() {
1882            assert!(*range_id < 3);
1883            assert!(range.num_rows() > 0);
1884            assert!(range.is_record_batch());
1885        }
1886    }
1887
1888    #[test]
1889    fn test_bulk_memtable_ranges_with_sequence_filter() {
1890        let metadata = metadata_for_test();
1891        let memtable = BulkMemtable::new(
1892            888,
1893            BulkMemtableConfig::default(),
1894            metadata.clone(),
1895            None,
1896            None,
1897            false,
1898            MergeMode::LastRow,
1899        );
1900
1901        let part = create_bulk_part_with_converter(
1902            "seq_test",
1903            1,
1904            vec![1000, 2000, 3000],
1905            vec![Some(10.0), Some(20.0), Some(30.0)],
1906            500,
1907        )
1908        .unwrap();
1909
1910        memtable.write_bulk(part).unwrap();
1911
1912        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1913        let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
1914        let ranges = memtable
1915            .ranges(
1916                None,
1917                RangesOptions::default()
1918                    .with_predicate(predicate_group)
1919                    .with_sequence(sequence_filter),
1920            )
1921            .unwrap();
1922
1923        assert_eq!(1, ranges.ranges.len());
1924        let range = ranges.ranges.get(&0).unwrap();
1925
1926        let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1927        assert!(record_batch_iter.next().is_none());
1928    }
1929
1930    #[test]
1931    fn test_bulk_memtable_ranges_with_encoded_parts() {
1932        let metadata = metadata_for_test();
1933        let config = BulkMemtableConfig {
1934            merge_threshold: 8,
1935            ..Default::default()
1936        };
1937        let memtable = BulkMemtable::new(
1938            999,
1939            config,
1940            metadata.clone(),
1941            None,
1942            None,
1943            false,
1944            MergeMode::LastRow,
1945        );
1946        // Disable unordered_part for this test
1947        memtable.set_unordered_part_threshold(0);
1948
1949        // Adds enough bulk parts to trigger encoding
1950        for i in 0..10 {
1951            let part = create_bulk_part_with_converter(
1952                &format!("key_{}", i),
1953                i,
1954                vec![1000 + i as i64 * 100],
1955                vec![Some(i as f64 * 10.0)],
1956                100 + i as u64,
1957            )
1958            .unwrap();
1959            memtable.write_bulk(part).unwrap();
1960        }
1961
1962        memtable.compact(false).unwrap();
1963
1964        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1965        let ranges = memtable
1966            .ranges(
1967                None,
1968                RangesOptions::default().with_predicate(predicate_group),
1969            )
1970            .unwrap();
1971
1972        // Should have ranges for both bulk parts and encoded parts
1973        assert_eq!(3, ranges.ranges.len());
1974        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1975        assert_eq!(10, total_rows);
1976
1977        for (_range_id, range) in ranges.ranges.iter() {
1978            assert!(range.num_rows() > 0);
1979            assert!(range.is_record_batch());
1980
1981            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1982            let mut total_rows = 0;
1983            for batch_result in record_batch_iter {
1984                let batch = batch_result.unwrap();
1985                total_rows += batch.num_rows();
1986                assert!(batch.num_rows() > 0);
1987            }
1988            assert_eq!(total_rows, range.num_rows());
1989        }
1990    }
1991
1992    #[test]
1993    fn test_bulk_memtable_unordered_part() {
1994        let metadata = metadata_for_test();
1995        let memtable = BulkMemtable::new(
1996            1001,
1997            BulkMemtableConfig::default(),
1998            metadata.clone(),
1999            None,
2000            None,
2001            false,
2002            MergeMode::LastRow,
2003        );
2004
2005        // Set smaller thresholds for testing with smaller inputs
2006        // Accept parts with < 5 rows into unordered_part
2007        memtable.set_unordered_part_threshold(5);
2008        // Compact when total rows >= 10
2009        memtable.set_unordered_part_compact_threshold(10);
2010
2011        // Write 3 small parts (each has 2 rows), should be collected in unordered_part
2012        for i in 0..3 {
2013            let part = create_bulk_part_with_converter(
2014                &format!("key_{}", i),
2015                i,
2016                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
2017                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2018                100 + i as u64,
2019            )
2020            .unwrap();
2021            assert_eq!(2, part.num_rows());
2022            memtable.write_bulk(part).unwrap();
2023        }
2024
2025        // Total rows = 6, not yet reaching compact threshold
2026        let stats = memtable.stats();
2027        assert_eq!(6, stats.num_rows);
2028
2029        // Write 2 more small parts (each has 2 rows)
2030        // This should trigger compaction when total >= 10
2031        for i in 3..5 {
2032            let part = create_bulk_part_with_converter(
2033                &format!("key_{}", i),
2034                i,
2035                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
2036                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2037                100 + i as u64,
2038            )
2039            .unwrap();
2040            memtable.write_bulk(part).unwrap();
2041        }
2042
2043        // Total rows = 10, should have compacted unordered_part into a regular part
2044        let stats = memtable.stats();
2045        assert_eq!(10, stats.num_rows);
2046
2047        // Verify we can read all data correctly
2048        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2049        let ranges = memtable
2050            .ranges(
2051                None,
2052                RangesOptions::default().with_predicate(predicate_group),
2053            )
2054            .unwrap();
2055
2056        // Should have at least 1 range (the compacted part)
2057        assert!(!ranges.ranges.is_empty());
2058        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2059        assert_eq!(10, total_rows);
2060
2061        // Read all data and verify
2062        let mut total_rows_read = 0;
2063        for (_range_id, range) in ranges.ranges.iter() {
2064            assert!(range.is_record_batch());
2065            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2066
2067            for batch_result in record_batch_iter {
2068                let batch = batch_result.unwrap();
2069                total_rows_read += batch.num_rows();
2070            }
2071        }
2072        assert_eq!(10, total_rows_read);
2073    }
2074
2075    #[test]
2076    fn test_bulk_memtable_unordered_part_mixed_sizes() {
2077        let metadata = metadata_for_test();
2078        let memtable = BulkMemtable::new(
2079            1002,
2080            BulkMemtableConfig::default(),
2081            metadata.clone(),
2082            None,
2083            None,
2084            false,
2085            MergeMode::LastRow,
2086        );
2087
2088        // Set threshold to 4 rows - parts with < 4 rows go to unordered_part
2089        memtable.set_unordered_part_threshold(4);
2090        memtable.set_unordered_part_compact_threshold(8);
2091
2092        // Write small parts (3 rows each) - should go to unordered_part
2093        for i in 0..2 {
2094            let part = create_bulk_part_with_converter(
2095                &format!("small_{}", i),
2096                i,
2097                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
2098                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
2099                10 + i as u64,
2100            )
2101            .unwrap();
2102            assert_eq!(3, part.num_rows());
2103            memtable.write_bulk(part).unwrap();
2104        }
2105
2106        // Write a large part (5 rows) - should go directly to regular parts
2107        let large_part = create_bulk_part_with_converter(
2108            "large_key",
2109            100,
2110            vec![5000, 6000, 7000, 8000, 9000],
2111            vec![
2112                Some(100.0),
2113                Some(101.0),
2114                Some(102.0),
2115                Some(103.0),
2116                Some(104.0),
2117            ],
2118            50,
2119        )
2120        .unwrap();
2121        assert_eq!(5, large_part.num_rows());
2122        memtable.write_bulk(large_part).unwrap();
2123
2124        // Write another small part (2 rows) - should trigger compaction of unordered_part
2125        let part = create_bulk_part_with_converter(
2126            "small_2",
2127            2,
2128            vec![4000, 4100],
2129            vec![Some(20.0), Some(21.0)],
2130            30,
2131        )
2132        .unwrap();
2133        memtable.write_bulk(part).unwrap();
2134
2135        let stats = memtable.stats();
2136        assert_eq!(13, stats.num_rows); // 3 + 3 + 5 + 2 = 13
2137
2138        // Verify all data can be read
2139        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2140        let ranges = memtable
2141            .ranges(
2142                None,
2143                RangesOptions::default().with_predicate(predicate_group),
2144            )
2145            .unwrap();
2146
2147        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2148        assert_eq!(13, total_rows);
2149
2150        let mut total_rows_read = 0;
2151        for (_range_id, range) in ranges.ranges.iter() {
2152            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2153            for batch_result in record_batch_iter {
2154                let batch = batch_result.unwrap();
2155                total_rows_read += batch.num_rows();
2156            }
2157        }
2158        assert_eq!(13, total_rows_read);
2159    }
2160
2161    #[test]
2162    fn test_bulk_memtable_unordered_part_with_ranges() {
2163        let metadata = metadata_for_test();
2164        let memtable = BulkMemtable::new(
2165            1003,
2166            BulkMemtableConfig::default(),
2167            metadata.clone(),
2168            None,
2169            None,
2170            false,
2171            MergeMode::LastRow,
2172        );
2173
2174        // Set small thresholds
2175        memtable.set_unordered_part_threshold(3);
2176        memtable.set_unordered_part_compact_threshold(100); // High threshold to prevent auto-compaction
2177
2178        // Write several small parts that stay in unordered_part
2179        for i in 0..3 {
2180            let part = create_bulk_part_with_converter(
2181                &format!("key_{}", i),
2182                i,
2183                vec![1000 + i as i64 * 100],
2184                vec![Some(i as f64 * 10.0)],
2185                100 + i as u64,
2186            )
2187            .unwrap();
2188            assert_eq!(1, part.num_rows());
2189            memtable.write_bulk(part).unwrap();
2190        }
2191
2192        let stats = memtable.stats();
2193        assert_eq!(3, stats.num_rows);
2194
2195        // Test that ranges() can correctly read from unordered_part
2196        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2197        let ranges = memtable
2198            .ranges(
2199                None,
2200                RangesOptions::default().with_predicate(predicate_group),
2201            )
2202            .unwrap();
2203
2204        // Should have 1 range for the unordered_part
2205        assert_eq!(1, ranges.ranges.len());
2206        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2207        assert_eq!(3, total_rows);
2208
2209        // Verify data is sorted correctly in the range
2210        let range = ranges.ranges.get(&0).unwrap();
2211        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2212
2213        let mut total_rows = 0;
2214        for batch_result in record_batch_iter {
2215            let batch = batch_result.unwrap();
2216            total_rows += batch.num_rows();
2217            // Verify data is properly sorted by primary key
2218            assert!(batch.num_rows() > 0);
2219        }
2220        assert_eq!(3, total_rows);
2221    }
2222
2223    /// Helper to create a BulkPartWrapper from a BulkPart.
2224    fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
2225        BulkPartWrapper {
2226            part: PartToMerge::Bulk {
2227                part,
2228                file_id: FileId::random(),
2229            },
2230            merging: false,
2231        }
2232    }
2233
2234    #[test]
2235    fn test_should_merge_parts_below_threshold() {
2236        let mut bulk_parts = BulkParts::default();
2237
2238        // Add 7 bulk parts (below DEFAULT_MERGE_THRESHOLD of 8)
2239        for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
2240            let part = create_bulk_part_with_converter(
2241                &format!("key_{}", i),
2242                i as u32,
2243                vec![1000 + i as i64 * 100],
2244                vec![Some(i as f64 * 10.0)],
2245                100 + i as u64,
2246            )
2247            .unwrap();
2248            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2249        }
2250
2251        // Should not trigger merge since we have only 7 parts
2252        assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2253    }
2254
2255    #[test]
2256    fn test_should_merge_parts_at_threshold() {
2257        let mut bulk_parts = BulkParts::default();
2258        let merge_threshold = 8;
2259
2260        // Add 8 bulk parts (at merge_threshold)
2261        for i in 0..merge_threshold {
2262            let part = create_bulk_part_with_converter(
2263                &format!("key_{}", i),
2264                i as u32,
2265                vec![1000 + i as i64 * 100],
2266                vec![Some(i as f64 * 10.0)],
2267                100 + i as u64,
2268            )
2269            .unwrap();
2270            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2271        }
2272
2273        // Should trigger merge since we have 8 parts
2274        assert!(bulk_parts.should_merge_parts(merge_threshold));
2275    }
2276
2277    #[test]
2278    fn test_should_merge_parts_with_merging_flag() {
2279        let mut bulk_parts = BulkParts::default();
2280        let merge_threshold = 8;
2281
2282        // Add 10 bulk parts
2283        for i in 0..10 {
2284            let part = create_bulk_part_with_converter(
2285                &format!("key_{}", i),
2286                i as u32,
2287                vec![1000 + i as i64 * 100],
2288                vec![Some(i as f64 * 10.0)],
2289                100 + i as u64,
2290            )
2291            .unwrap();
2292            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2293        }
2294
2295        // Should trigger merge since we have 10 parts
2296        assert!(bulk_parts.should_merge_parts(merge_threshold));
2297
2298        // Mark first 3 parts as merging
2299        for wrapper in bulk_parts.parts.iter_mut().take(3) {
2300            wrapper.merging = true;
2301        }
2302
2303        // Now only 7 parts are available for merging, should not trigger
2304        assert!(!bulk_parts.should_merge_parts(merge_threshold));
2305    }
2306
2307    #[test]
2308    fn test_collect_parts_to_merge_grouping() {
2309        let mut bulk_parts = BulkParts::default();
2310
2311        // Add 16 bulk parts with different row counts
2312        for i in 0..16 {
2313            let num_rows = (i % 4) + 1; // 1 to 4 rows
2314            let timestamps: Vec<i64> = (0..num_rows)
2315                .map(|j| 1000 + i as i64 * 100 + j as i64)
2316                .collect();
2317            let values: Vec<Option<f64>> =
2318                (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
2319            let part = create_bulk_part_with_converter(
2320                &format!("key_{}", i),
2321                i as u32,
2322                timestamps,
2323                values,
2324                100 + i as u64,
2325            )
2326            .unwrap();
2327            bulk_parts.parts.push(create_bulk_part_wrapper(part));
2328        }
2329
2330        // Should trigger merge since we have 16 parts
2331        assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2332
2333        // Collect parts to merge
2334        let collected =
2335            bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);
2336
2337        // Should have groups
2338        assert!(!collected.groups.is_empty());
2339
2340        // All groups should have parts
2341        for group in &collected.groups {
2342            assert!(!group.is_empty());
2343        }
2344
2345        // Total parts collected should be 16
2346        let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
2347        assert_eq!(16, total_parts);
2348    }
2349
2350    #[test]
2351    fn test_bulk_memtable_ranges_with_multi_bulk_part() {
2352        let metadata = metadata_for_test();
2353        let merge_threshold = 8;
2354        let config = BulkMemtableConfig {
2355            merge_threshold,
2356            ..Default::default()
2357        };
2358        let memtable = BulkMemtable::new(
2359            2005,
2360            config,
2361            metadata.clone(),
2362            None,
2363            None,
2364            false,
2365            MergeMode::LastRow,
2366        );
2367        // Disable unordered_part for this test
2368        memtable.set_unordered_part_threshold(0);
2369
2370        // Write enough bulk parts to trigger merge (merge_threshold = 8)
2371        // Each part has small number of rows so total < DEFAULT_ROW_GROUP_SIZE
2372        // This will result in MultiBulkPart after compaction
2373        for i in 0..merge_threshold {
2374            let part = create_bulk_part_with_converter(
2375                &format!("key_{}", i),
2376                i as u32,
2377                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2378                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2379                100 + i as u64,
2380            )
2381            .unwrap();
2382            memtable.write_bulk(part).unwrap();
2383        }
2384
2385        // Compact to trigger MultiBulkPart creation (since total rows < DEFAULT_ROW_GROUP_SIZE)
2386        memtable.compact(false).unwrap();
2387
2388        // Verify we can read from the memtable
2389        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2390        let ranges = memtable
2391            .ranges(
2392                None,
2393                RangesOptions::default().with_predicate(predicate_group),
2394            )
2395            .unwrap();
2396
2397        assert_eq!(1, ranges.ranges.len());
2398        let expected_rows = merge_threshold * 2; // Each part has 2 rows
2399        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2400        assert_eq!(expected_rows, total_rows);
2401
2402        // Read all data
2403        let mut total_rows_read = 0;
2404        for (_range_id, range) in ranges.ranges.iter() {
2405            assert!(range.is_record_batch());
2406            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2407
2408            for batch_result in record_batch_iter {
2409                let batch = batch_result.unwrap();
2410                total_rows_read += batch.num_rows();
2411            }
2412        }
2413        assert_eq!(expected_rows, total_rows_read);
2414    }
2415
2416    #[test]
2417    fn test_multi_bulk_range_iter_builder_all_pruned() {
2418        let metadata = metadata_for_test();
2419        let merge_threshold = 8;
2420        let config = BulkMemtableConfig {
2421            merge_threshold,
2422            ..Default::default()
2423        };
2424        let memtable = BulkMemtable::new(
2425            2006,
2426            config,
2427            metadata.clone(),
2428            None,
2429            None,
2430            false,
2431            MergeMode::LastRow,
2432        );
2433        memtable.set_unordered_part_threshold(0);
2434
2435        // Write enough bulk parts to trigger merge into MultiBulkPart.
2436        for i in 0..merge_threshold {
2437            let part = create_bulk_part_with_converter(
2438                &format!("key_{}", i),
2439                i as u32,
2440                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2441                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2442                100 + i as u64,
2443            )
2444            .unwrap();
2445            memtable.write_bulk(part).unwrap();
2446        }
2447        memtable.compact(false).unwrap();
2448
2449        // Use a predicate that matches no rows so all batches are pruned.
2450        let filter = datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent"));
2451        let predicate_group = PredicateGroup::new(&metadata, &[filter]).unwrap();
2452        let ranges = memtable
2453            .ranges(
2454                None,
2455                RangesOptions::default().with_predicate(predicate_group),
2456            )
2457            .unwrap();
2458
2459        // Should return ranges but each range should produce an empty iterator
2460        // instead of an error.
2461        for (_range_id, range) in ranges.ranges.iter() {
2462            assert!(range.is_record_batch());
2463            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2464            let total_rows: usize = record_batch_iter.map(|r| r.unwrap().num_rows()).sum();
2465            assert_eq!(0, total_rows);
2466        }
2467    }
2468}