1pub(crate) mod chunk_reader;
18pub mod context;
19pub(crate) mod json_align;
20pub mod part;
21pub mod part_reader;
22mod row_group_reader;
23
24use std::collections::{BTreeMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
26use std::sync::{Arc, LazyLock, Mutex, RwLock};
27use std::time::Instant;
28
/// Reads the environment variable `name` and interprets it as a `usize`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, or
/// does not parse as a `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        Ok(raw) => raw.parse().unwrap_or(default),
        Err(_) => default,
    }
}
36
37use common_time::Timestamp;
38use datatypes::arrow::datatypes::SchemaRef;
39use mito_codec::key_values::KeyValue;
40use rayon::prelude::*;
41use serde::{Deserialize, Serialize};
42use serde_with::{DisplayFromStr, serde_as};
43use store_api::metadata::RegionMetadataRef;
44use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
45use tokio::sync::Semaphore;
46
47use crate::error::{Result, UnsupportedOperationSnafu};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::bulk::context::BulkIterContext;
50use crate::memtable::bulk::json_align::Json2Aligner;
51use crate::memtable::bulk::part::{
52 BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
53 should_prune_bulk_part,
54};
55use crate::memtable::bulk::part_reader::BulkPartBatchIter;
56use crate::memtable::stats::WriteMetrics;
57use crate::memtable::{
58 AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
59 IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
60 MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
61};
62use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
63use crate::read::flat_merge::FlatMergeIterator;
64use crate::region::options::MergeMode;
65use crate::sst::parquet::flat_format::field_column_start;
66use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
67
/// Built-in merge threshold; also the value `BulkMemtableConfig::sanitize`
/// substitutes for a configured threshold of zero.
const DEFAULT_MERGE_THRESHOLD: usize = 16;

/// Merge threshold used by `BulkMemtableConfig::default()`.
///
/// Resolved once, on first access, from the `GREPTIME_BULK_MERGE_THRESHOLD`
/// environment variable; falls back to [`DEFAULT_MERGE_THRESHOLD`] when the
/// variable is unset or is not a valid `usize`.
static MERGE_THRESHOLD: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));
74
/// Built-in cap on the number of merge groups formed per collection call.
const DEFAULT_MAX_MERGE_GROUPS: usize = 32;

/// Maximum merge groups used by `BulkMemtableConfig::default()`.
///
/// Resolved once, on first access, from the `GREPTIME_BULK_MAX_MERGE_GROUPS`
/// environment variable; falls back to [`DEFAULT_MAX_MERGE_GROUPS`] when the
/// variable is unset or is not a valid `usize`.
static MAX_MERGE_GROUPS: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));
81
/// Row-count threshold used by `BulkMemtableConfig::default()`; a merged group
/// whose estimated row count exceeds it is encoded instead of kept as batches.
///
/// Resolved once, on first access, from the
/// `GREPTIME_BULK_ENCODE_ROW_THRESHOLD` environment variable; falls back to
/// `10 * DEFAULT_ROW_GROUP_SIZE` when the variable is unset or is not a valid
/// `usize`.
pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
        10 * DEFAULT_ROW_GROUP_SIZE,
    )
});
90
/// Built-in byte-size threshold (64 MiB) above which a merged group is encoded.
const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;

/// Byte-size threshold used by `BulkMemtableConfig::default()`.
///
/// Resolved once, on first access, from the
/// `GREPTIME_BULK_ENCODE_BYTES_THRESHOLD` environment variable; falls back to
/// [`DEFAULT_ENCODE_BYTES_THRESHOLD`] when the variable is unset or is not a
/// valid `usize`.
static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
        DEFAULT_ENCODE_BYTES_THRESHOLD,
    )
});
102
/// Tunables of the bulk memtable.
///
/// Missing fields take their value from `Default` (`#[serde(default)]`), and
/// every field is (de)serialized through `DisplayFromStr`.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BulkMemtableConfig {
    /// Number of parts of one kind (encoded or not) that are not already being
    /// merged which triggers a merge; also the maximum size of a merge group.
    #[serde_as(as = "DisplayFromStr")]
    pub merge_threshold: usize,
    /// A merged group is encoded when its estimated row count exceeds this.
    #[serde_as(as = "DisplayFromStr")]
    pub encode_row_threshold: usize,
    /// A merged group is encoded when its estimated byte size exceeds this.
    #[serde_as(as = "DisplayFromStr")]
    pub encode_bytes_threshold: usize,
    /// Maximum number of groups formed per part kind in one merge pass.
    #[serde_as(as = "DisplayFromStr")]
    pub max_merge_groups: usize,
}
121
122impl Default for BulkMemtableConfig {
123 fn default() -> Self {
124 Self {
125 merge_threshold: *MERGE_THRESHOLD,
126 encode_row_threshold: *ENCODE_ROW_THRESHOLD,
127 encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
128 max_merge_groups: *MAX_MERGE_GROUPS,
129 }
130 .sanitize()
131 }
132}
133
134impl BulkMemtableConfig {
135 fn sanitize(mut self) -> Self {
136 if self.merge_threshold == 0 {
137 self.merge_threshold = DEFAULT_MERGE_THRESHOLD;
138 }
139 self
140 }
141}
142
/// Result of merging one group of parts.
enum MergedPart {
    /// Merged rows kept as a collection of record batches.
    Multi(MultiBulkPart),
    /// Merged rows produced by the bulk part encoder.
    Encoded(EncodedBulkPart),
}
150
/// Parts selected for one merge pass.
struct CollectedParts {
    /// Each inner vector is one group of parts that is merged into a single
    /// output part.
    groups: Vec<Vec<PartToMerge>>,
}
156
/// All data held by a bulk memtable.
#[derive(Default)]
struct BulkParts {
    /// Buffer for fragments that `UnorderedPart::should_accept` admits; it is
    /// converted into a regular part once `should_compact` reports true.
    unordered_part: UnorderedPart,
    /// Regular parts: written fragments, compacted unordered parts and merge
    /// outputs, each tagged with a `merging` flag.
    parts: Vec<BulkPartWrapper>,
}
165
166impl BulkParts {
167 fn num_parts(&self) -> usize {
169 let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
170 self.parts.len() + unordered_count
171 }
172
173 fn is_empty(&self) -> bool {
175 self.unordered_part.is_empty() && self.parts.is_empty()
176 }
177
178 fn should_merge_parts(&self, merge_threshold: usize) -> bool {
181 let mut bulk_count = 0;
182 let mut encoded_count = 0;
183
184 for wrapper in &self.parts {
185 if wrapper.merging {
186 continue;
187 }
188
189 if wrapper.part.is_encoded() {
190 encoded_count += 1;
191 } else {
192 bulk_count += 1;
193 }
194
195 if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
197 return true;
198 }
199 }
200
201 false
202 }
203
    /// Returns whether the unordered part reports that it should be compacted
    /// into a regular part (delegates to `UnorderedPart::should_compact`).
    fn should_compact_unordered_part(&self) -> bool {
        self.unordered_part.should_compact()
    }
