mito2/memtable/bulk.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memtable implementation for bulk load
16
17#[allow(unused)]
18pub mod context;
19#[allow(unused)]
20pub mod part;
21pub mod part_reader;
22mod row_group_reader;
23
24use std::collections::{BTreeMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27use std::time::Instant;
28
29use datatypes::arrow::datatypes::SchemaRef;
30use mito_codec::key_values::KeyValue;
31use rayon::prelude::*;
32use store_api::metadata::RegionMetadataRef;
33use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
34use tokio::sync::Semaphore;
35
36use crate::error::{Result, UnsupportedOperationSnafu};
37use crate::flush::WriteBufferManagerRef;
38use crate::memtable::bulk::context::BulkIterContext;
39use crate::memtable::bulk::part::{
40    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, UnorderedPart,
41};
42use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
43use crate::memtable::stats::WriteMetrics;
44use crate::memtable::{
45    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
46    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
47    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
48};
49use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
50use crate::read::flat_merge::FlatMergeIterator;
51use crate::region::options::MergeMode;
52use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
53use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
54use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
55
56/// All parts in a bulk memtable.
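///
/// Data moves through three stages: small writes accumulate in `unordered_part`,
/// which is compacted into a [BulkPart] in `parts` once it is large enough;
/// groups of raw parts are later merged and encoded as parquet into
/// `encoded_parts`, which may themselves be merged again as they accumulate.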
57#[derive(Default)]
58struct BulkParts {
59    /// Unordered small parts (< 1024 rows).
60    unordered_part: UnorderedPart,
61    /// Raw parts.
62    parts: Vec<BulkPartWrapper>,
63    /// Parts encoded as parquets.
64    encoded_parts: Vec<EncodedPartWrapper>,
65}
66
67impl BulkParts {
68    /// Total number of parts (raw + encoded + unordered).
69    fn num_parts(&self) -> usize {
70        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
71        self.parts.len() + self.encoded_parts.len() + unordered_count
72    }
73
74    /// Returns true if there is no part.
75    fn is_empty(&self) -> bool {
76        self.unordered_part.is_empty() && self.parts.is_empty() && self.encoded_parts.is_empty()
77    }
78
79    /// Returns true if the bulk parts should be merged.
80    fn should_merge_bulk_parts(&self) -> bool {
81        let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
82        // If the total number of unmerged parts is >= 8, start a merge task.
83        unmerged_count >= 8
84    }
85
86    /// Returns true if the encoded parts should be merged.
87    fn should_merge_encoded_parts(&self) -> bool {
88        let unmerged_count = self
89            .encoded_parts
90            .iter()
91            .filter(|wrapper| !wrapper.merging)
92            .count();
93        // If the total number of unmerged encoded parts is >= 8, start a merge task.
94        unmerged_count >= 8
95    }
96
97    /// Returns true if the unordered_part should be compacted into a BulkPart.
98    fn should_compact_unordered_part(&self) -> bool {
99        self.unordered_part.should_compact()
100    }
101
102    /// Collects unmerged parts and marks them as being merged.
103    /// Returns the collected parts to merge.
104    fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
105        let mut collected_parts = Vec::new();
106
107        for wrapper in &mut self.parts {
108            if !wrapper.merging {
109                wrapper.merging = true;
110                collected_parts.push(PartToMerge::Bulk {
111                    part: wrapper.part.clone(),
112                    file_id: wrapper.file_id,
113                });
114            }
115        }
116        collected_parts
117    }
118
119    /// Collects unmerged encoded parts within size threshold and marks them as being merged.
120    /// Returns the collected parts to merge.
121    fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
122        // Find minimum size among unmerged parts
123        let min_size = self
124            .encoded_parts
125            .iter()
126            .filter(|wrapper| !wrapper.merging)
127            .map(|wrapper| wrapper.part.size_bytes())
128            .min();
129
130        let Some(min_size) = min_size else {
131            return Vec::new();
132        };
133
134        let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
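        // e.g. if the smallest unmerged part is 100 KiB, parts of up to 1600 KiB join the
        // merge; the 4 MiB cap keeps already-large encoded parts out of it.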
135        let mut collected_parts = Vec::new();
136
137        for wrapper in &mut self.encoded_parts {
138            if !wrapper.merging {
139                let size = wrapper.part.size_bytes();
140                if size <= max_allowed_size {
141                    wrapper.merging = true;
142                    collected_parts.push(PartToMerge::Encoded {
143                        part: wrapper.part.clone(),
144                        file_id: wrapper.file_id,
145                    });
146                }
147            }
148        }
149        collected_parts
150    }
151
152    /// Installs merged encoded parts and removes the original parts by file ids.
153    /// Returns the total number of rows in the merged parts.
154    fn install_merged_parts<I>(
155        &mut self,
156        merged_parts: I,
157        merged_file_ids: &HashSet<FileId>,
158        merge_encoded: bool,
159    ) -> usize
160    where
161        I: IntoIterator<Item = EncodedBulkPart>,
162    {
163        let mut total_output_rows = 0;
164
165        for encoded_part in merged_parts {
166            total_output_rows += encoded_part.metadata().num_rows;
167            self.encoded_parts.push(EncodedPartWrapper {
168                part: encoded_part,
169                file_id: FileId::random(),
170                merging: false,
171            });
172        }
173
174        if merge_encoded {
175            self.encoded_parts
176                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
177        } else {
178            self.parts
179                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
180        }
181
182        total_output_rows
183    }
184
185    /// Resets merging flag for parts with the given file ids.
186    /// Used when merging fails or is cancelled.
187    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
188        if merge_encoded {
189            for wrapper in &mut self.encoded_parts {
190                if file_ids.contains(&wrapper.file_id) {
191                    wrapper.merging = false;
192                }
193            }
194        } else {
195            for wrapper in &mut self.parts {
196                if file_ids.contains(&wrapper.file_id) {
197                    wrapper.merging = false;
198                }
199            }
200        }
201    }
202}
203
204/// RAII guard for managing merging flags.
205/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
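///
/// A minimal sketch of the intended pattern (mirroring `merge_bulk_parts` below):
/// create the guard right after marking parts as merging and call
/// [`MergingFlagsGuard::mark_success`] only once the merged parts are installed,
/// so that an early return or error restores the flags.
///
/// ```ignore
/// let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);
/// // ... merge and install parts; any early return resets the `merging` flags ...
/// guard.mark_success();
/// ```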
206struct MergingFlagsGuard<'a> {
207    bulk_parts: &'a RwLock<BulkParts>,
208    file_ids: &'a HashSet<FileId>,
209    merge_encoded: bool,
210    success: bool,
211}
212
213impl<'a> MergingFlagsGuard<'a> {
214    /// Creates a new guard for the given file ids.
215    fn new(
216        bulk_parts: &'a RwLock<BulkParts>,
217        file_ids: &'a HashSet<FileId>,
218        merge_encoded: bool,
219    ) -> Self {
220        Self {
221            bulk_parts,
222            file_ids,
223            merge_encoded,
224            success: false,
225        }
226    }
227
228    /// Marks the merge operation as successful.
229    /// When this is called, the guard will not reset the flags on drop.
230    fn mark_success(&mut self) {
231        self.success = true;
232    }
233}
234
235impl<'a> Drop for MergingFlagsGuard<'a> {
236    fn drop(&mut self) {
237        if !self.success
238            && let Ok(mut parts) = self.bulk_parts.write()
239        {
240            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
241        }
242    }
243}
244
245/// Memtable that ingests and scans parts directly.
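///
/// Row-based `write()` / `write_one()` are rejected; data enters through
/// [Memtable::write_bulk] and is read back via [Memtable::ranges]. A minimal
/// sketch (following the tests in this module; `metadata` and `part` are assumed
/// to be prepared by the caller):
///
/// ```ignore
/// let memtable = BulkMemtable::new(1, metadata, None, None, false, MergeMode::LastRow);
/// memtable.write_bulk(part)?;
/// let ranges = memtable.ranges(None, RangesOptions::default())?;
/// ```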
246pub struct BulkMemtable {
247    id: MemtableId,
248    parts: Arc<RwLock<BulkParts>>,
249    metadata: RegionMetadataRef,
250    alloc_tracker: AllocTracker,
251    max_timestamp: AtomicI64,
252    min_timestamp: AtomicI64,
253    max_sequence: AtomicU64,
254    num_rows: AtomicUsize,
255    /// Cached flat SST arrow schema for memtable compaction.
256    flat_arrow_schema: SchemaRef,
257    /// Compactor for merging bulk parts
258    compactor: Arc<Mutex<MemtableCompactor>>,
259    /// Dispatcher for scheduling compaction tasks
260    compact_dispatcher: Option<Arc<CompactDispatcher>>,
261    /// Whether the append mode is enabled
262    append_mode: bool,
263    /// Mode to handle duplicate rows while merging
264    merge_mode: MergeMode,
265}
266
267impl std::fmt::Debug for BulkMemtable {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        f.debug_struct("BulkMemtable")
270            .field("id", &self.id)
271            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
272            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
273            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
274            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
275            .finish()
276    }
277}
278
279impl Memtable for BulkMemtable {
280    fn id(&self) -> MemtableId {
281        self.id
282    }
283
284    fn write(&self, _kvs: &KeyValues) -> Result<()> {
285        UnsupportedOperationSnafu {
286            err_msg: "write() is not supported for bulk memtable",
287        }
288        .fail()
289    }
290
291    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
292        UnsupportedOperationSnafu {
293            err_msg: "write_one() is not supported for bulk memtable",
294        }
295        .fail()
296    }
297
298    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
299        let local_metrics = WriteMetrics {
300            key_bytes: 0,
301            value_bytes: fragment.estimated_size(),
302            min_ts: fragment.min_timestamp,
303            max_ts: fragment.max_timestamp,
304            num_rows: fragment.num_rows(),
305            max_sequence: fragment.sequence,
306        };
307
308        {
309            let mut bulk_parts = self.parts.write().unwrap();
310
311            // Routes small parts to unordered_part based on threshold
312            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
313                bulk_parts.unordered_part.push(fragment);
314
315                // Compacts unordered_part if threshold is reached
316                if bulk_parts.should_compact_unordered_part()
317                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
318                {
319                    bulk_parts.parts.push(BulkPartWrapper {
320                        part: bulk_part,
321                        file_id: FileId::random(),
322                        merging: false,
323                    });
324                    bulk_parts.unordered_part.clear();
325                }
326            } else {
327                bulk_parts.parts.push(BulkPartWrapper {
328                    part: fragment,
329                    file_id: FileId::random(),
330                    merging: false,
331                });
332            }
333
334            // Since this operation should be fast, we do it within the parts lock scope.
335            // This ensures the statistics in `ranges()` are correct. What's more,
336            // it guarantees no rows are out of the time range so we don't need to
337            // prune rows by time range again in the iterator of the MemtableRange.
338            self.update_stats(local_metrics);
339        }
340
341        if self.should_compact() {
342            self.schedule_compact();
343        }
344
345        Ok(())
346    }
347
348    #[cfg(any(test, feature = "test"))]
349    fn iter(
350        &self,
351        _projection: Option<&[ColumnId]>,
352        _predicate: Option<table::predicate::Predicate>,
353        _sequence: Option<SequenceRange>,
354    ) -> Result<crate::memtable::BoxedBatchIterator> {
355        todo!()
356    }
357
358    fn ranges(
359        &self,
360        projection: Option<&[ColumnId]>,
361        options: RangesOptions,
362    ) -> Result<MemtableRanges> {
363        let predicate = options.predicate;
364        let sequence = options.sequence;
365        let mut ranges = BTreeMap::new();
366        let mut range_id = 0;
367
368        // TODO(yingwen): Filter ranges by sequence.
369        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
370            self.metadata.clone(),
371            projection,
372            predicate.predicate().cloned(),
373            options.for_flush,
374            options.pre_filter_mode,
375        )?);
376
377        // Adds ranges for regular parts and encoded parts
378        {
379            let bulk_parts = self.parts.read().unwrap();
380
381            // Adds range for unordered part if not empty
382            if !bulk_parts.unordered_part.is_empty()
383                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
384            {
385                let num_rows = unordered_bulk_part.num_rows();
386                let range = MemtableRange::new(
387                    Arc::new(MemtableRangeContext::new(
388                        self.id,
389                        Box::new(BulkRangeIterBuilder {
390                            part: unordered_bulk_part,
391                            context: context.clone(),
392                            sequence,
393                        }),
394                        predicate.clone(),
395                    )),
396                    num_rows,
397                );
398                ranges.insert(range_id, range);
399                range_id += 1;
400            }
401
402            // Adds ranges for regular parts
403            for part_wrapper in bulk_parts.parts.iter() {
404                // Skips empty parts
405                if part_wrapper.part.num_rows() == 0 {
406                    continue;
407                }
408
409                let range = MemtableRange::new(
410                    Arc::new(MemtableRangeContext::new(
411                        self.id,
412                        Box::new(BulkRangeIterBuilder {
413                            part: part_wrapper.part.clone(),
414                            context: context.clone(),
415                            sequence,
416                        }),
417                        predicate.clone(),
418                    )),
419                    part_wrapper.part.num_rows(),
420                );
421                ranges.insert(range_id, range);
422                range_id += 1;
423            }
424
425            // Adds ranges for encoded parts
426            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
427                // Skips empty parts
428                if encoded_part_wrapper.part.metadata().num_rows == 0 {
429                    continue;
430                }
431
432                let range = MemtableRange::new(
433                    Arc::new(MemtableRangeContext::new(
434                        self.id,
435                        Box::new(EncodedBulkRangeIterBuilder {
436                            file_id: encoded_part_wrapper.file_id,
437                            part: encoded_part_wrapper.part.clone(),
438                            context: context.clone(),
439                            sequence,
440                        }),
441                        predicate.clone(),
442                    )),
443                    encoded_part_wrapper.part.metadata().num_rows,
444                );
445                ranges.insert(range_id, range);
446                range_id += 1;
447            }
448        }
449
450        let mut stats = self.stats();
451        stats.num_ranges = ranges.len();
452
453        // TODO(yingwen): Supports per range stats.
454        Ok(MemtableRanges { ranges, stats })
455    }
456
457    fn is_empty(&self) -> bool {
458        let bulk_parts = self.parts.read().unwrap();
459        bulk_parts.is_empty()
460    }
461
462    fn freeze(&self) -> Result<()> {
463        self.alloc_tracker.done_allocating();
464        Ok(())
465    }
466
467    fn stats(&self) -> MemtableStats {
468        let estimated_bytes = self.alloc_tracker.bytes_allocated();
469
470        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
471            return MemtableStats {
472                estimated_bytes,
473                time_range: None,
474                num_rows: 0,
475                num_ranges: 0,
476                max_sequence: 0,
477                series_count: 0,
478            };
479        }
480
481        let ts_type = self
482            .metadata
483            .time_index_column()
484            .column_schema
485            .data_type
486            .clone()
487            .as_timestamp()
488            .expect("Timestamp column must have timestamp type");
489        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
490        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
491
492        let num_ranges = self.parts.read().unwrap().num_parts();
493
494        MemtableStats {
495            estimated_bytes,
496            time_range: Some((min_timestamp, max_timestamp)),
497            num_rows: self.num_rows.load(Ordering::Relaxed),
498            num_ranges,
499            max_sequence: self.max_sequence.load(Ordering::Relaxed),
500            series_count: self.estimated_series_count(),
501        }
502    }
503
504    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
505        // Computes the new flat schema based on the new metadata.
506        let flat_arrow_schema = to_flat_sst_arrow_schema(
507            metadata,
508            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
509        );
510
511        Arc::new(Self {
512            id,
513            parts: Arc::new(RwLock::new(BulkParts::default())),
514            metadata: metadata.clone(),
515            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
516            max_timestamp: AtomicI64::new(i64::MIN),
517            min_timestamp: AtomicI64::new(i64::MAX),
518            max_sequence: AtomicU64::new(0),
519            num_rows: AtomicUsize::new(0),
520            flat_arrow_schema,
521            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
522            compact_dispatcher: self.compact_dispatcher.clone(),
523            append_mode: self.append_mode,
524            merge_mode: self.merge_mode,
525        })
526    }
527
528    fn compact(&self, for_flush: bool) -> Result<()> {
529        let mut compactor = self.compactor.lock().unwrap();
530
531        if for_flush {
532            return Ok(());
533        }
534
535        // Try to merge regular parts first
536        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
537        if should_merge {
538            compactor.merge_bulk_parts(
539                &self.flat_arrow_schema,
540                &self.parts,
541                &self.metadata,
542                !self.append_mode,
543                self.merge_mode,
544            )?;
545        }
546
547        // Then try to merge encoded parts
548        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
549        if should_merge {
550            compactor.merge_encoded_parts(
551                &self.flat_arrow_schema,
552                &self.parts,
553                &self.metadata,
554                !self.append_mode,
555                self.merge_mode,
556            )?;
557        }
558
559        Ok(())
560    }
561}
562
563impl BulkMemtable {
564    /// Creates a new BulkMemtable
565    pub fn new(
566        id: MemtableId,
567        metadata: RegionMetadataRef,
568        write_buffer_manager: Option<WriteBufferManagerRef>,
569        compact_dispatcher: Option<Arc<CompactDispatcher>>,
570        append_mode: bool,
571        merge_mode: MergeMode,
572    ) -> Self {
573        let flat_arrow_schema = to_flat_sst_arrow_schema(
574            &metadata,
575            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
576        );
577
578        let region_id = metadata.region_id;
579        Self {
580            id,
581            parts: Arc::new(RwLock::new(BulkParts::default())),
582            metadata,
583            alloc_tracker: AllocTracker::new(write_buffer_manager),
584            max_timestamp: AtomicI64::new(i64::MIN),
585            min_timestamp: AtomicI64::new(i64::MAX),
586            max_sequence: AtomicU64::new(0),
587            num_rows: AtomicUsize::new(0),
588            flat_arrow_schema,
589            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
590            compact_dispatcher,
591            append_mode,
592            merge_mode,
593        }
594    }
595
596    /// Sets the unordered part threshold (for testing).
597    #[cfg(test)]
598    pub fn set_unordered_part_threshold(&self, threshold: usize) {
599        self.parts
600            .write()
601            .unwrap()
602            .unordered_part
603            .set_threshold(threshold);
604    }
605
606    /// Sets the unordered part compact threshold (for testing).
607    #[cfg(test)]
608    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
609        self.parts
610            .write()
611            .unwrap()
612            .unordered_part
613            .set_compact_threshold(compact_threshold);
614    }
615
616    /// Updates memtable stats.
617    ///
618    /// Please update this inside the write lock scope.
619    fn update_stats(&self, stats: WriteMetrics) {
620        self.alloc_tracker
621            .on_allocation(stats.key_bytes + stats.value_bytes);
622
623        self.max_timestamp
624            .fetch_max(stats.max_ts, Ordering::Relaxed);
625        self.min_timestamp
626            .fetch_min(stats.min_ts, Ordering::Relaxed);
627        self.max_sequence
628            .fetch_max(stats.max_sequence, Ordering::Relaxed);
629        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
630    }
631
632    /// Returns the estimated time series count.
633    fn estimated_series_count(&self) -> usize {
634        let bulk_parts = self.parts.read().unwrap();
635        bulk_parts
636            .parts
637            .iter()
638            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
639            .sum()
640    }
641
642    /// Returns whether the memtable should be compacted.
643    fn should_compact(&self) -> bool {
644        let parts = self.parts.read().unwrap();
645        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
646    }
647
648    /// Schedules a compaction task using the CompactDispatcher.
649    fn schedule_compact(&self) {
650        if let Some(dispatcher) = &self.compact_dispatcher {
651            let task = MemCompactTask {
652                metadata: self.metadata.clone(),
653                parts: self.parts.clone(),
654                flat_arrow_schema: self.flat_arrow_schema.clone(),
655                compactor: self.compactor.clone(),
656                append_mode: self.append_mode,
657                merge_mode: self.merge_mode,
658            };
659
660            dispatcher.dispatch_compact(task);
661        } else {
662            // Uses synchronous compaction if no dispatcher is available.
663            if let Err(e) = self.compact(false) {
664                common_telemetry::error!(e; "Failed to compact memtable");
665            }
666        }
667    }
668}
669
670/// Iterator builder for bulk range
671struct BulkRangeIterBuilder {
672    part: BulkPart,
673    context: Arc<BulkIterContext>,
674    sequence: Option<SequenceRange>,
675}
676
677impl IterBuilder for BulkRangeIterBuilder {
678    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
679        UnsupportedOperationSnafu {
680            err_msg: "BatchIterator is not supported for bulk memtable",
681        }
682        .fail()
683    }
684
685    fn is_record_batch(&self) -> bool {
686        true
687    }
688
689    fn build_record_batch(
690        &self,
691        metrics: Option<MemScanMetrics>,
692    ) -> Result<BoxedRecordBatchIterator> {
693        let series_count = self.part.estimated_series_count();
694        let iter = BulkPartRecordBatchIter::new(
695            self.part.batch.clone(),
696            self.context.clone(),
697            self.sequence,
698            series_count,
699            metrics,
700        );
701
702        Ok(Box::new(iter))
703    }
704
705    fn encoded_range(&self) -> Option<EncodedRange> {
706        None
707    }
708}
709
710/// Iterator builder for encoded bulk range
711struct EncodedBulkRangeIterBuilder {
712    file_id: FileId,
713    part: EncodedBulkPart,
714    context: Arc<BulkIterContext>,
715    sequence: Option<SequenceRange>,
716}
717
718impl IterBuilder for EncodedBulkRangeIterBuilder {
719    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
720        UnsupportedOperationSnafu {
721            err_msg: "BatchIterator is not supported for encoded bulk memtable",
722        }
723        .fail()
724    }
725
726    fn is_record_batch(&self) -> bool {
727        true
728    }
729
730    fn build_record_batch(
731        &self,
732        metrics: Option<MemScanMetrics>,
733    ) -> Result<BoxedRecordBatchIterator> {
734        if let Some(iter) = self
735            .part
736            .read(self.context.clone(), self.sequence, metrics)?
737        {
738            Ok(iter)
739        } else {
740            // Returns an empty iterator if there is no data to read
741            Ok(Box::new(std::iter::empty()))
742        }
743    }
744
745    fn encoded_range(&self) -> Option<EncodedRange> {
746        Some(EncodedRange {
747            data: self.part.data().clone(),
748            sst_info: self.part.to_sst_info(self.file_id),
749        })
750    }
751}
752
753struct BulkPartWrapper {
754    part: BulkPart,
755    /// The unique file id for this part in memtable.
756    file_id: FileId,
757    /// Whether this part is currently being merged.
758    merging: bool,
759}
760
761struct EncodedPartWrapper {
762    part: EncodedBulkPart,
763    /// The unique file id for this part in memtable.
764    file_id: FileId,
765    /// Whether this part is currently being merged.
766    merging: bool,
767}
768
769/// Enum to wrap different types of parts for unified merging.
770#[derive(Clone)]
771enum PartToMerge {
772    /// Raw bulk part.
773    Bulk { part: BulkPart, file_id: FileId },
774    /// Encoded bulk part.
775    Encoded {
776        part: EncodedBulkPart,
777        file_id: FileId,
778    },
779}
780
781impl PartToMerge {
782    /// Gets the file ID of this part.
783    fn file_id(&self) -> FileId {
784        match self {
785            PartToMerge::Bulk { file_id, .. } => *file_id,
786            PartToMerge::Encoded { file_id, .. } => *file_id,
787        }
788    }
789
790    /// Gets the minimum timestamp of this part.
791    fn min_timestamp(&self) -> i64 {
792        match self {
793            PartToMerge::Bulk { part, .. } => part.min_timestamp,
794            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
795        }
796    }
797
798    /// Gets the maximum timestamp of this part.
799    fn max_timestamp(&self) -> i64 {
800        match self {
801            PartToMerge::Bulk { part, .. } => part.max_timestamp,
802            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
803        }
804    }
805
806    /// Gets the number of rows in this part.
807    fn num_rows(&self) -> usize {
808        match self {
809            PartToMerge::Bulk { part, .. } => part.num_rows(),
810            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
811        }
812    }
813
814    /// Creates a record batch iterator for this part.
815    fn create_iterator(
816        self,
817        context: Arc<BulkIterContext>,
818    ) -> Result<Option<BoxedRecordBatchIterator>> {
819        match self {
820            PartToMerge::Bulk { part, .. } => {
821                let series_count = part.estimated_series_count();
822                let iter = BulkPartRecordBatchIter::new(
823                    part.batch,
824                    context,
825                    None, // No sequence filter for merging
826                    series_count,
827                    None, // No metrics for merging
828                );
829                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
830            }
831            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
832        }
833    }
834}
835
836struct MemtableCompactor {
837    region_id: RegionId,
838    memtable_id: MemtableId,
839}
840
841impl MemtableCompactor {
842    /// Creates a new MemtableCompactor.
843    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
844        Self {
845            region_id,
846            memtable_id,
847        }
848    }
849
850    /// Merges bulk parts and then encodes the result to an [EncodedBulkPart].
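    ///
    /// Unmerged parts are collected and flagged, sorted by row count, split into groups
    /// of at most 16, merged in parallel with rayon, and the encoded results are installed
    /// back while the source parts are removed. A [MergingFlagsGuard] restores the
    /// `merging` flags if any step fails before installation.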
851    fn merge_bulk_parts(
852        &mut self,
853        arrow_schema: &SchemaRef,
854        bulk_parts: &RwLock<BulkParts>,
855        metadata: &RegionMetadataRef,
856        dedup: bool,
857        merge_mode: MergeMode,
858    ) -> Result<()> {
859        let start = Instant::now();
860
861        // Collects unmerged parts and marks them as being merged
862        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
863        if parts_to_merge.is_empty() {
864            return Ok(());
865        }
866
867        let merged_file_ids: HashSet<FileId> =
868            parts_to_merge.iter().map(|part| part.file_id()).collect();
869        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);
870
871        // Sorts parts by row count (ascending) to merge parts with similar row counts.
872        let mut sorted_parts = parts_to_merge;
873        sorted_parts.sort_unstable_by_key(|part| part.num_rows());
874
875        // Groups parts into chunks for concurrent processing.
876        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
877            .chunks(16)
878            .map(|chunk| chunk.to_vec())
879            .collect();
880
881        let total_groups = part_groups.len();
882        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
883        let merged_parts = part_groups
884            .into_par_iter()
885            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
886            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;
887
888        // Installs merged parts.
889        let total_output_rows = {
890            let mut parts = bulk_parts.write().unwrap();
891            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
892        };
893
894        guard.mark_success();
895
896        common_telemetry::debug!(
897            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
898            self.region_id,
899            self.memtable_id,
900            total_groups,
901            total_parts_to_merge,
902            total_output_rows,
903            start.elapsed()
904        );
905
906        Ok(())
907    }
908
909    /// Merges encoded parts and then encodes the result to an [EncodedBulkPart].
910    fn merge_encoded_parts(
911        &mut self,
912        arrow_schema: &SchemaRef,
913        bulk_parts: &RwLock<BulkParts>,
914        metadata: &RegionMetadataRef,
915        dedup: bool,
916        merge_mode: MergeMode,
917    ) -> Result<()> {
918        let start = Instant::now();
919
920        // Collects unmerged encoded parts within the size threshold and marks them as being merged.
921        let parts_to_merge = {
922            let mut parts = bulk_parts.write().unwrap();
923            parts.collect_encoded_parts_to_merge()
924        };
925
926        if parts_to_merge.is_empty() {
927            return Ok(());
928        }
929
930        let merged_file_ids: HashSet<FileId> =
931            parts_to_merge.iter().map(|part| part.file_id()).collect();
932        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);
933
934        if parts_to_merge.len() == 1 {
935            // Only one part, nothing to merge; the guard will automatically reset the merging flag.
936            return Ok(());
937        }
938
939        // Groups parts into chunks for concurrent processing.
940        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
941            .chunks(16)
942            .map(|chunk| chunk.to_vec())
943            .collect();
944
945        let total_groups = part_groups.len();
946        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
947
948        let merged_parts = part_groups
949            .into_par_iter()
950            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
951            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;
952
953        // Installs merged parts and gets the total number of output rows
954        let total_output_rows = {
955            let mut parts = bulk_parts.write().unwrap();
956            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
957        };
958
959        // Marks the operation as successful to prevent flag reset
960        guard.mark_success();
961
962        common_telemetry::debug!(
963            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
964            self.region_id,
965            self.memtable_id,
966            total_groups,
967            total_parts_to_merge,
968            total_output_rows,
969            start.elapsed()
970        );
971
972        Ok(())
973    }
974
975    /// Merges a group of parts into a single encoded part.
976    fn merge_parts_group(
977        parts_to_merge: Vec<PartToMerge>,
978        arrow_schema: &SchemaRef,
979        metadata: &RegionMetadataRef,
980        dedup: bool,
981        merge_mode: MergeMode,
982    ) -> Result<Option<EncodedBulkPart>> {
983        if parts_to_merge.is_empty() {
984            return Ok(None);
985        }
986
987        // Calculates timestamp bounds for merged data
988        let min_timestamp = parts_to_merge
989            .iter()
990            .map(|p| p.min_timestamp())
991            .min()
992            .unwrap_or(i64::MAX);
993        let max_timestamp = parts_to_merge
994            .iter()
995            .map(|p| p.max_timestamp())
996            .max()
997            .unwrap_or(i64::MIN);
998
999        let context = Arc::new(BulkIterContext::new(
1000            metadata.clone(),
1001            None, // No column projection for merging
1002            None, // No predicate for merging
1003            true,
1004        )?);
1005
1006        // Creates iterators for all parts to merge.
1007        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
1008            .into_iter()
1009            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
1010            .collect();
1011
1012        if iterators.is_empty() {
1013            return Ok(None);
1014        }
1015
1016        let merged_iter =
1017            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;
1018
1019        let boxed_iter: BoxedRecordBatchIterator = if dedup {
1020            // Applies deduplication based on merge mode
1021            match merge_mode {
1022                MergeMode::LastRow => {
1023                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
1024                    Box::new(dedup_iter)
1025                }
1026                MergeMode::LastNonNull => {
1027                    // Calculates field column start: total columns - fixed columns - field columns
1028                    // Field column count = total metadata columns - time index column - primary key columns
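                    // e.g. assuming (hypothetically) FIXED_POS_COLUMN_NUM = 3, a region with
                    // 2 field columns and an 11-column flat schema gives
                    // field_column_start = 11 - 3 - 2 = 6.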
1029                    let field_column_count =
1030                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
1031                    let total_columns = arrow_schema.fields().len();
1032                    let field_column_start =
1033                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;
1034
1035                    let dedup_iter = FlatDedupIterator::new(
1036                        merged_iter,
1037                        FlatLastNonNull::new(field_column_start, false),
1038                    );
1039                    Box::new(dedup_iter)
1040                }
1041            }
1042        } else {
1043            Box::new(merged_iter)
1044        };
1045
1046        // Encodes the merged iterator
1047        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
1048        let mut metrics = BulkPartEncodeMetrics::default();
1049        let encoded_part = encoder.encode_record_batch_iter(
1050            boxed_iter,
1051            arrow_schema.clone(),
1052            min_timestamp,
1053            max_timestamp,
1054            &mut metrics,
1055        )?;
1056
1057        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);
1058
1059        Ok(encoded_part)
1060    }
1061}
1062
1063/// A memtable compact task to run in background.
1064struct MemCompactTask {
1065    metadata: RegionMetadataRef,
1066    parts: Arc<RwLock<BulkParts>>,
1067
1068    /// Cached flat SST arrow schema
1069    flat_arrow_schema: SchemaRef,
1070    /// Compactor for merging bulk parts
1071    compactor: Arc<Mutex<MemtableCompactor>>,
1072    /// Whether the append mode is enabled
1073    append_mode: bool,
1074    /// Mode to handle duplicate rows while merging
1075    merge_mode: MergeMode,
1076}
1077
1078impl MemCompactTask {
1079    fn compact(&self) -> Result<()> {
1080        let mut compactor = self.compactor.lock().unwrap();
1081
1082        // Tries to merge regular parts first
1083        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
1084        if should_merge {
1085            compactor.merge_bulk_parts(
1086                &self.flat_arrow_schema,
1087                &self.parts,
1088                &self.metadata,
1089                !self.append_mode,
1090                self.merge_mode,
1091            )?;
1092        }
1093
1094        // Then tries to merge encoded parts
1095        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
1096        if should_merge {
1097            compactor.merge_encoded_parts(
1098                &self.flat_arrow_schema,
1099                &self.parts,
1100                &self.metadata,
1101                !self.append_mode,
1102                self.merge_mode,
1103            )?;
1104        }
1105
1106        Ok(())
1107    }
1108}
1109
1110/// Scheduler to run compact tasks in background.
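///
/// The dispatcher is shared with memtables via [BulkMemtableBuilder::with_compact_dispatcher];
/// the permit count bounds how many memtable compactions run concurrently. A sketch
/// (with an assumed permit count of 4):
///
/// ```ignore
/// let dispatcher = Arc::new(CompactDispatcher::new(4));
/// let builder = BulkMemtableBuilder::new(None, false, MergeMode::LastRow)
///     .with_compact_dispatcher(dispatcher);
/// ```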
1111#[derive(Debug)]
1112pub struct CompactDispatcher {
1113    semaphore: Arc<Semaphore>,
1114}
1115
1116impl CompactDispatcher {
1117    /// Creates a new dispatcher with the given number of max concurrent tasks.
1118    pub fn new(permits: usize) -> Self {
1119        Self {
1120            semaphore: Arc::new(Semaphore::new(permits)),
1121        }
1122    }
1123
1124    /// Dispatches a compact task to run in background.
1125    fn dispatch_compact(&self, task: MemCompactTask) {
1126        let semaphore = self.semaphore.clone();
1127        common_runtime::spawn_global(async move {
1128            let Ok(_permit) = semaphore.acquire().await else {
1129                return;
1130            };
1131
1132            common_runtime::spawn_blocking_global(move || {
1133                if let Err(e) = task.compact() {
1134                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
1135                }
1136            });
1137        });
1138    }
1139}
1140
1141/// Builder to build a [BulkMemtable].
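///
/// A minimal sketch, assuming `metadata` is the region metadata provided by the caller:
///
/// ```ignore
/// let memtable = BulkMemtableBuilder::new(None, false, MergeMode::LastRow).build(1, &metadata);
/// ```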
1142#[derive(Debug, Default)]
1143pub struct BulkMemtableBuilder {
1144    write_buffer_manager: Option<WriteBufferManagerRef>,
1145    compact_dispatcher: Option<Arc<CompactDispatcher>>,
1146    append_mode: bool,
1147    merge_mode: MergeMode,
1148}
1149
1150impl BulkMemtableBuilder {
1151    /// Creates a new builder with specific `write_buffer_manager`.
1152    pub fn new(
1153        write_buffer_manager: Option<WriteBufferManagerRef>,
1154        append_mode: bool,
1155        merge_mode: MergeMode,
1156    ) -> Self {
1157        Self {
1158            write_buffer_manager,
1159            compact_dispatcher: None,
1160            append_mode,
1161            merge_mode,
1162        }
1163    }
1164
1165    /// Sets the compact dispatcher.
1166    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
1167        self.compact_dispatcher = Some(compact_dispatcher);
1168        self
1169    }
1170}
1171
1172impl MemtableBuilder for BulkMemtableBuilder {
1173    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
1174        Arc::new(BulkMemtable::new(
1175            id,
1176            metadata.clone(),
1177            self.write_buffer_manager.clone(),
1178            self.compact_dispatcher.clone(),
1179            self.append_mode,
1180            self.merge_mode,
1181        ))
1182    }
1183
1184    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
1185        true
1186    }
1187}
1188
1189#[cfg(test)]
1190mod tests {
1191
1192    use mito_codec::row_converter::build_primary_key_codec;
1193
1194    use super::*;
1195    use crate::memtable::bulk::part::BulkPartConverter;
1196    use crate::read::scan_region::PredicateGroup;
1197    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1198    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1199
1200    fn create_bulk_part_with_converter(
1201        k0: &str,
1202        k1: u32,
1203        timestamps: Vec<i64>,
1204        values: Vec<Option<f64>>,
1205        sequence: u64,
1206    ) -> Result<BulkPart> {
1207        let metadata = metadata_for_test();
1208        let capacity = 100;
1209        let primary_key_codec = build_primary_key_codec(&metadata);
1210        let schema = to_flat_sst_arrow_schema(
1211            &metadata,
1212            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1213        );
1214
1215        let mut converter =
1216            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1217
1218        let key_values = build_key_values_with_ts_seq_values(
1219            &metadata,
1220            k0.to_string(),
1221            k1,
1222            timestamps.into_iter(),
1223            values.into_iter(),
1224            sequence,
1225        );
1226
1227        converter.append_key_values(&key_values)?;
1228        converter.convert()
1229    }
1230
1231    #[test]
1232    fn test_bulk_memtable_write_read() {
1233        let metadata = metadata_for_test();
1234        let memtable =
1235            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);
1236        // Disable unordered_part for this test
1237        memtable.set_unordered_part_threshold(0);
1238
1239        let test_data = [
1240            (
1241                "key_a",
1242                1u32,
1243                vec![1000i64, 2000i64],
1244                vec![Some(10.5), Some(20.5)],
1245                100u64,
1246            ),
1247            (
1248                "key_b",
1249                2u32,
1250                vec![1500i64, 2500i64],
1251                vec![Some(15.5), Some(25.5)],
1252                200u64,
1253            ),
1254            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
1255        ];
1256
1257        for (k0, k1, timestamps, values, seq) in test_data.iter() {
1258            let part =
1259                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
1260                    .unwrap();
1261            memtable.write_bulk(part).unwrap();
1262        }
1263
1264        let stats = memtable.stats();
1265        assert_eq!(5, stats.num_rows);
1266        assert_eq!(3, stats.num_ranges);
1267        assert_eq!(300, stats.max_sequence);
1268
1269        let (min_ts, max_ts) = stats.time_range.unwrap();
1270        assert_eq!(1000, min_ts.value());
1271        assert_eq!(3000, max_ts.value());
1272
1273        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1274        let ranges = memtable
1275            .ranges(
1276                None,
1277                RangesOptions::default().with_predicate(predicate_group),
1278            )
1279            .unwrap();
1280
1281        assert_eq!(3, ranges.ranges.len());
1282        assert_eq!(5, ranges.stats.num_rows);
1283
1284        for (_range_id, range) in ranges.ranges.iter() {
1285            assert!(range.num_rows() > 0);
1286            assert!(range.is_record_batch());
1287
1288            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1289
1290            let mut total_rows = 0;
1291            for batch_result in record_batch_iter {
1292                let batch = batch_result.unwrap();
1293                total_rows += batch.num_rows();
1294                assert!(batch.num_rows() > 0);
1295                assert_eq!(8, batch.num_columns());
1296            }
1297            assert_eq!(total_rows, range.num_rows());
1298        }
1299    }
1300
1301    #[test]
1302    fn test_bulk_memtable_ranges_with_projection() {
1303        let metadata = metadata_for_test();
1304        let memtable =
1305            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);
1306
1307        let bulk_part = create_bulk_part_with_converter(
1308            "projection_test",
1309            5,
1310            vec![5000, 6000, 7000],
1311            vec![Some(50.0), Some(60.0), Some(70.0)],
1312            500,
1313        )
1314        .unwrap();
1315
1316        memtable.write_bulk(bulk_part).unwrap();
1317
1318        let projection = vec![4u32];
1319        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1320        let ranges = memtable
1321            .ranges(
1322                Some(&projection),
1323                RangesOptions::default().with_predicate(predicate_group),
1324            )
1325            .unwrap();
1326
1327        assert_eq!(1, ranges.ranges.len());
1328        let range = ranges.ranges.get(&0).unwrap();
1329
1330        assert!(range.is_record_batch());
1331        let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1332
1333        let mut total_rows = 0;
1334        for batch_result in record_batch_iter {
1335            let batch = batch_result.unwrap();
1336            assert!(batch.num_rows() > 0);
1337            assert_eq!(5, batch.num_columns());
1338            total_rows += batch.num_rows();
1339        }
1340        assert_eq!(3, total_rows);
1341    }
1342
1343    #[test]
1344    fn test_bulk_memtable_unsupported_operations() {
1345        let metadata = metadata_for_test();
1346        let memtable =
1347            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);
1348
1349        let key_values = build_key_values_with_ts_seq_values(
1350            &metadata,
1351            "test".to_string(),
1352            1,
1353            vec![1000].into_iter(),
1354            vec![Some(1.0)].into_iter(),
1355            1,
1356        );
1357
1358        let err = memtable.write(&key_values).unwrap_err();
1359        assert!(err.to_string().contains("not supported"));
1360
1361        let kv = key_values.iter().next().unwrap();
1362        let err = memtable.write_one(kv).unwrap_err();
1363        assert!(err.to_string().contains("not supported"));
1364    }
1365
1366    #[test]
1367    fn test_bulk_memtable_freeze() {
1368        let metadata = metadata_for_test();
1369        let memtable =
1370            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);
1371
1372        let bulk_part = create_bulk_part_with_converter(
1373            "freeze_test",
1374            10,
1375            vec![10000],
1376            vec![Some(100.0)],
1377            1000,
1378        )
1379        .unwrap();
1380
1381        memtable.write_bulk(bulk_part).unwrap();
1382        memtable.freeze().unwrap();
1383
1384        let stats_after_freeze = memtable.stats();
1385        assert_eq!(1, stats_after_freeze.num_rows);
1386    }
1387
1388    #[test]
1389    fn test_bulk_memtable_fork() {
1390        let metadata = metadata_for_test();
1391        let original_memtable =
1392            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);
1393
1394        let bulk_part =
1395            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
1396                .unwrap();
1397
1398        original_memtable.write_bulk(bulk_part).unwrap();
1399
1400        let forked_memtable = original_memtable.fork(444, &metadata);
1401
1402        assert_eq!(forked_memtable.id(), 444);
1403        assert!(forked_memtable.is_empty());
1404        assert_eq!(0, forked_memtable.stats().num_rows);
1405
1406        assert!(!original_memtable.is_empty());
1407        assert_eq!(1, original_memtable.stats().num_rows);
1408    }
1409
1410    #[test]
1411    fn test_bulk_memtable_ranges_multiple_parts() {
1412        let metadata = metadata_for_test();
1413        let memtable =
1414            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);
1415        // Disable unordered_part for this test
1416        memtable.set_unordered_part_threshold(0);
1417
1418        let parts_data = vec![
1419            (
1420                "part1",
1421                1u32,
1422                vec![1000i64, 1100i64],
1423                vec![Some(10.0), Some(11.0)],
1424                100u64,
1425            ),
1426            (
1427                "part2",
1428                2u32,
1429                vec![2000i64, 2100i64],
1430                vec![Some(20.0), Some(21.0)],
1431                200u64,
1432            ),
1433            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
1434        ];
1435
1436        for (k0, k1, timestamps, values, seq) in parts_data {
1437            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
1438            memtable.write_bulk(part).unwrap();
1439        }
1440
1441        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1442        let ranges = memtable
1443            .ranges(
1444                None,
1445                RangesOptions::default().with_predicate(predicate_group),
1446            )
1447            .unwrap();
1448
1449        assert_eq!(3, ranges.ranges.len());
1450        assert_eq!(5, ranges.stats.num_rows);
1451        assert_eq!(3, ranges.stats.num_ranges);
1452
1453        for (range_id, range) in ranges.ranges.iter() {
1454            assert!(*range_id < 3);
1455            assert!(range.num_rows() > 0);
1456            assert!(range.is_record_batch());
1457        }
1458    }
1459
1460    #[test]
1461    fn test_bulk_memtable_ranges_with_sequence_filter() {
1462        let metadata = metadata_for_test();
1463        let memtable =
1464            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);
1465
1466        let part = create_bulk_part_with_converter(
1467            "seq_test",
1468            1,
1469            vec![1000, 2000, 3000],
1470            vec![Some(10.0), Some(20.0), Some(30.0)],
1471            500,
1472        )
1473        .unwrap();
1474
1475        memtable.write_bulk(part).unwrap();
1476
1477        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1478        let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
1479        let ranges = memtable
1480            .ranges(
1481                None,
1482                RangesOptions::default()
1483                    .with_predicate(predicate_group)
1484                    .with_sequence(sequence_filter),
1485            )
1486            .unwrap();
1487
1488        assert_eq!(1, ranges.ranges.len());
1489        let range = ranges.ranges.get(&0).unwrap();
1490
1491        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
1492        assert!(record_batch_iter.next().is_none());
1493    }
1494
1495    #[test]
1496    fn test_bulk_memtable_ranges_with_encoded_parts() {
1497        let metadata = metadata_for_test();
1498        let memtable =
1499            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);
1500        // Disable unordered_part for this test
1501        memtable.set_unordered_part_threshold(0);
1502
1503        // Adds enough bulk parts to trigger encoding
1504        for i in 0..10 {
1505            let part = create_bulk_part_with_converter(
1506                &format!("key_{}", i),
1507                i,
1508                vec![1000 + i as i64 * 100],
1509                vec![Some(i as f64 * 10.0)],
1510                100 + i as u64,
1511            )
1512            .unwrap();
1513            memtable.write_bulk(part).unwrap();
1514        }
1515
1516        memtable.compact(false).unwrap();
1517
1518        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1519        let ranges = memtable
1520            .ranges(
1521                None,
1522                RangesOptions::default().with_predicate(predicate_group),
1523            )
1524            .unwrap();
1525
1526        // Should have ranges for both bulk parts and encoded parts
1527        assert_eq!(3, ranges.ranges.len());
1528        assert_eq!(10, ranges.stats.num_rows);
1529
1530        for (_range_id, range) in ranges.ranges.iter() {
1531            assert!(range.num_rows() > 0);
1532            assert!(range.is_record_batch());
1533
1534            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1535            let mut total_rows = 0;
1536            for batch_result in record_batch_iter {
1537                let batch = batch_result.unwrap();
1538                total_rows += batch.num_rows();
1539                assert!(batch.num_rows() > 0);
1540            }
1541            assert_eq!(total_rows, range.num_rows());
1542        }
1543    }
1544
1545    #[test]
1546    fn test_bulk_memtable_unordered_part() {
1547        let metadata = metadata_for_test();
1548        let memtable = BulkMemtable::new(
1549            1001,
1550            metadata.clone(),
1551            None,
1552            None,
1553            false,
1554            MergeMode::LastRow,
1555        );
1556
1557        // Set smaller thresholds for testing with smaller inputs
1558        // Accept parts with < 5 rows into unordered_part
1559        memtable.set_unordered_part_threshold(5);
1560        // Compact when total rows >= 10
1561        memtable.set_unordered_part_compact_threshold(10);
1562
1563        // Write 3 small parts (each has 2 rows), should be collected in unordered_part
1564        for i in 0..3 {
1565            let part = create_bulk_part_with_converter(
1566                &format!("key_{}", i),
1567                i,
1568                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1569                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1570                100 + i as u64,
1571            )
1572            .unwrap();
1573            assert_eq!(2, part.num_rows());
1574            memtable.write_bulk(part).unwrap();
1575        }
1576
1577        // Total rows = 6, which has not yet reached the compact threshold of 10
1578        let stats = memtable.stats();
1579        assert_eq!(6, stats.num_rows);
1580
1581        // Write 2 more small parts (each has 2 rows)
1582        // This should trigger compaction when total >= 10
1583        for i in 3..5 {
1584            let part = create_bulk_part_with_converter(
1585                &format!("key_{}", i),
1586                i,
1587                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1588                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1589                100 + i as u64,
1590            )
1591            .unwrap();
1592            memtable.write_bulk(part).unwrap();
1593        }
1594
1595        // Total rows = 10, should have compacted unordered_part into a regular part
1596        let stats = memtable.stats();
1597        assert_eq!(10, stats.num_rows);
1598
1599        // Verify we can read all data correctly
1600        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1601        let ranges = memtable
1602            .ranges(
1603                None,
1604                RangesOptions::default().with_predicate(predicate_group),
1605            )
1606            .unwrap();
1607
1608        // Should have at least 1 range (the compacted part)
1609        assert!(!ranges.ranges.is_empty());
1610        assert_eq!(10, ranges.stats.num_rows);
1611
1612        // Read all data and verify
1613        let mut total_rows_read = 0;
1614        for (_range_id, range) in ranges.ranges.iter() {
1615            assert!(range.is_record_batch());
1616            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1617
1618            for batch_result in record_batch_iter {
1619                let batch = batch_result.unwrap();
1620                total_rows_read += batch.num_rows();
1621            }
1622        }
1623        assert_eq!(10, total_rows_read);
1624    }
1625
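        /// Mixes small parts that are buffered in the unordered_part with a larger part
        /// that goes directly to the regular parts, and checks that all 13 rows remain
        /// readable after the buffer is compacted.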
1626    #[test]
1627    fn test_bulk_memtable_unordered_part_mixed_sizes() {
1628        let metadata = metadata_for_test();
1629        let memtable = BulkMemtable::new(
1630            1002,
1631            metadata.clone(),
1632            None,
1633            None,
1634            false,
1635            MergeMode::LastRow,
1636        );
1637
1638        // Set threshold to 4 rows - parts with < 4 rows go to unordered_part
1639        memtable.set_unordered_part_threshold(4);
1640        memtable.set_unordered_part_compact_threshold(8);
1641
1642        // Write small parts (3 rows each) - should go to unordered_part
1643        for i in 0..2 {
1644            let part = create_bulk_part_with_converter(
1645                &format!("small_{}", i),
1646                i,
1647                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
1648                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
1649                10 + i as u64,
1650            )
1651            .unwrap();
1652            assert_eq!(3, part.num_rows());
1653            memtable.write_bulk(part).unwrap();
1654        }
1655
1656        // Write a large part (5 rows) - should go directly to regular parts
1657        let large_part = create_bulk_part_with_converter(
1658            "large_key",
1659            100,
1660            vec![5000, 6000, 7000, 8000, 9000],
1661            vec![
1662                Some(100.0),
1663                Some(101.0),
1664                Some(102.0),
1665                Some(103.0),
1666                Some(104.0),
1667            ],
1668            50,
1669        )
1670        .unwrap();
1671        assert_eq!(5, large_part.num_rows());
1672        memtable.write_bulk(large_part).unwrap();
1673
1674        // Write another small part (2 rows); buffered rows become 3 + 3 + 2 = 8 >= 8, which triggers compaction of the unordered_part
1675        let part = create_bulk_part_with_converter(
1676            "small_2",
1677            2,
1678            vec![4000, 4100],
1679            vec![Some(20.0), Some(21.0)],
1680            30,
1681        )
1682        .unwrap();
1683        memtable.write_bulk(part).unwrap();
1684
1685        let stats = memtable.stats();
1686        assert_eq!(13, stats.num_rows); // 3 + 3 + 5 + 2 = 13
1687
1688        // Verify all data can be read
1689        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1690        let ranges = memtable
1691            .ranges(
1692                None,
1693                RangesOptions::default().with_predicate(predicate_group),
1694            )
1695            .unwrap();
1696
1697        assert_eq!(13, ranges.stats.num_rows);
1698
1699        let mut total_rows_read = 0;
1700        for (_range_id, range) in ranges.ranges.iter() {
1701            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1702            for batch_result in record_batch_iter {
1703                let batch = batch_result.unwrap();
1704                total_rows_read += batch.num_rows();
1705            }
1706        }
1707        assert_eq!(13, total_rows_read);
1708    }
1709
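        /// Keeps three single-row parts inside the unordered_part (the compact threshold
        /// is set high enough to prevent auto-compaction) and checks that ranges() still
        /// exposes them through a single readable range.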
1710    #[test]
1711    fn test_bulk_memtable_unordered_part_with_ranges() {
1712        let metadata = metadata_for_test();
1713        let memtable = BulkMemtable::new(
1714            1003,
1715            metadata.clone(),
1716            None,
1717            None,
1718            false,
1719            MergeMode::LastRow,
1720        );
1721
1722        // Set small thresholds
1723        memtable.set_unordered_part_threshold(3);
1724        memtable.set_unordered_part_compact_threshold(100); // High threshold to prevent auto-compaction
1725
1726        // Write several small parts that stay in unordered_part
1727        for i in 0..3 {
1728            let part = create_bulk_part_with_converter(
1729                &format!("key_{}", i),
1730                i,
1731                vec![1000 + i as i64 * 100],
1732                vec![Some(i as f64 * 10.0)],
1733                100 + i as u64,
1734            )
1735            .unwrap();
1736            assert_eq!(1, part.num_rows());
1737            memtable.write_bulk(part).unwrap();
1738        }
1739
1740        let stats = memtable.stats();
1741        assert_eq!(3, stats.num_rows);
1742
1743        // Test that ranges() can correctly read from unordered_part
1744        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1745        let ranges = memtable
1746            .ranges(
1747                None,
1748                RangesOptions::default().with_predicate(predicate_group),
1749            )
1750            .unwrap();
1751
1752        // Should have 1 range for the unordered_part
1753        assert_eq!(1, ranges.ranges.len());
1754        assert_eq!(3, ranges.stats.num_rows);
1755
1756        // Read all rows back from the single range built over the unordered_part
1757        let range = ranges.ranges.get(&0).unwrap();
1758        let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1759
1760        let mut total_rows = 0;
1761        for batch_result in record_batch_iter {
1762            let batch = batch_result.unwrap();
1763            total_rows += batch.num_rows();
1764            // Each batch produced by the iterator should be non-empty
1765            assert!(batch.num_rows() > 0);
1766        }
1767        assert_eq!(3, total_rows);
1768    }
1769}