mito2/memtable/bulk.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation for bulk load
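//!
//! Rows are ingested as pre-encoded [BulkPart]s instead of individual key values.
//!
//! A rough usage sketch (not taken from the original sources; `metadata` is assumed
//! to be a [RegionMetadataRef] built elsewhere and `bulk_part` an existing [BulkPart]):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use crate::memtable::MemtableBuilder;
//! use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
//! use crate::region::options::MergeMode;
//!
//! // Builds a bulk memtable that deduplicates by last row while merging parts and
//! // runs part compaction on a background dispatcher limited to 4 concurrent tasks.
//! let builder = BulkMemtableBuilder::new(None, false, MergeMode::LastRow)
//!     .with_compact_dispatcher(Arc::new(CompactDispatcher::new(4)));
//! let memtable = builder.build(1, &metadata);
//! memtable.write_bulk(bulk_part)?;
//! ```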

#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{
    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, UnorderedPart,
};
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// All parts in a bulk memtable.
#[derive(Default)]
struct BulkParts {
    /// Unordered small parts (< 1024 rows).
    unordered_part: UnorderedPart,
    /// Raw parts.
    parts: Vec<BulkPartWrapper>,
    /// Parts encoded as parquets.
    encoded_parts: Vec<EncodedPartWrapper>,
}

impl BulkParts {
    /// Total number of parts (raw + encoded + unordered).
    fn num_parts(&self) -> usize {
        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
        self.parts.len() + self.encoded_parts.len() + unordered_count
    }

    /// Returns true if there is no part.
    fn is_empty(&self) -> bool {
        self.unordered_part.is_empty() && self.parts.is_empty() && self.encoded_parts.is_empty()
    }

    /// Returns true if the bulk parts should be merged.
    fn should_merge_bulk_parts(&self) -> bool {
        let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
        // If the total number of unmerged parts is >= 8, start a merge task.
        unmerged_count >= 8
    }

    /// Returns true if the encoded parts should be merged.
    fn should_merge_encoded_parts(&self) -> bool {
        let unmerged_count = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .count();
        // If the total number of unmerged encoded parts is >= 8, start a merge task.
        unmerged_count >= 8
    }

    /// Returns true if the unordered_part should be compacted into a BulkPart.
    fn should_compact_unordered_part(&self) -> bool {
        self.unordered_part.should_compact()
    }

    /// Collects unmerged parts and marks them as being merged.
    /// Returns the collected parts to merge.
    fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.parts {
            if !wrapper.merging {
                wrapper.merging = true;
                collected_parts.push(PartToMerge::Bulk {
                    part: wrapper.part.clone(),
                    file_id: wrapper.file_id,
                });
            }
        }
        collected_parts
    }

    /// Collects unmerged encoded parts within size threshold and marks them as being merged.
    /// Returns the collected parts to merge.
    fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        // Find minimum size among unmerged parts
        let min_size = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .map(|wrapper| wrapper.part.size_bytes())
            .min();

        let Some(min_size) = min_size else {
            return Vec::new();
        };

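        // Only merges parts whose size is within 16x of the smallest unmerged part,
        // capped at 4 MiB, so one oversized encoded part does not dominate a merge group.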
        let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.encoded_parts {
            if !wrapper.merging {
                let size = wrapper.part.size_bytes();
                if size <= max_allowed_size {
                    wrapper.merging = true;
                    collected_parts.push(PartToMerge::Encoded {
                        part: wrapper.part.clone(),
                        file_id: wrapper.file_id,
                    });
                }
            }
        }
        collected_parts
    }

    /// Installs merged encoded parts and removes the original parts by file ids.
    /// Returns the total number of rows in the merged parts.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
        merge_encoded: bool,
    ) -> usize
    where
        I: IntoIterator<Item = EncodedBulkPart>,
    {
        let mut total_output_rows = 0;

        for encoded_part in merged_parts {
            total_output_rows += encoded_part.metadata().num_rows;
            self.encoded_parts.push(EncodedPartWrapper {
                part: encoded_part,
                file_id: FileId::random(),
                merging: false,
            });
        }

        if merge_encoded {
            self.encoded_parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        } else {
            self.parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        }

        total_output_rows
    }

    /// Resets merging flag for parts with the given file ids.
    /// Used when merging fails or is cancelled.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
        if merge_encoded {
            for wrapper in &mut self.encoded_parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        } else {
            for wrapper in &mut self.parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        }
    }
}

/// RAII guard for managing merging flags.
/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    merge_encoded: bool,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    /// Creates a new guard for the given file ids.
    fn new(
        bulk_parts: &'a RwLock<BulkParts>,
        file_ids: &'a HashSet<FileId>,
        merge_encoded: bool,
    ) -> Self {
        Self {
            bulk_parts,
            file_ids,
            merge_encoded,
            success: false,
        }
    }

    /// Marks the merge operation as successful.
    /// When this is called, the guard will not reset the flags on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
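        // Skips the reset if the lock is poisoned: a poisoned lock means another
        // writer panicked and the part state can no longer be trusted anyway.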
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
        }
    }
}

/// Memtable that ingests and scans parts directly.
pub struct BulkMemtable {
    id: MemtableId,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Cached flat SST arrow schema for memtable compaction.
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for scheduling compaction tasks.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled.
    append_mode: bool,
    /// Mode to handle duplicate rows while merging.
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
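        // Captures the fragment's statistics before it is moved into the part list below.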
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();

            // Routes small parts to unordered_part based on the row count threshold
            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
                bulk_parts.unordered_part.push(fragment);

                // Compacts unordered_part if threshold is reached
                if bulk_parts.should_compact_unordered_part()
                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
                {
                    bulk_parts.parts.push(BulkPartWrapper {
                        part: bulk_part,
                        file_id: FileId::random(),
                        merging: false,
                    });
                    bulk_parts.unordered_part.clear();
                }
            } else {
                bulk_parts.parts.push(BulkPartWrapper {
                    part: fragment,
                    file_id: FileId::random(),
                    merging: false,
                });
            }

            // Since this operation should be fast, we do it within the parts lock scope.
            // This ensures the statistics in `ranges()` are correct. What's more,
            // it guarantees no rows are out of the time range, so we don't need to
            // prune rows by time range again in the iterator of the MemtableRange.
            self.update_stats(local_metrics);
        }

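        // Checks and schedules compaction outside of the parts lock so writers are not
        // blocked while a merge task is dispatched.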
        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceRange>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        // TODO(yingwen): Filter ranges by sequence.
        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        // Adds ranges for regular parts and encoded parts
        {
            let bulk_parts = self.parts.read().unwrap();

            // Adds range for unordered part if not empty
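            // Materializes the buffered small writes into a single `BulkPart` so they
            // can be scanned like any other part.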
            if !bulk_parts.unordered_part.is_empty()
                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
            {
                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: unordered_bulk_part,
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            // Adds ranges for regular parts
            for part_wrapper in bulk_parts.parts.iter() {
                // Skips empty parts
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            // Adds ranges for encoded parts
            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
                // Skips empty parts
                if encoded_part_wrapper.part.metadata().num_rows == 0 {
                    continue;
                }

                let part_stats = encoded_part_wrapper.part.to_memtable_stats();
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: encoded_part_wrapper.file_id,
                            part: encoded_part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        Ok(MemtableRanges { ranges })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

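        // Each part (unordered, raw, or encoded) counts as one scan range.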
        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Computes the new flat schema based on the new metadata.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

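        // Skips part merging when the caller is compacting for a flush.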
        if for_flush {
            return Ok(());
        }

        // Try to merge regular parts first
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then try to merge encoded parts
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new [BulkMemtable].
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the unordered part threshold (for testing).
    #[cfg(test)]
    pub fn set_unordered_part_threshold(&self, threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_threshold(threshold);
    }

    /// Sets the unordered part compact threshold (for testing).
    #[cfg(test)]
    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_compact_threshold(compact_threshold);
    }

    /// Updates memtable stats.
    ///
    /// Must be called inside the parts write lock scope.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Returns the estimated time series count.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }

    /// Returns whether the memtable should be compacted.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
    }

    /// Schedules a compaction task using the CompactDispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else {
            // Uses synchronous compaction if no dispatcher is available.
            if let Err(e) = self.compact(false) {
                common_telemetry::error!(e; "Failed to compact memtable");
            }
        }
    }
}

/// Iterator builder for bulk range.
pub struct BulkRangeIterBuilder {
    pub part: BulkPart,
    pub context: Arc<BulkIterContext>,
    pub sequence: Option<SequenceRange>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let series_count = self.part.estimated_series_count();
        let iter = BulkPartRecordBatchIter::new(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
            series_count,
            metrics,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Iterator builder for encoded bulk range.
struct EncodedBulkRangeIterBuilder {
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self
            .part
            .read(self.context.clone(), self.sequence, metrics)?
        {
            Ok(iter)
        } else {
            // Return an empty iterator if no data to read
            Ok(Box::new(std::iter::empty()))
        }
    }

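    // The encoded part already holds parquet-encoded bytes, so it is exposed together
    // with its SST info instead of being re-encoded.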
    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

struct BulkPartWrapper {
    part: BulkPart,
    /// The unique file id for this part in memtable.
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}

struct EncodedPartWrapper {
    part: EncodedBulkPart,
    /// The unique file id for this part in memtable.
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}

/// Enum to wrap different types of parts for unified merging.
#[derive(Clone)]
enum PartToMerge {
    /// Raw bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// Encoded bulk part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    /// Gets the file ID of this part.
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    /// Gets the minimum timestamp of this part.
    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    /// Gets the maximum timestamp of this part.
    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    /// Gets the number of rows in this part.
    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Gets the maximum sequence number of this part.
    fn max_sequence(&self) -> u64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.sequence,
            PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
        }
    }

    /// Creates a record batch iterator for this part.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let series_count = part.estimated_series_count();
                let iter = BulkPartRecordBatchIter::new(
                    part.batch,
                    context,
                    None, // No sequence filter for merging
                    series_count,
                    None, // No metrics for merging
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
        }
    }
}

struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
}

impl MemtableCompactor {
    /// Creates a new MemtableCompactor.
    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
        Self {
            region_id,
            memtable_id,
        }
    }

    /// Merges bulk parts and then encodes the result to an [EncodedBulkPart].
    fn merge_bulk_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects unmerged parts and marks them as being merged.
        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);

        // Sorts parts by row count (ascending) to merge parts with similar row counts.
        let mut sorted_parts = parts_to_merge;
        sorted_parts.sort_unstable_by_key(|part| part.num_rows());

        // Groups parts into chunks for concurrent processing.
        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
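        // Merges each group of up to 16 parts on the rayon thread pool; each group
        // produces at most one encoded output part.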
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs merged parts.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges encoded parts and then encodes the result to an [EncodedBulkPart].
    fn merge_encoded_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects unmerged encoded parts within the size threshold and marks them as being merged.
        let parts_to_merge = {
            let mut parts = bulk_parts.write().unwrap();
            parts.collect_encoded_parts_to_merge()
        };

        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);

        if parts_to_merge.len() == 1 {
            // Only one part, so there is nothing to merge; the guard automatically resets the flag.
            return Ok(());
        }

        // Groups parts into chunks for concurrent processing.
        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();

        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs merged parts and gets the total number of output rows.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
        };

        // Marks the operation as successful to prevent flag reset
        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges a group of parts into a single encoded part.
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Option<EncodedBulkPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        // Calculates timestamp bounds and max sequence for merged data
        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);
        let max_sequence = parts_to_merge
            .iter()
            .map(|p| p.max_sequence())
            .max()
            .unwrap_or(0);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            None, // No column projection for merging
            None, // No predicate for merging
            true,
        )?);

        // Creates iterators for all parts to merge.
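        // Parts whose iterator fails to build are skipped instead of failing the whole merge.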
        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            // Applies deduplication based on merge mode
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    // Calculates field column start: total columns - fixed columns - field columns
                    // Field column count = total metadata columns - time index column - primary key columns
                    let field_column_count =
                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
                    let total_columns = arrow_schema.fields().len();
                    let field_column_start =
                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        // Encodes the merged iterator
        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
        let mut metrics = BulkPartEncodeMetrics::default();
        let encoded_part = encoder.encode_record_batch_iter(
            boxed_iter,
            arrow_schema.clone(),
            min_timestamp,
            max_timestamp,
            max_sequence,
            &mut metrics,
        )?;

        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

        Ok(encoded_part)
    }
}

/// A memtable compact task to run in background.
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,

    /// Cached flat SST arrow schema.
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Whether the append mode is enabled.
    append_mode: bool,
    /// Mode to handle duplicate rows while merging.
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        // Tries to merge regular parts first
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then tries to merge encoded parts
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

/// Scheduler to run compact tasks in background.
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a new dispatcher with the given number of max concurrent tasks.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Dispatches a compact task to run in background.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            // Holds the permit until the blocking task finishes so the semaphore
            // actually bounds the number of concurrent compactions.
            let Ok(permit) = semaphore.acquire_owned().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                let _permit = permit;
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

/// Builder to build a [BulkMemtable].
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the compact dispatcher.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);
        // Disable unordered_part for this test
        memtable.set_unordered_part_threshold(0);

        let test_data = [
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(5, total_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                Some(&projection),
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable =
            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);
        // Disable unordered_part for this test
        memtable.set_unordered_part_threshold(0);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(5, total_rows);
1465
1466        for (range_id, range) in ranges.ranges.iter() {
1467            assert!(*range_id < 3);
1468            assert!(range.num_rows() > 0);
1469            assert!(range.is_record_batch());
1470        }
1471    }
1472
1473    #[test]
1474    fn test_bulk_memtable_ranges_with_sequence_filter() {
1475        let metadata = metadata_for_test();
1476        let memtable =
1477            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);
1478
1479        let part = create_bulk_part_with_converter(
1480            "seq_test",
1481            1,
1482            vec![1000, 2000, 3000],
1483            vec![Some(10.0), Some(20.0), Some(30.0)],
1484            500,
1485        )
1486        .unwrap();
1487
1488        memtable.write_bulk(part).unwrap();
1489
1490        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1491        let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
1492        let ranges = memtable
1493            .ranges(
1494                None,
1495                RangesOptions::default()
1496                    .with_predicate(predicate_group)
1497                    .with_sequence(sequence_filter),
1498            )
1499            .unwrap();
1500
1501        assert_eq!(1, ranges.ranges.len());
1502        let range = ranges.ranges.get(&0).unwrap();
1503
1504        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
1505        assert!(record_batch_iter.next().is_none());
1506    }
1507
1508    #[test]
1509    fn test_bulk_memtable_ranges_with_encoded_parts() {
1510        let metadata = metadata_for_test();
1511        let memtable =
1512            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);
1513        // Disable unordered_part for this test
1514        memtable.set_unordered_part_threshold(0);
1515
1516        // Adds enough bulk parts to trigger encoding
1517        for i in 0..10 {
1518            let part = create_bulk_part_with_converter(
1519                &format!("key_{}", i),
1520                i,
1521                vec![1000 + i as i64 * 100],
1522                vec![Some(i as f64 * 10.0)],
1523                100 + i as u64,
1524            )
1525            .unwrap();
1526            memtable.write_bulk(part).unwrap();
1527        }
1528
1529        memtable.compact(false).unwrap();
1530
1531        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1532        let ranges = memtable
1533            .ranges(
1534                None,
1535                RangesOptions::default().with_predicate(predicate_group),
1536            )
1537            .unwrap();
1538
1539        // Should have ranges for both bulk parts and encoded parts
1540        assert_eq!(3, ranges.ranges.len());
1541        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1542        assert_eq!(10, total_rows);
1543
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

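    // Small writes below the unordered part threshold accumulate in unordered_part and
    // are compacted into a regular part once the compact threshold is reached; no rows
    // should be lost along the way.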
    #[test]
    fn test_bulk_memtable_unordered_part() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1001,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Use smaller thresholds so the test works with small inputs
        // Accept parts with fewer than 5 rows into unordered_part
        memtable.set_unordered_part_threshold(5);
        // Compact when total rows >= 10
        memtable.set_unordered_part_compact_threshold(10);

        // Write 3 small parts (each has 2 rows), should be collected in unordered_part
        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(2, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        // Total rows = 6, not yet reaching compact threshold
        let stats = memtable.stats();
        assert_eq!(6, stats.num_rows);

        // Write 2 more small parts (each has 2 rows)
        // This should trigger compaction when total >= 10
        for i in 3..5 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        // Total rows = 10, should have compacted unordered_part into a regular part
        let stats = memtable.stats();
        assert_eq!(10, stats.num_rows);

        // Verify we can read all data correctly
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // Should have at least 1 range (the compacted part)
        assert!(!ranges.ranges.is_empty());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(10, total_rows);

        // Read all data and verify
        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(10, total_rows_read);
    }

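    // Mixing small writes (buffered in unordered_part) with a large write (stored
    // directly as a regular part) should still account for every row when reading.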
    #[test]
    fn test_bulk_memtable_unordered_part_mixed_sizes() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1002,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Set threshold to 4 rows - parts with < 4 rows go to unordered_part
        memtable.set_unordered_part_threshold(4);
        memtable.set_unordered_part_compact_threshold(8);

        // Write small parts (3 rows each) - should go to unordered_part
        for i in 0..2 {
            let part = create_bulk_part_with_converter(
                &format!("small_{}", i),
                i,
                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
                10 + i as u64,
            )
            .unwrap();
            assert_eq!(3, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        // Write a large part (5 rows) - should go directly to regular parts
        let large_part = create_bulk_part_with_converter(
            "large_key",
            100,
            vec![5000, 6000, 7000, 8000, 9000],
            vec![
                Some(100.0),
                Some(101.0),
                Some(102.0),
                Some(103.0),
                Some(104.0),
            ],
            50,
        )
        .unwrap();
        assert_eq!(5, large_part.num_rows());
        memtable.write_bulk(large_part).unwrap();

        // Write another small part (2 rows) - should trigger compaction of unordered_part
        let part = create_bulk_part_with_converter(
            "small_2",
            2,
            vec![4000, 4100],
            vec![Some(20.0), Some(21.0)],
            30,
        )
        .unwrap();
        memtable.write_bulk(part).unwrap();

        let stats = memtable.stats();
        assert_eq!(13, stats.num_rows); // 3 + 3 + 5 + 2 = 13

        // Verify all data can be read
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(13, total_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(13, total_rows_read);
    }

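    // With a high compact threshold keeping the data buffered in unordered_part,
    // ranges() should still expose those rows as a single readable range.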
    #[test]
    fn test_bulk_memtable_unordered_part_with_ranges() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1003,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Set small thresholds
        memtable.set_unordered_part_threshold(3);
        memtable.set_unordered_part_compact_threshold(100); // High threshold to prevent auto-compaction

        // Write several small parts that stay in unordered_part
        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(1, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(3, stats.num_rows);

        // Test that ranges() can correctly read from unordered_part
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // Should have 1 range for the unordered_part
        assert_eq!(1, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(3, total_rows);

        // Read the data back from the range and verify the row count
        let range = ranges.ranges.get(&0).unwrap();
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            total_rows += batch.num_rows();
            // Each returned batch should be non-empty
            assert!(batch.num_rows() > 0);
        }
        assert_eq!(3, total_rows);
    }
}