208
209 fn collect_parts_to_merge(
213 &mut self,
214 merge_threshold: usize,
215 max_merge_groups: usize,
216 ) -> CollectedParts {
217 let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
219 let mut encoded_indices: Vec<(usize, usize)> = Vec::new();
220
221 for (idx, wrapper) in self.parts.iter().enumerate() {
222 if wrapper.merging {
223 continue;
224 }
225 let num_rows = wrapper.part.num_rows();
226 if wrapper.part.is_encoded() {
227 encoded_indices.push((idx, num_rows));
228 } else {
229 bulk_indices.push((idx, num_rows));
230 }
231 }
232
233 let mut groups = Vec::new();
234
235 if bulk_indices.len() >= merge_threshold {
237 groups.extend(self.collect_and_group_parts(
238 bulk_indices,
239 merge_threshold,
240 max_merge_groups,
241 ));
242 }
243
244 if encoded_indices.len() >= merge_threshold {
246 groups.extend(self.collect_and_group_parts(
247 encoded_indices,
248 merge_threshold,
249 max_merge_groups,
250 ));
251 }
252
253 CollectedParts { groups }
254 }
255
256 fn collect_and_group_parts(
258 &mut self,
259 mut indices: Vec<(usize, usize)>,
260 merge_threshold: usize,
261 max_merge_groups: usize,
262 ) -> Vec<Vec<PartToMerge>> {
263 if indices.is_empty() {
264 return Vec::new();
265 }
266
267 indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);
269
270 indices
272 .chunks(merge_threshold)
273 .take(max_merge_groups)
274 .map(|chunk| {
275 chunk
276 .iter()
277 .map(|(idx, _)| {
278 let wrapper = &mut self.parts[*idx];
279 wrapper.merging = true;
280 wrapper.part.clone()
281 })
282 .collect()
283 })
284 .collect()
285 }
286
287 fn install_merged_parts<I>(
290 &mut self,
291 merged_parts: I,
292 merged_file_ids: &HashSet<FileId>,
293 ) -> usize
294 where
295 I: IntoIterator<Item = MergedPart>,
296 {
297 let mut total_output_rows = 0;
298
299 for merged_part in merged_parts {
300 match merged_part {
301 MergedPart::Encoded(encoded_part) => {
302 total_output_rows += encoded_part.metadata().num_rows;
303 self.parts.push(BulkPartWrapper {
304 part: PartToMerge::Encoded {
305 part: encoded_part,
306 file_id: FileId::random(),
307 },
308 merging: false,
309 });
310 }
311 MergedPart::Multi(multi_part) => {
312 total_output_rows += multi_part.num_rows();
313 self.parts.push(BulkPartWrapper {
314 part: PartToMerge::Multi {
315 part: multi_part,
316 file_id: FileId::random(),
317 },
318 merging: false,
319 });
320 }
321 }
322 }
323
324 self.parts
325 .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));
326
327 total_output_rows
328 }
329
330 fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
333 for wrapper in &mut self.parts {
334 if file_ids.contains(&wrapper.file_id()) {
335 wrapper.merging = false;
336 }
337 }
338 }
339}
340
/// Drop guard that clears the `merging` flags of the given parts unless the
/// merge was marked successful.
struct MergingFlagsGuard<'a> {
    /// Parts whose flags are reset on drop.
    bulk_parts: &'a RwLock<BulkParts>,
    /// File ids of the parts taking part in the merge.
    file_ids: &'a HashSet<FileId>,
    /// Set by `mark_success`; when true, drop does nothing.
    success: bool,
}
348
impl<'a> MergingFlagsGuard<'a> {
    /// Creates a guard in the "not successful" state for the given parts and
    /// file ids.
    fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
        Self {
            bulk_parts,
            file_ids,
            success: false,
        }
    }

    /// Marks the merge as successful so that dropping the guard leaves the
    /// parts untouched.
    fn mark_success(&mut self) {
        self.success = true;
    }
}
365
366impl<'a> Drop for MergingFlagsGuard<'a> {
367 fn drop(&mut self) {
368 if !self.success
369 && let Ok(mut parts) = self.bulk_parts.write()
370 {
371 parts.reset_merging_flags(self.file_ids);
372 }
373 }
374}
375
/// Memtable that stores data as bulk parts and only accepts bulk writes.
pub struct BulkMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Merge and encode thresholds.
    config: BulkMemtableConfig,
    /// The stored parts; shared with background compaction tasks.
    parts: Arc<RwLock<BulkParts>>,
    /// Metadata of the region this memtable belongs to.
    metadata: RegionMetadataRef,
    /// Tracks the bytes reported for written fragments.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far (`i64::MIN` before the first write).
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far (`i64::MAX` before the first write).
    min_timestamp: AtomicI64,
    /// Largest sequence written so far.
    max_sequence: AtomicU64,
    /// Total number of rows written.
    num_rows: AtomicUsize,
    /// Compactor that merges parts; the mutex serializes merges.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for background compaction; when `None`, compaction runs
    /// inline on the writing thread.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// When true, merges do not deduplicate rows.
    append_mode: bool,
    /// Deduplication strategy applied when merging with dedup enabled.
    merge_mode: MergeMode,
}
397
398impl std::fmt::Debug for BulkMemtable {
399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 f.debug_struct("BulkMemtable")
401 .field("id", &self.id)
402 .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
403 .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
404 .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
405 .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
406 .finish()
407 }
408}
409
410impl Memtable for BulkMemtable {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId {
        self.id
    }
414
    /// Always fails: row-oriented writes are not supported, use `write_bulk`.
    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }
421
    /// Always fails: single key-value writes are not supported, use
    /// `write_bulk`.
    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }
428
429 fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
430 let local_metrics = WriteMetrics {
431 key_bytes: 0,
432 value_bytes: fragment.estimated_size(),
433 min_ts: fragment.min_timestamp,
434 max_ts: fragment.max_timestamp,
435 num_rows: fragment.num_rows(),
436 max_sequence: fragment.sequence,
437 };
438
439 {
440 let mut bulk_parts = self.parts.write().unwrap();
441
442 if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
444 bulk_parts.unordered_part.push(fragment);
445
446 if bulk_parts.should_compact_unordered_part()
448 && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
449 {
450 bulk_parts.parts.push(BulkPartWrapper {
451 part: PartToMerge::Bulk {
452 part: bulk_part,
453 file_id: FileId::random(),
454 },
455 merging: false,
456 });
457 bulk_parts.unordered_part.clear();
458 }
459 } else {
460 bulk_parts.parts.push(BulkPartWrapper {
461 part: PartToMerge::Bulk {
462 part: fragment,
463 file_id: FileId::random(),
464 },
465 merging: false,
466 });
467 }
468
469 self.update_stats(local_metrics);
474 }
475
476 if self.should_compact() {
477 self.schedule_compact();
478 }
479
480 Ok(())
481 }
482
483 fn ranges(
484 &self,
485 projection: Option<&[ColumnId]>,
486 options: RangesOptions,
487 ) -> Result<MemtableRanges> {
488 let predicate = options.predicate;
489 let sequence = options.sequence;
490 let mut ranges = BTreeMap::new();
491 let mut range_id = 0;
492
493 let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
495 self.metadata.clone(),
496 projection,
497 predicate.predicate().cloned(),
498 options.for_flush,
499 options.pre_filter_mode,
500 )?);
501
502 {
504 let bulk_parts = self.parts.read().unwrap();
505
506 if !bulk_parts.unordered_part.is_empty()
508 && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
509 {
510 let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
511 let range = MemtableRange::new(
512 Arc::new(MemtableRangeContext::new(
513 self.id,
514 Box::new(BulkRangeIterBuilder {
515 part: unordered_bulk_part,
516 context: context.clone(),
517 sequence,
518 }),
519 predicate.clone(),
520 )),
521 part_stats,
522 );
523 ranges.insert(range_id, range);
524 range_id += 1;
525 }
526
527 for part_wrapper in bulk_parts.parts.iter() {
529 if part_wrapper.part.num_rows() == 0 {
531 continue;
532 }
533
534 let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
535 let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
536 PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
537 part: part.clone(),
538 context: context.clone(),
539 sequence,
540 }),
541 PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
542 part: part.clone(),
543 context: context.clone(),
544 sequence,
545 }),
546 PartToMerge::Encoded { part, file_id } => {
547 Box::new(EncodedBulkRangeIterBuilder {
548 file_id: *file_id,
549 part: part.clone(),
550 context: context.clone(),
551 sequence,
552 })
553 }
554 };
555
556 let range = MemtableRange::new(
557 Arc::new(MemtableRangeContext::new(
558 self.id,
559 iter_builder,
560 predicate.clone(),
561 )),
562 part_stats,
563 );
564 ranges.insert(range_id, range);
565 range_id += 1;
566 }
567 }
568
569 Ok(MemtableRanges { ranges })
570 }
571
572 fn is_empty(&self) -> bool {
573 let bulk_parts = self.parts.read().unwrap();
574 bulk_parts.is_empty()
575 }
576
    /// Tells the allocation tracker that this memtable is done allocating.
    /// Always succeeds.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }
581
582 fn stats(&self) -> MemtableStats {
583 let estimated_bytes = self.alloc_tracker.bytes_allocated();
584
585 if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
586 return MemtableStats {
587 estimated_bytes,
588 time_range: None,
589 num_rows: 0,
590 num_ranges: 0,
591 max_sequence: 0,
592 series_count: 0,
593 };
594 }
595
596 let ts_type = self
597 .metadata
598 .time_index_column()
599 .column_schema
600 .data_type
601 .clone()
602 .as_timestamp()
603 .expect("Timestamp column must have timestamp type");
604 let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
605 let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
606
607 let num_ranges = self.parts.read().unwrap().num_parts();
608
609 MemtableStats {
610 estimated_bytes,
611 time_range: Some((min_timestamp, max_timestamp)),
612 num_rows: self.num_rows.load(Ordering::Relaxed),
613 num_ranges,
614 max_sequence: self.max_sequence.load(Ordering::Relaxed),
615 series_count: self.estimated_series_count(),
616 }
617 }
618
619 fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
620 Arc::new(Self {
621 id,
622 config: self.config.clone(),
623 parts: Arc::new(RwLock::new(BulkParts::default())),
624 metadata: metadata.clone(),
625 alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
626 max_timestamp: AtomicI64::new(i64::MIN),
627 min_timestamp: AtomicI64::new(i64::MAX),
628 max_sequence: AtomicU64::new(0),
629 num_rows: AtomicUsize::new(0),
630 compactor: Arc::new(Mutex::new(MemtableCompactor::new(
631 metadata.region_id,
632 id,
633 self.config.clone(),
634 ))),
635 compact_dispatcher: self.compact_dispatcher.clone(),
636 append_mode: self.append_mode,
637 merge_mode: self.merge_mode,
638 })
639 }
640
641 fn compact(&self, for_flush: bool) -> Result<()> {
642 let mut compactor = self.compactor.lock().unwrap();
643
644 if for_flush {
645 return Ok(());
646 }
647
648 let should_merge = self
650 .parts
651 .read()
652 .unwrap()
653 .should_merge_parts(self.config.merge_threshold);
654 if should_merge {
655 compactor.merge_parts(
656 &self.parts,
657 &self.metadata,
658 !self.append_mode,
659 self.merge_mode,
660 )?;
661 }
662
663 Ok(())
664 }
665}
666
667impl BulkMemtable {
668 pub fn new(
670 id: MemtableId,
671 config: BulkMemtableConfig,
672 metadata: RegionMetadataRef,
673 write_buffer_manager: Option<WriteBufferManagerRef>,
674 compact_dispatcher: Option<Arc<CompactDispatcher>>,
675 append_mode: bool,
676 merge_mode: MergeMode,
677 ) -> Self {
678 let config = config.sanitize();
679 let region_id = metadata.region_id;
680 Self {
681 id,
682 config: config.clone(),
683 parts: Arc::new(RwLock::new(BulkParts::default())),
684 metadata,
685 alloc_tracker: AllocTracker::new(write_buffer_manager),
686 max_timestamp: AtomicI64::new(i64::MIN),
687 min_timestamp: AtomicI64::new(i64::MAX),
688 max_sequence: AtomicU64::new(0),
689 num_rows: AtomicUsize::new(0),
690 compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
691 compact_dispatcher,
692 append_mode,
693 merge_mode,
694 }
695 }
696
    /// Test helper: forwards `threshold` to the unordered part's
    /// `set_threshold`.
    #[cfg(test)]
    pub fn set_unordered_part_threshold(&self, threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_threshold(threshold);
    }
706
    /// Test helper: forwards `compact_threshold` to the unordered part's
    /// `set_compact_threshold`.
    #[cfg(test)]
    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_compact_threshold(compact_threshold);
    }
716
717 fn update_stats(&self, stats: WriteMetrics) {
721 self.alloc_tracker
722 .on_allocation(stats.key_bytes + stats.value_bytes);
723
724 self.max_timestamp
725 .fetch_max(stats.max_ts, Ordering::Relaxed);
726 self.min_timestamp
727 .fetch_min(stats.min_ts, Ordering::Relaxed);
728 self.max_sequence
729 .fetch_max(stats.max_sequence, Ordering::Relaxed);
730 self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
731 }
732
733 fn estimated_series_count(&self) -> usize {
735 let bulk_parts = self.parts.read().unwrap();
736 bulk_parts
737 .parts
738 .iter()
739 .map(|part_wrapper| part_wrapper.part.series_count())
740 .sum()
741 }
742
743 fn should_compact(&self) -> bool {
745 let parts = self.parts.read().unwrap();
746 parts.should_merge_parts(self.config.merge_threshold)
747 }
748
749 fn schedule_compact(&self) {
751 if let Some(dispatcher) = &self.compact_dispatcher {
752 let task = MemCompactTask {
753 metadata: self.metadata.clone(),
754 parts: self.parts.clone(),
755 config: self.config.clone(),
756 compactor: self.compactor.clone(),
757 append_mode: self.append_mode,
758 merge_mode: self.merge_mode,
759 };
760
761 dispatcher.dispatch_compact(task);
762 } else {
763 if let Err(e) = self.compact(false) {
765 common_telemetry::error!(e; "Failed to compact table");
766 }
767 }
768 }
769}
770
/// Iterator builder for a range backed by a single `BulkPart`.
pub struct BulkRangeIterBuilder {
    /// The part to read.
    pub part: BulkPart,
    /// Shared iteration context (projection, predicate, read format).
    pub context: Arc<BulkIterContext>,
    /// Optional sequence range passed to the part iterator.
    pub sequence: Option<SequenceRange>,
}
777
/// Iterator builder for a range backed by a `MultiBulkPart`.
struct MultiBulkRangeIterBuilder {
    /// The part to read.
    part: MultiBulkPart,
    /// Shared iteration context.
    context: Arc<BulkIterContext>,
    /// Optional sequence range passed to the part reader.
    sequence: Option<SequenceRange>,
}
784
785impl IterBuilder for BulkRangeIterBuilder {
786 fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
787 UnsupportedOperationSnafu {
788 err_msg: "BatchIterator is not supported for bulk memtable",
789 }
790 .fail()
791 }
792
793 fn is_record_batch(&self) -> bool {
794 true
795 }
796
797 fn build_record_batch(
798 &self,
799 _time_range: Option<(Timestamp, Timestamp)>,
800 metrics: Option<MemScanMetrics>,
801 ) -> Result<BoxedRecordBatchIterator> {
802 let metadata = self.context.read_format().metadata();
803 if should_prune_bulk_part(&self.part.batch, &self.context, metadata) {
804 return Ok(Box::new(std::iter::empty()));
805 }
806
807 let series_count = self.part.estimated_series_count();
808 let iter = BulkPartBatchIter::from_single(
809 self.part.batch.clone(),
810 self.context.clone(),
811 self.sequence,
812 series_count,
813 metrics,
814 );
815
816 Ok(Box::new(iter))
817 }
818
819 fn record_batch_schema_hint(&self) -> Option<SchemaRef> {
820 Some(self.part.schema())
821 }
822
823 fn encoded_range(&self) -> Option<EncodedRange> {
824 None
825 }
826}
827
828impl IterBuilder for MultiBulkRangeIterBuilder {
829 fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
830 UnsupportedOperationSnafu {
831 err_msg: "BatchIterator is not supported for multi bulk memtable",
832 }
833 .fail()
834 }
835
836 fn is_record_batch(&self) -> bool {
837 true
838 }
839
840 fn build_record_batch(
841 &self,
842 _time_range: Option<(Timestamp, Timestamp)>,
843 metrics: Option<MemScanMetrics>,
844 ) -> Result<BoxedRecordBatchIterator> {
845 match self
846 .part
847 .read(self.context.clone(), self.sequence, metrics)?
848 {
849 Some(iter) => Ok(iter),
850 None => Ok(Box::new(std::iter::empty())),
852 }
853 }
854
855 fn record_batch_schema_hint(&self) -> Option<SchemaRef> {
856 self.part.schemas().next()
857 }
858
859 fn encoded_range(&self) -> Option<EncodedRange> {
860 None
861 }
862}
863
/// Iterator builder for a range backed by an `EncodedBulkPart`.
struct EncodedBulkRangeIterBuilder {
    /// File id of the part, used when building its SST info.
    file_id: FileId,
    /// The encoded part to read.
    part: EncodedBulkPart,
    /// Shared iteration context.
    context: Arc<BulkIterContext>,
    /// Optional sequence range passed to the part reader.
    sequence: Option<SequenceRange>,
}
871
872impl IterBuilder for EncodedBulkRangeIterBuilder {
873 fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
874 UnsupportedOperationSnafu {
875 err_msg: "BatchIterator is not supported for encoded bulk memtable",
876 }
877 .fail()
878 }
879
880 fn is_record_batch(&self) -> bool {
881 true
882 }
883
884 fn build_record_batch(
885 &self,
886 _time_range: Option<(Timestamp, Timestamp)>,
887 metrics: Option<MemScanMetrics>,
888 ) -> Result<BoxedRecordBatchIterator> {
889 if let Some(iter) = self
890 .part
891 .read(self.context.clone(), self.sequence, metrics)?
892 {
893 Ok(iter)
894 } else {
895 Ok(Box::new(std::iter::empty()))
897 }
898 }
899
900 fn record_batch_schema_hint(&self) -> Option<SchemaRef> {
901 Some(self.part.schema())
902 }
903
904 fn encoded_range(&self) -> Option<EncodedRange> {
905 Some(EncodedRange {
906 data: self.part.data().clone(),
907 sst_info: self.part.to_sst_info(self.file_id),
908 })
909 }
910}
911
/// A stored part together with its merge state.
struct BulkPartWrapper {
    /// The part and its file id.
    part: PartToMerge,
    /// True while the part belongs to a merge group; such parts are skipped
    /// when merge candidates are counted and collected.
    merging: bool,
}
918
impl BulkPartWrapper {
    /// Returns the file id of the wrapped part.
    fn file_id(&self) -> FileId {
        self.part.file_id()
    }
}
925
/// A part of any supported kind, paired with the file id that identifies it
/// inside the memtable.
#[derive(Clone)]
enum PartToMerge {
    /// A single bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// A part made of multiple record batches.
    Multi {
        part: MultiBulkPart,
        file_id: FileId,
    },
    /// An encoded part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}
942
943impl PartToMerge {
944 fn file_id(&self) -> FileId {
946 match self {
947 PartToMerge::Bulk { file_id, .. } => *file_id,
948 PartToMerge::Multi { file_id, .. } => *file_id,
949 PartToMerge::Encoded { file_id, .. } => *file_id,
950 }
951 }
952
953 fn min_timestamp(&self) -> i64 {
955 match self {
956 PartToMerge::Bulk { part, .. } => part.min_timestamp,
957 PartToMerge::Multi { part, .. } => part.min_timestamp(),
958 PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
959 }
960 }
961
962 fn max_timestamp(&self) -> i64 {
964 match self {
965 PartToMerge::Bulk { part, .. } => part.max_timestamp,
966 PartToMerge::Multi { part, .. } => part.max_timestamp(),
967 PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
968 }
969 }
970
971 fn num_rows(&self) -> usize {
973 match self {
974 PartToMerge::Bulk { part, .. } => part.num_rows(),
975 PartToMerge::Multi { part, .. } => part.num_rows(),
976 PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
977 }
978 }
979
980 fn max_sequence(&self) -> u64 {
982 match self {
983 PartToMerge::Bulk { part, .. } => part.sequence,
984 PartToMerge::Multi { part, .. } => part.max_sequence(),
985 PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
986 }
987 }
988
989 fn series_count(&self) -> usize {
991 match self {
992 PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
993 PartToMerge::Multi { part, .. } => part.series_count(),
994 PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
995 }
996 }
997
998 fn is_encoded(&self) -> bool {
1000 matches!(self, PartToMerge::Encoded { .. })
1001 }
1002
1003 fn estimated_size(&self) -> usize {
1005 match self {
1006 PartToMerge::Bulk { part, .. } => part.estimated_size(),
1007 PartToMerge::Multi { part, .. } => part.estimated_size(),
1008 PartToMerge::Encoded { part, .. } => part.size_bytes(),
1009 }
1010 }
1011
1012 fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1014 match self {
1015 PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
1016 PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
1017 PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
1018 }
1019 }
1020
1021 fn arrow_schema(&self) -> SchemaRef {
1023 match self {
1024 PartToMerge::Bulk { part, .. } => part.schema(),
1025 PartToMerge::Multi { part, .. } => part
1028 .schemas()
1029 .next()
1030 .expect("MultiBulkPart must contain at least one record batch"),
1031 PartToMerge::Encoded { part, .. } => part.schema(),
1032 }
1033 }
1034
1035 fn create_iterator(
1037 self,
1038 context: Arc<BulkIterContext>,
1039 ) -> Result<Option<BoxedRecordBatchIterator>> {
1040 match self {
1041 PartToMerge::Bulk { part, .. } => {
1042 let series_count = part.estimated_series_count();
1043 let iter = BulkPartBatchIter::from_single(
1044 part.batch,
1045 context,
1046 None, series_count,
1048 None, );
1050 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1051 }
1052 PartToMerge::Multi { part, .. } => part.read(context, None, None),
1053 PartToMerge::Encoded { part, .. } => part.read(context, None, None),
1054 }
1055 }
1056}
1057
/// Merges the parts of one bulk memtable.
struct MemtableCompactor {
    /// Region the memtable belongs to; used in log messages.
    region_id: RegionId,
    /// Id of the memtable; used in log messages.
    memtable_id: MemtableId,
    /// Merge and encode thresholds.
    config: BulkMemtableConfig,
}
1064
1065impl MemtableCompactor {
    /// Creates a compactor for the given region and memtable using `config`.
    fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
        Self {
            region_id,
            memtable_id,
            config,
        }
    }
1074
    /// Runs one merge pass over `bulk_parts`.
    ///
    /// Collects groups of parts (flagging them as merging), merges the groups
    /// in parallel, then installs the outputs and removes the inputs under the
    /// write lock. Returns immediately when there is nothing to merge.
    ///
    /// # Errors
    ///
    /// Returns the first error of merging a group. In that case nothing is
    /// installed and the merging flags of the collected parts are cleared.
    fn merge_parts(
        &mut self,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // The write guard is a temporary: the lock is released as soon as the
        // parts are collected, so it is not held during the merge itself.
        let collected = bulk_parts
            .write()
            .unwrap()
            .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);

        if collected.groups.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> = collected
            .groups
            .iter()
            .flatten()
            .map(|part| part.file_id())
            .collect();
        // Clears the merging flags again if we leave before `mark_success`.
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);

        let num_groups = collected.groups.len();
        let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();

        let encode_row_threshold = self.config.encode_row_threshold;
        let encode_bytes_threshold = self.config.encode_bytes_threshold;

        // Each group is merged independently via rayon's parallel iterator.
        let merged_parts = collected
            .groups
            .into_par_iter()
            .map(|group| {
                Self::merge_parts_group(
                    group,
                    metadata,
                    dedup,
                    merge_mode,
                    encode_row_threshold,
                    encode_bytes_threshold,
                )
            })
            .collect::<Result<Vec<Option<MergedPart>>>>()?;

        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            // Groups that produced no output (`None`) are skipped by `flatten`.
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            num_groups,
            num_parts,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }
1146
1147 fn merge_parts_group(
1149 parts_to_merge: Vec<PartToMerge>,
1150 metadata: &RegionMetadataRef,
1151 dedup: bool,
1152 merge_mode: MergeMode,
1153 encode_row_threshold: usize,
1154 encode_bytes_threshold: usize,
1155 ) -> Result<Option<MergedPart>> {
1156 if parts_to_merge.is_empty() {
1157 return Ok(None);
1158 }
1159
1160 let min_timestamp = parts_to_merge
1162 .iter()
1163 .map(|p| p.min_timestamp())
1164 .min()
1165 .unwrap_or(i64::MAX);
1166 let max_timestamp = parts_to_merge
1167 .iter()
1168 .map(|p| p.max_timestamp())
1169 .max()
1170 .unwrap_or(i64::MIN);
1171 let max_sequence = parts_to_merge
1172 .iter()
1173 .map(|p| p.max_sequence())
1174 .max()
1175 .unwrap_or(0);
1176
1177 let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
1179 let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
1180 let estimated_series_count = parts_to_merge
1181 .iter()
1182 .map(|p| p.series_count())
1183 .max()
1184 .unwrap_or(0);
1185
1186 let context = Arc::new(BulkIterContext::new(
1187 metadata.clone(),
1188 None, None, true,
1191 )?);
1192
1193 let aligner = Json2Aligner::try_new(parts_to_merge.iter().map(PartToMerge::arrow_schema))?;
1194
1195 let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
1196 .into_iter()
1197 .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
1198 .map(|iter| aligner.wrap_iter(iter))
1199 .collect();
1200
1201 if iterators.is_empty() {
1202 return Ok(None);
1203 }
1204
1205 let merged_iter =
1206 FlatMergeIterator::new(aligner.schema().clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;
1207
1208 let boxed_iter: BoxedRecordBatchIterator = if dedup {
1209 match merge_mode {
1210 MergeMode::LastRow => {
1211 let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
1212 Box::new(dedup_iter)
1213 }
1214 MergeMode::LastNonNull => {
1215 let field_column_start =
1216 field_column_start(metadata, aligner.schema().fields().len());
1217
1218 let dedup_iter = FlatDedupIterator::new(
1219 merged_iter,
1220 FlatLastNonNull::new(field_column_start, false),
1221 );
1222 Box::new(dedup_iter)
1223 }
1224 }
1225 } else {
1226 Box::new(merged_iter)
1227 };
1228
1229 if estimated_total_rows > encode_row_threshold
1231 || estimated_total_bytes > encode_bytes_threshold
1232 {
1233 let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
1234 let mut metrics = BulkPartEncodeMetrics::default();
1235 let encoded_part = encoder.encode_record_batch_iter(
1236 boxed_iter,
1237 aligner.schema().clone(),
1238 min_timestamp,
1239 max_timestamp,
1240 max_sequence,
1241 &mut metrics,
1242 )?;
1243
1244 common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);
1245
1246 Ok(encoded_part.map(MergedPart::Encoded))
1247 } else {
1248 let mut batches = Vec::new();
1250 let mut actual_total_rows = 0;
1251
1252 for batch_result in boxed_iter {
1253 let batch = batch_result?;
1254 actual_total_rows += batch.num_rows();
1255 batches.push(batch);
1256 }
1257
1258 if actual_total_rows == 0 {
1259 return Ok(None);
1260 }
1261
1262 let multi_part = MultiBulkPart::new(
1263 batches,
1264 min_timestamp,
1265 max_timestamp,
1266 max_sequence,
1267 estimated_series_count,
1268 metadata,
1269 );
1270
1271 common_telemetry::trace!(
1272 "merge_parts_group created MultiBulkPart: rows={}, batches={}",
1273 actual_total_rows,
1274 multi_part.num_batches()
1275 );
1276
1277 Ok(Some(MergedPart::Multi(multi_part)))
1278 }
1279 }
1280}
1281
/// Everything a background compaction of one memtable needs; built from the
/// memtable's own fields when a compaction is scheduled.
struct MemCompactTask {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// The memtable's parts, shared with the memtable.
    parts: Arc<RwLock<BulkParts>>,
    /// Configuration providing the merge threshold.
    config: BulkMemtableConfig,
    /// The memtable's compactor, shared with the memtable.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// When true, the merge does not deduplicate rows.
    append_mode: bool,
    /// Deduplication strategy used when dedup is enabled.
    merge_mode: MergeMode,
}
1295
1296impl MemCompactTask {
1297 fn compact(&self) -> Result<()> {
1298 let mut compactor = self.compactor.lock().unwrap();
1299
1300 let should_merge = self
1301 .parts
1302 .read()
1303 .unwrap()
1304 .should_merge_parts(self.config.merge_threshold);
1305 if should_merge {
1306 compactor.merge_parts(
1307 &self.parts,
1308 &self.metadata,
1309 !self.append_mode,
1310 self.merge_mode,
1311 )?;
1312 }
1313
1314 Ok(())
1315 }
1316}
1317
/// Dispatches memtable compaction tasks to background runtimes, gated by a
/// semaphore.
#[derive(Debug)]
pub struct CompactDispatcher {
    /// Semaphore a task must acquire a permit from before it runs.
    semaphore: Arc<Semaphore>,
}
1323
1324impl CompactDispatcher {
1325 pub fn new(permits: usize) -> Self {
1327 Self {
1328 semaphore: Arc::new(Semaphore::new(permits)),
1329 }
1330 }
1331
1332 fn dispatch_compact(&self, task: MemCompactTask) {
1334 let semaphore = self.semaphore.clone();
1335 common_runtime::spawn_global(async move {
1336 let Ok(_permit) = semaphore.acquire().await else {
1337 return;
1338 };
1339
1340 common_runtime::spawn_blocking_global(move || {
1341 if let Err(e) = task.compact() {
1342 common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
1343 }
1344 });
1345 });
1346 }
1347}
1348
/// Builder of [`BulkMemtable`]s.
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    /// Configuration handed to every built memtable.
    config: BulkMemtableConfig,
    /// Optional write buffer manager handed to every built memtable.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Optional dispatcher for background compaction.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Append mode of the built memtables.
    append_mode: bool,
    /// Merge mode of the built memtables.
    merge_mode: MergeMode,
}
1359
1360impl BulkMemtableBuilder {
1361 pub fn new(
1363 write_buffer_manager: Option<WriteBufferManagerRef>,
1364 append_mode: bool,
1365 merge_mode: MergeMode,
1366 ) -> Self {
1367 Self {
1368 config: BulkMemtableConfig::default(),
1369 write_buffer_manager,
1370 compact_dispatcher: None,
1371 append_mode,
1372 merge_mode,
1373 }
1374 }
1375
1376 pub fn with_config(mut self, config: BulkMemtableConfig) -> Self {
1378 self.config = config;
1379 self
1380 }
1381
1382 pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
1384 self.compact_dispatcher = Some(compact_dispatcher);
1385 self
1386 }
1387
1388 #[cfg(test)]
1389 pub(crate) fn config(&self) -> &BulkMemtableConfig {
1390 &self.config
1391 }
1392}
1393
1394impl MemtableBuilder for BulkMemtableBuilder {
1395 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
1396 Arc::new(BulkMemtable::new(
1397 id,
1398 self.config.clone(),
1399 metadata.clone(),
1400 self.write_buffer_manager.clone(),
1401 self.compact_dispatcher.clone(),
1402 self.append_mode,
1403 self.merge_mode,
1404 ))
1405 }
1406
1407 fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
1408 true
1409 }
1410}
1411
1412#[cfg(test)]
1413mod tests {
1414 use api::helper::encode_json_value;
1415 use api::v1::value::ValueData;
1416 use api::v1::{Mutation, Row, Rows, SemanticType};
1417 use datatypes::data_type::ConcreteDataType;
1418 use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
1419 use datatypes::json::value::JsonValue;
1420 use datatypes::schema::ColumnSchema;
1421 use datatypes::types::json_type::{JsonNativeType, JsonObjectType};
1422 use mito_codec::row_converter::build_primary_key_codec;
1423 use serde_json::json;
1424 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
1425
1426 use super::*;
1427 use crate::memtable::bulk::part::BulkPartConverter;
1428 use crate::read::scan_region::PredicateGroup;
1429 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1430 use crate::test_util::memtable_util::{
1431 build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1432 };
1433
1434 fn create_bulk_part_with_converter(
1435 k0: &str,
1436 k1: u32,
1437 timestamps: Vec<i64>,
1438 values: Vec<Option<f64>>,
1439 sequence: u64,
1440 ) -> Result<BulkPart> {
1441 let metadata = metadata_for_test();
1442 let capacity = 100;
1443 let primary_key_codec = build_primary_key_codec(&metadata);
1444 let schema = to_flat_sst_arrow_schema(
1445 &metadata,
1446 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1447 );
1448
1449 let mut converter =
1450 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1451
1452 let key_values = build_key_values_with_ts_seq_values(
1453 &metadata,
1454 k0.to_string(),
1455 k1,
1456 timestamps.into_iter(),
1457 values.into_iter(),
1458 sequence,
1459 );
1460
1461 converter.append_key_values(&key_values)?;
1462 converter.convert()
1463 }
1464
1465 #[test]
1466 fn test_bulk_memtable_sanitizes_zero_merge_threshold() {
1467 let metadata = metadata_for_test();
1468 let config = BulkMemtableConfig {
1469 merge_threshold: 0,
1470 ..Default::default()
1471 };
1472
1473 let memtable =
1474 BulkMemtable::new(999, config, metadata, None, None, false, MergeMode::LastRow);
1475
1476 assert_eq!(DEFAULT_MERGE_THRESHOLD, memtable.config.merge_threshold);
1477 assert_eq!(
1478 DEFAULT_MERGE_THRESHOLD,
1479 memtable.compactor.lock().unwrap().config.merge_threshold
1480 );
1481 }
1482
1483 #[test]
1484 fn test_bulk_memtable_write_read() {
1485 let metadata = metadata_for_test();
1486 let memtable = BulkMemtable::new(
1487 999,
1488 BulkMemtableConfig::default(),
1489 metadata.clone(),
1490 None,
1491 None,
1492 false,
1493 MergeMode::LastRow,
1494 );
1495 memtable.set_unordered_part_threshold(0);
1497
1498 let test_data = [
1499 (
1500 "key_a",
1501 1u32,
1502 vec![1000i64, 2000i64],
1503 vec![Some(10.5), Some(20.5)],
1504 100u64,
1505 ),
1506 (
1507 "key_b",
1508 2u32,
1509 vec![1500i64, 2500i64],
1510 vec![Some(15.5), Some(25.5)],
1511 200u64,
1512 ),
1513 ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
1514 ];
1515
1516 for (k0, k1, timestamps, values, seq) in test_data.iter() {
1517 let part =
1518 create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
1519 .unwrap();
1520 memtable.write_bulk(part).unwrap();
1521 }
1522
1523 let stats = memtable.stats();
1524 assert_eq!(5, stats.num_rows);
1525 assert_eq!(3, stats.num_ranges);
1526 assert_eq!(300, stats.max_sequence);
1527
1528 let (min_ts, max_ts) = stats.time_range.unwrap();
1529 assert_eq!(1000, min_ts.value());
1530 assert_eq!(3000, max_ts.value());
1531
1532 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1533 let ranges = memtable
1534 .ranges(
1535 None,
1536 RangesOptions::default().with_predicate(predicate_group),
1537 )
1538 .unwrap();
1539
1540 assert_eq!(3, ranges.ranges.len());
1541 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1542 assert_eq!(5, total_rows);
1543
1544 for (_range_id, range) in ranges.ranges.iter() {
1545 assert!(range.num_rows() > 0);
1546 assert!(range.is_record_batch());
1547
1548 let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1549
1550 let mut total_rows = 0;
1551 for batch_result in record_batch_iter {
1552 let batch = batch_result.unwrap();
1553 total_rows += batch.num_rows();
1554 assert!(batch.num_rows() > 0);
1555 assert_eq!(8, batch.num_columns());
1556 }
1557 assert_eq!(total_rows, range.num_rows());
1558 }
1559 }
1560
1561 #[test]
1562 fn test_bulk_memtable_compact_parts_with_json2() {
1563 let metadata = mock_metadata_with_json2();
1564
1565 let config = BulkMemtableConfig {
1566 merge_threshold: 2,
1567 encode_row_threshold: 1,
1568 encode_bytes_threshold: 1,
1569 ..Default::default()
1570 };
1571 let memtable = BulkMemtable::new(
1572 999,
1573 config,
1574 metadata.clone(),
1575 None,
1576 None,
1577 true,
1578 MergeMode::LastRow,
1579 );
1580 memtable.set_unordered_part_threshold(0);
1581
1582 let part1 = mock_bulk_part_with_json2(&metadata, vec![1000, 2000], 100).unwrap();
1583 let part2 = mock_bulk_part_with_json2(&metadata, vec![3000, 4000], 200).unwrap();
1584
1585 memtable.write_bulk(part1).unwrap();
1586 memtable.write_bulk(part2).unwrap();
1587 memtable.compact(false).unwrap();
1588
1589 let stats = memtable.stats();
1590 assert_eq!(4, stats.num_rows);
1591 assert_eq!(201, stats.max_sequence);
1592
1593 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1594 let opts = RangesOptions::default().with_predicate(predicate_group);
1595 let ranges = memtable.ranges(None, opts).unwrap();
1596
1597 assert_eq!(1, ranges.ranges.len());
1598 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1599 assert_eq!(4, total_rows);
1600 }
1601
1602 fn mock_metadata_with_json2() -> RegionMetadataRef {
1603 let col_meta_1 = ColumnMetadata {
1604 column_schema: ColumnSchema::new(
1605 "ts",
1606 ConcreteDataType::timestamp_millisecond_datatype(),
1607 false,
1608 ),
1609 semantic_type: SemanticType::Timestamp,
1610 column_id: 0,
1611 };
1612
1613 let data_type = ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::new()));
1614 let mut col_schema = ColumnSchema::new("data", data_type, true);
1615 let extension = JsonExtensionType::new(Arc::new(JsonMetadata::default()));
1616 col_schema.with_extension_type(&extension).unwrap();
1617
1618 let col_meta_2 = ColumnMetadata {
1619 column_schema: col_schema,
1620 semantic_type: SemanticType::Field,
1621 column_id: 1,
1622 };
1623 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 789));
1624 builder
1625 .push_column_metadata(col_meta_1)
1626 .push_column_metadata(col_meta_2)
1627 .primary_key(vec![]);
1628 Arc::new(builder.build().unwrap())
1629 }
1630
1631 fn mock_bulk_part_with_json2(
1632 metadata: &RegionMetadataRef,
1633 timestamps: Vec<i64>,
1634 sequence: u64,
1635 ) -> Result<BulkPart> {
1636 let capacity = timestamps.len();
1637 let primary_key_codec = build_primary_key_codec(metadata);
1638 let json_type = JsonNativeType::Object(JsonObjectType::from([
1639 ("id".to_string(), JsonNativeType::i64()),
1640 (
1641 "payload".to_string(),
1642 JsonNativeType::Object(JsonObjectType::from([(
1643 "message".to_string(),
1644 JsonNativeType::String,
1645 )])),
1646 ),
1647 ]));
1648 let mut options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
1649 options
1650 .concretized_json_types
1651 .insert("data".to_string(), json_type.as_arrow_type());
1652 let schema = to_flat_sst_arrow_schema(metadata, &options);
1653
1654 let mut converter =
1655 BulkPartConverter::new(metadata, schema, capacity, primary_key_codec, true);
1656
1657 let rows = timestamps
1658 .into_iter()
1659 .map(|ts| {
1660 let val1 = api::v1::Value {
1661 value_data: Some(ValueData::TimestampMillisecondValue(ts)),
1662 };
1663 let value_data = ValueData::JsonValue(encode_json_value(JsonValue::from(json!({
1664 "id": ts,
1665 "payload": {
1666 "message": format!("row-{ts}"),
1667 },
1668 }))));
1669 let val2 = api::v1::Value {
1670 value_data: Some(value_data),
1671 };
1672 Row {
1673 values: vec![val1, val2],
1674 }
1675 })
1676 .collect();
1677
1678 let mutation = Mutation {
1679 op_type: 1,
1680 sequence,
1681 rows: Some(Rows {
1682 schema: region_metadata_to_row_schema(metadata),
1683 rows,
1684 }),
1685 write_hint: None,
1686 };
1687 let key_values = KeyValues::new(metadata.as_ref(), mutation).unwrap();
1688
1689 converter.append_key_values(&key_values)?;
1690 converter.convert()
1691 }
1692
1693 #[test]
1694 fn test_bulk_memtable_ranges_with_projection() {
1695 let metadata = metadata_for_test();
1696 let memtable = BulkMemtable::new(
1697 111,
1698 BulkMemtableConfig::default(),
1699 metadata.clone(),
1700 None,
1701 None,
1702 false,
1703 MergeMode::LastRow,
1704 );
1705
1706 let bulk_part = create_bulk_part_with_converter(
1707 "projection_test",
1708 5,
1709 vec![5000, 6000, 7000],
1710 vec![Some(50.0), Some(60.0), Some(70.0)],
1711 500,
1712 )
1713 .unwrap();
1714
1715 memtable.write_bulk(bulk_part).unwrap();
1716
1717 let projection = vec![4u32];
1718 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1719 let ranges = memtable
1720 .ranges(
1721 Some(&projection),
1722 RangesOptions::default().with_predicate(predicate_group),
1723 )
1724 .unwrap();
1725
1726 assert_eq!(1, ranges.ranges.len());
1727 let range = ranges.ranges.get(&0).unwrap();
1728
1729 assert!(range.is_record_batch());
1730 let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1731
1732 let mut total_rows = 0;
1733 for batch_result in record_batch_iter {
1734 let batch = batch_result.unwrap();
1735 assert!(batch.num_rows() > 0);
1736 assert_eq!(5, batch.num_columns());
1737 total_rows += batch.num_rows();
1738 }
1739 assert_eq!(3, total_rows);
1740 }
1741
1742 #[test]
1743 fn test_bulk_memtable_unsupported_operations() {
1744 let metadata = metadata_for_test();
1745 let memtable = BulkMemtable::new(
1746 111,
1747 BulkMemtableConfig::default(),
1748 metadata.clone(),
1749 None,
1750 None,
1751 false,
1752 MergeMode::LastRow,
1753 );
1754
1755 let key_values = build_key_values_with_ts_seq_values(
1756 &metadata,
1757 "test".to_string(),
1758 1,
1759 vec![1000].into_iter(),
1760 vec![Some(1.0)].into_iter(),
1761 1,
1762 );
1763
1764 let err = memtable.write(&key_values).unwrap_err();
1765 assert!(err.to_string().contains("not supported"));
1766
1767 let kv = key_values.iter().next().unwrap();
1768 let err = memtable.write_one(kv).unwrap_err();
1769 assert!(err.to_string().contains("not supported"));
1770 }
1771
1772 #[test]
1773 fn test_bulk_memtable_freeze() {
1774 let metadata = metadata_for_test();
1775 let memtable = BulkMemtable::new(
1776 222,
1777 BulkMemtableConfig::default(),
1778 metadata.clone(),
1779 None,
1780 None,
1781 false,
1782 MergeMode::LastRow,
1783 );
1784
1785 let bulk_part = create_bulk_part_with_converter(
1786 "freeze_test",
1787 10,
1788 vec![10000],
1789 vec![Some(100.0)],
1790 1000,
1791 )
1792 .unwrap();
1793
1794 memtable.write_bulk(bulk_part).unwrap();
1795 memtable.freeze().unwrap();
1796
1797 let stats_after_freeze = memtable.stats();
1798 assert_eq!(1, stats_after_freeze.num_rows);
1799 }
1800
1801 #[test]
1802 fn test_bulk_memtable_fork() {
1803 let metadata = metadata_for_test();
1804 let original_memtable = BulkMemtable::new(
1805 333,
1806 BulkMemtableConfig::default(),
1807 metadata.clone(),
1808 None,
1809 None,
1810 false,
1811 MergeMode::LastRow,
1812 );
1813
1814 let bulk_part =
1815 create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
1816 .unwrap();
1817
1818 original_memtable.write_bulk(bulk_part).unwrap();
1819
1820 let forked_memtable = original_memtable.fork(444, &metadata);
1821
1822 assert_eq!(forked_memtable.id(), 444);
1823 assert!(forked_memtable.is_empty());
1824 assert_eq!(0, forked_memtable.stats().num_rows);
1825
1826 assert!(!original_memtable.is_empty());
1827 assert_eq!(1, original_memtable.stats().num_rows);
1828 }
1829
1830 #[test]
1831 fn test_bulk_memtable_ranges_multiple_parts() {
1832 let metadata = metadata_for_test();
1833 let memtable = BulkMemtable::new(
1834 777,
1835 BulkMemtableConfig::default(),
1836 metadata.clone(),
1837 None,
1838 None,
1839 false,
1840 MergeMode::LastRow,
1841 );
1842 memtable.set_unordered_part_threshold(0);
1844
1845 let parts_data = vec![
1846 (
1847 "part1",
1848 1u32,
1849 vec![1000i64, 1100i64],
1850 vec![Some(10.0), Some(11.0)],
1851 100u64,
1852 ),
1853 (
1854 "part2",
1855 2u32,
1856 vec![2000i64, 2100i64],
1857 vec![Some(20.0), Some(21.0)],
1858 200u64,
1859 ),
1860 ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
1861 ];
1862
1863 for (k0, k1, timestamps, values, seq) in parts_data {
1864 let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
1865 memtable.write_bulk(part).unwrap();
1866 }
1867
1868 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1869 let ranges = memtable
1870 .ranges(
1871 None,
1872 RangesOptions::default().with_predicate(predicate_group),
1873 )
1874 .unwrap();
1875
1876 assert_eq!(3, ranges.ranges.len());
1877 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1878 assert_eq!(5, total_rows);
1879 assert_eq!(3, ranges.ranges.len());
1880
1881 for (range_id, range) in ranges.ranges.iter() {
1882 assert!(*range_id < 3);
1883 assert!(range.num_rows() > 0);
1884 assert!(range.is_record_batch());
1885 }
1886 }
1887
1888 #[test]
1889 fn test_bulk_memtable_ranges_with_sequence_filter() {
1890 let metadata = metadata_for_test();
1891 let memtable = BulkMemtable::new(
1892 888,
1893 BulkMemtableConfig::default(),
1894 metadata.clone(),
1895 None,
1896 None,
1897 false,
1898 MergeMode::LastRow,
1899 );
1900
1901 let part = create_bulk_part_with_converter(
1902 "seq_test",
1903 1,
1904 vec![1000, 2000, 3000],
1905 vec![Some(10.0), Some(20.0), Some(30.0)],
1906 500,
1907 )
1908 .unwrap();
1909
1910 memtable.write_bulk(part).unwrap();
1911
1912 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1913 let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); let ranges = memtable
1915 .ranges(
1916 None,
1917 RangesOptions::default()
1918 .with_predicate(predicate_group)
1919 .with_sequence(sequence_filter),
1920 )
1921 .unwrap();
1922
1923 assert_eq!(1, ranges.ranges.len());
1924 let range = ranges.ranges.get(&0).unwrap();
1925
1926 let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1927 assert!(record_batch_iter.next().is_none());
1928 }
1929
1930 #[test]
1931 fn test_bulk_memtable_ranges_with_encoded_parts() {
1932 let metadata = metadata_for_test();
1933 let config = BulkMemtableConfig {
1934 merge_threshold: 8,
1935 ..Default::default()
1936 };
1937 let memtable = BulkMemtable::new(
1938 999,
1939 config,
1940 metadata.clone(),
1941 None,
1942 None,
1943 false,
1944 MergeMode::LastRow,
1945 );
1946 memtable.set_unordered_part_threshold(0);
1948
1949 for i in 0..10 {
1951 let part = create_bulk_part_with_converter(
1952 &format!("key_{}", i),
1953 i,
1954 vec![1000 + i as i64 * 100],
1955 vec![Some(i as f64 * 10.0)],
1956 100 + i as u64,
1957 )
1958 .unwrap();
1959 memtable.write_bulk(part).unwrap();
1960 }
1961
1962 memtable.compact(false).unwrap();
1963
1964 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1965 let ranges = memtable
1966 .ranges(
1967 None,
1968 RangesOptions::default().with_predicate(predicate_group),
1969 )
1970 .unwrap();
1971
1972 assert_eq!(3, ranges.ranges.len());
1974 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1975 assert_eq!(10, total_rows);
1976
1977 for (_range_id, range) in ranges.ranges.iter() {
1978 assert!(range.num_rows() > 0);
1979 assert!(range.is_record_batch());
1980
1981 let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
1982 let mut total_rows = 0;
1983 for batch_result in record_batch_iter {
1984 let batch = batch_result.unwrap();
1985 total_rows += batch.num_rows();
1986 assert!(batch.num_rows() > 0);
1987 }
1988 assert_eq!(total_rows, range.num_rows());
1989 }
1990 }
1991
1992 #[test]
1993 fn test_bulk_memtable_unordered_part() {
1994 let metadata = metadata_for_test();
1995 let memtable = BulkMemtable::new(
1996 1001,
1997 BulkMemtableConfig::default(),
1998 metadata.clone(),
1999 None,
2000 None,
2001 false,
2002 MergeMode::LastRow,
2003 );
2004
2005 memtable.set_unordered_part_threshold(5);
2008 memtable.set_unordered_part_compact_threshold(10);
2010
2011 for i in 0..3 {
2013 let part = create_bulk_part_with_converter(
2014 &format!("key_{}", i),
2015 i,
2016 vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
2017 vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2018 100 + i as u64,
2019 )
2020 .unwrap();
2021 assert_eq!(2, part.num_rows());
2022 memtable.write_bulk(part).unwrap();
2023 }
2024
2025 let stats = memtable.stats();
2027 assert_eq!(6, stats.num_rows);
2028
2029 for i in 3..5 {
2032 let part = create_bulk_part_with_converter(
2033 &format!("key_{}", i),
2034 i,
2035 vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
2036 vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2037 100 + i as u64,
2038 )
2039 .unwrap();
2040 memtable.write_bulk(part).unwrap();
2041 }
2042
2043 let stats = memtable.stats();
2045 assert_eq!(10, stats.num_rows);
2046
2047 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2049 let ranges = memtable
2050 .ranges(
2051 None,
2052 RangesOptions::default().with_predicate(predicate_group),
2053 )
2054 .unwrap();
2055
2056 assert!(!ranges.ranges.is_empty());
2058 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2059 assert_eq!(10, total_rows);
2060
2061 let mut total_rows_read = 0;
2063 for (_range_id, range) in ranges.ranges.iter() {
2064 assert!(range.is_record_batch());
2065 let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2066
2067 for batch_result in record_batch_iter {
2068 let batch = batch_result.unwrap();
2069 total_rows_read += batch.num_rows();
2070 }
2071 }
2072 assert_eq!(10, total_rows_read);
2073 }
2074
2075 #[test]
2076 fn test_bulk_memtable_unordered_part_mixed_sizes() {
2077 let metadata = metadata_for_test();
2078 let memtable = BulkMemtable::new(
2079 1002,
2080 BulkMemtableConfig::default(),
2081 metadata.clone(),
2082 None,
2083 None,
2084 false,
2085 MergeMode::LastRow,
2086 );
2087
2088 memtable.set_unordered_part_threshold(4);
2090 memtable.set_unordered_part_compact_threshold(8);
2091
2092 for i in 0..2 {
2094 let part = create_bulk_part_with_converter(
2095 &format!("small_{}", i),
2096 i,
2097 vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
2098 vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
2099 10 + i as u64,
2100 )
2101 .unwrap();
2102 assert_eq!(3, part.num_rows());
2103 memtable.write_bulk(part).unwrap();
2104 }
2105
2106 let large_part = create_bulk_part_with_converter(
2108 "large_key",
2109 100,
2110 vec![5000, 6000, 7000, 8000, 9000],
2111 vec![
2112 Some(100.0),
2113 Some(101.0),
2114 Some(102.0),
2115 Some(103.0),
2116 Some(104.0),
2117 ],
2118 50,
2119 )
2120 .unwrap();
2121 assert_eq!(5, large_part.num_rows());
2122 memtable.write_bulk(large_part).unwrap();
2123
2124 let part = create_bulk_part_with_converter(
2126 "small_2",
2127 2,
2128 vec![4000, 4100],
2129 vec![Some(20.0), Some(21.0)],
2130 30,
2131 )
2132 .unwrap();
2133 memtable.write_bulk(part).unwrap();
2134
2135 let stats = memtable.stats();
2136 assert_eq!(13, stats.num_rows); let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2140 let ranges = memtable
2141 .ranges(
2142 None,
2143 RangesOptions::default().with_predicate(predicate_group),
2144 )
2145 .unwrap();
2146
2147 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2148 assert_eq!(13, total_rows);
2149
2150 let mut total_rows_read = 0;
2151 for (_range_id, range) in ranges.ranges.iter() {
2152 let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2153 for batch_result in record_batch_iter {
2154 let batch = batch_result.unwrap();
2155 total_rows_read += batch.num_rows();
2156 }
2157 }
2158 assert_eq!(13, total_rows_read);
2159 }
2160
2161 #[test]
2162 fn test_bulk_memtable_unordered_part_with_ranges() {
2163 let metadata = metadata_for_test();
2164 let memtable = BulkMemtable::new(
2165 1003,
2166 BulkMemtableConfig::default(),
2167 metadata.clone(),
2168 None,
2169 None,
2170 false,
2171 MergeMode::LastRow,
2172 );
2173
2174 memtable.set_unordered_part_threshold(3);
2176 memtable.set_unordered_part_compact_threshold(100); for i in 0..3 {
2180 let part = create_bulk_part_with_converter(
2181 &format!("key_{}", i),
2182 i,
2183 vec![1000 + i as i64 * 100],
2184 vec![Some(i as f64 * 10.0)],
2185 100 + i as u64,
2186 )
2187 .unwrap();
2188 assert_eq!(1, part.num_rows());
2189 memtable.write_bulk(part).unwrap();
2190 }
2191
2192 let stats = memtable.stats();
2193 assert_eq!(3, stats.num_rows);
2194
2195 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2197 let ranges = memtable
2198 .ranges(
2199 None,
2200 RangesOptions::default().with_predicate(predicate_group),
2201 )
2202 .unwrap();
2203
2204 assert_eq!(1, ranges.ranges.len());
2206 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2207 assert_eq!(3, total_rows);
2208
2209 let range = ranges.ranges.get(&0).unwrap();
2211 let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2212
2213 let mut total_rows = 0;
2214 for batch_result in record_batch_iter {
2215 let batch = batch_result.unwrap();
2216 total_rows += batch.num_rows();
2217 assert!(batch.num_rows() > 0);
2219 }
2220 assert_eq!(3, total_rows);
2221 }
2222
2223 fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
2225 BulkPartWrapper {
2226 part: PartToMerge::Bulk {
2227 part,
2228 file_id: FileId::random(),
2229 },
2230 merging: false,
2231 }
2232 }
2233
2234 #[test]
2235 fn test_should_merge_parts_below_threshold() {
2236 let mut bulk_parts = BulkParts::default();
2237
2238 for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
2240 let part = create_bulk_part_with_converter(
2241 &format!("key_{}", i),
2242 i as u32,
2243 vec![1000 + i as i64 * 100],
2244 vec![Some(i as f64 * 10.0)],
2245 100 + i as u64,
2246 )
2247 .unwrap();
2248 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2249 }
2250
2251 assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2253 }
2254
2255 #[test]
2256 fn test_should_merge_parts_at_threshold() {
2257 let mut bulk_parts = BulkParts::default();
2258 let merge_threshold = 8;
2259
2260 for i in 0..merge_threshold {
2262 let part = create_bulk_part_with_converter(
2263 &format!("key_{}", i),
2264 i as u32,
2265 vec![1000 + i as i64 * 100],
2266 vec![Some(i as f64 * 10.0)],
2267 100 + i as u64,
2268 )
2269 .unwrap();
2270 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2271 }
2272
2273 assert!(bulk_parts.should_merge_parts(merge_threshold));
2275 }
2276
2277 #[test]
2278 fn test_should_merge_parts_with_merging_flag() {
2279 let mut bulk_parts = BulkParts::default();
2280 let merge_threshold = 8;
2281
2282 for i in 0..10 {
2284 let part = create_bulk_part_with_converter(
2285 &format!("key_{}", i),
2286 i as u32,
2287 vec![1000 + i as i64 * 100],
2288 vec![Some(i as f64 * 10.0)],
2289 100 + i as u64,
2290 )
2291 .unwrap();
2292 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2293 }
2294
2295 assert!(bulk_parts.should_merge_parts(merge_threshold));
2297
2298 for wrapper in bulk_parts.parts.iter_mut().take(3) {
2300 wrapper.merging = true;
2301 }
2302
2303 assert!(!bulk_parts.should_merge_parts(merge_threshold));
2305 }
2306
2307 #[test]
2308 fn test_collect_parts_to_merge_grouping() {
2309 let mut bulk_parts = BulkParts::default();
2310
2311 for i in 0..16 {
2313 let num_rows = (i % 4) + 1; let timestamps: Vec<i64> = (0..num_rows)
2315 .map(|j| 1000 + i as i64 * 100 + j as i64)
2316 .collect();
2317 let values: Vec<Option<f64>> =
2318 (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
2319 let part = create_bulk_part_with_converter(
2320 &format!("key_{}", i),
2321 i as u32,
2322 timestamps,
2323 values,
2324 100 + i as u64,
2325 )
2326 .unwrap();
2327 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2328 }
2329
2330 assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2332
2333 let collected =
2335 bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);
2336
2337 assert!(!collected.groups.is_empty());
2339
2340 for group in &collected.groups {
2342 assert!(!group.is_empty());
2343 }
2344
2345 let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
2347 assert_eq!(16, total_parts);
2348 }
2349
2350 #[test]
2351 fn test_bulk_memtable_ranges_with_multi_bulk_part() {
2352 let metadata = metadata_for_test();
2353 let merge_threshold = 8;
2354 let config = BulkMemtableConfig {
2355 merge_threshold,
2356 ..Default::default()
2357 };
2358 let memtable = BulkMemtable::new(
2359 2005,
2360 config,
2361 metadata.clone(),
2362 None,
2363 None,
2364 false,
2365 MergeMode::LastRow,
2366 );
2367 memtable.set_unordered_part_threshold(0);
2369
2370 for i in 0..merge_threshold {
2374 let part = create_bulk_part_with_converter(
2375 &format!("key_{}", i),
2376 i as u32,
2377 vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2378 vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2379 100 + i as u64,
2380 )
2381 .unwrap();
2382 memtable.write_bulk(part).unwrap();
2383 }
2384
2385 memtable.compact(false).unwrap();
2387
2388 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2390 let ranges = memtable
2391 .ranges(
2392 None,
2393 RangesOptions::default().with_predicate(predicate_group),
2394 )
2395 .unwrap();
2396
2397 assert_eq!(1, ranges.ranges.len());
2398 let expected_rows = merge_threshold * 2; let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2400 assert_eq!(expected_rows, total_rows);
2401
2402 let mut total_rows_read = 0;
2404 for (_range_id, range) in ranges.ranges.iter() {
2405 assert!(range.is_record_batch());
2406 let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2407
2408 for batch_result in record_batch_iter {
2409 let batch = batch_result.unwrap();
2410 total_rows_read += batch.num_rows();
2411 }
2412 }
2413 assert_eq!(expected_rows, total_rows_read);
2414 }
2415
2416 #[test]
2417 fn test_multi_bulk_range_iter_builder_all_pruned() {
2418 let metadata = metadata_for_test();
2419 let merge_threshold = 8;
2420 let config = BulkMemtableConfig {
2421 merge_threshold,
2422 ..Default::default()
2423 };
2424 let memtable = BulkMemtable::new(
2425 2006,
2426 config,
2427 metadata.clone(),
2428 None,
2429 None,
2430 false,
2431 MergeMode::LastRow,
2432 );
2433 memtable.set_unordered_part_threshold(0);
2434
2435 for i in 0..merge_threshold {
2437 let part = create_bulk_part_with_converter(
2438 &format!("key_{}", i),
2439 i as u32,
2440 vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2441 vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2442 100 + i as u64,
2443 )
2444 .unwrap();
2445 memtable.write_bulk(part).unwrap();
2446 }
2447 memtable.compact(false).unwrap();
2448
2449 let filter = datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent"));
2451 let predicate_group = PredicateGroup::new(&metadata, &[filter]).unwrap();
2452 let ranges = memtable
2453 .ranges(
2454 None,
2455 RangesOptions::default().with_predicate(predicate_group),
2456 )
2457 .unwrap();
2458
2459 for (_range_id, range) in ranges.ranges.iter() {
2462 assert!(range.is_record_batch());
2463 let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
2464 let total_rows: usize = record_batch_iter.map(|r| r.unwrap().num_rows()).sum();
2465 assert_eq!(0, total_rows);
2466 }
2467 }
2468}