1#[allow(unused)]
18pub mod context;
19#[allow(unused)]
20pub mod part;
21pub mod part_reader;
22mod row_group_reader;
23
24use std::collections::{BTreeMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
26use std::sync::{Arc, LazyLock, Mutex, RwLock};
27use std::time::Instant;
28
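/// Reads a `usize` from the environment variable `name`, falling back to `default`
/// when the variable is unset or cannot be parsed.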
29fn env_usize(name: &str, default: usize) -> usize {
31 std::env::var(name)
32 .ok()
33 .and_then(|v| v.parse().ok())
34 .unwrap_or(default)
35}
36
37use datatypes::arrow::datatypes::SchemaRef;
38use mito_codec::key_values::KeyValue;
39use rayon::prelude::*;
40use store_api::metadata::RegionMetadataRef;
41use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
42use tokio::sync::Semaphore;
43
44use crate::error::{Result, UnsupportedOperationSnafu};
45use crate::flush::WriteBufferManagerRef;
46use crate::memtable::bulk::context::BulkIterContext;
47use crate::memtable::bulk::part::{
48 BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
49};
50use crate::memtable::bulk::part_reader::BulkPartBatchIter;
51use crate::memtable::stats::WriteMetrics;
52use crate::memtable::{
53 AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
54 IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
55 MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
56};
57use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
58use crate::read::flat_merge::FlatMergeIterator;
59use crate::region::options::MergeMode;
60use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
61use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
62use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
63
64const DEFAULT_MERGE_THRESHOLD: usize = 16;
66
67static MERGE_THRESHOLD: LazyLock<usize> =
69 LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));
70
71const DEFAULT_MAX_MERGE_GROUPS: usize = 32;
73
74static MAX_MERGE_GROUPS: LazyLock<usize> =
76 LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));
77
78pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
81 env_usize(
82 "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
83 10 * DEFAULT_ROW_GROUP_SIZE,
84 )
85});
86
87const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;
89
90static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
93 env_usize(
94 "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
95 DEFAULT_ENCODE_BYTES_THRESHOLD,
96 )
97});
98
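/// Configuration of [BulkMemtable]. Defaults come from the `GREPTIME_BULK_*`
/// environment variables above, or from the built-in constants when unset.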
99#[derive(Debug, Clone)]
101pub struct BulkMemtableConfig {
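    /// Number of unmerged parts of the same kind (plain or encoded) that triggers a merge.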
102 pub merge_threshold: usize,
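    /// Row count above which merged output is encoded into an [EncodedBulkPart].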
104 pub encode_row_threshold: usize,
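    /// Estimated size in bytes above which merged output is encoded into an [EncodedBulkPart].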
106 pub encode_bytes_threshold: usize,
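    /// Maximum number of part groups merged in one compaction run.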
108 pub max_merge_groups: usize,
110}
111
112impl Default for BulkMemtableConfig {
113 fn default() -> Self {
114 Self {
115 merge_threshold: *MERGE_THRESHOLD,
116 encode_row_threshold: *ENCODE_ROW_THRESHOLD,
117 encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
118 max_merge_groups: *MAX_MERGE_GROUPS,
119 }
120 }
121}
122
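/// Output of merging one group of parts: kept in memory as record batches, or
/// encoded when it exceeds the encode thresholds.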
123enum MergedPart {
125 Multi(MultiBulkPart),
127 Encoded(EncodedBulkPart),
129}
130
131struct CollectedParts {
133 groups: Vec<Vec<PartToMerge>>,
135}
136
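/// All parts of a bulk memtable: an unordered part that buffers small writes plus a
/// list of standalone parts, each wrapped with a `merging` flag.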
137#[derive(Default)]
139struct BulkParts {
140 unordered_part: UnorderedPart,
142 parts: Vec<BulkPartWrapper>,
144}
145
146impl BulkParts {
147 fn num_parts(&self) -> usize {
149 let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
150 self.parts.len() + unordered_count
151 }
152
153 fn is_empty(&self) -> bool {
155 self.unordered_part.is_empty() && self.parts.is_empty()
156 }
157
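    /// Returns true if the number of plain or encoded parts that are not already being
    /// merged reaches `merge_threshold`.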
158 fn should_merge_parts(&self, merge_threshold: usize) -> bool {
161 let mut bulk_count = 0;
162 let mut encoded_count = 0;
163
164 for wrapper in &self.parts {
165 if wrapper.merging {
166 continue;
167 }
168
169 if wrapper.part.is_encoded() {
170 encoded_count += 1;
171 } else {
172 bulk_count += 1;
173 }
174
175 if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
177 return true;
178 }
179 }
180
181 false
182 }
183
184 fn should_compact_unordered_part(&self) -> bool {
186 self.unordered_part.should_compact()
187 }
188
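    /// Collects parts that are not being merged, groups them by kind and size into at
    /// most `max_merge_groups` groups of up to `merge_threshold` parts, and marks the
    /// selected parts as merging.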
189 fn collect_parts_to_merge(
193 &mut self,
194 merge_threshold: usize,
195 max_merge_groups: usize,
196 ) -> CollectedParts {
197 let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
199 let mut encoded_indices: Vec<(usize, usize)> = Vec::new();
200
201 for (idx, wrapper) in self.parts.iter().enumerate() {
202 if wrapper.merging {
203 continue;
204 }
205 let num_rows = wrapper.part.num_rows();
206 if wrapper.part.is_encoded() {
207 encoded_indices.push((idx, num_rows));
208 } else {
209 bulk_indices.push((idx, num_rows));
210 }
211 }
212
213 let mut groups = Vec::new();
214
215 if bulk_indices.len() >= merge_threshold {
217 groups.extend(self.collect_and_group_parts(
218 bulk_indices,
219 merge_threshold,
220 max_merge_groups,
221 ));
222 }
223
224 if encoded_indices.len() >= merge_threshold {
226 groups.extend(self.collect_and_group_parts(
227 encoded_indices,
228 merge_threshold,
229 max_merge_groups,
230 ));
231 }
232
233 CollectedParts { groups }
234 }
235
236 fn collect_and_group_parts(
238 &mut self,
239 mut indices: Vec<(usize, usize)>,
240 merge_threshold: usize,
241 max_merge_groups: usize,
242 ) -> Vec<Vec<PartToMerge>> {
243 if indices.is_empty() {
244 return Vec::new();
245 }
246
247 indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);
249
250 indices
252 .chunks(merge_threshold)
253 .take(max_merge_groups)
254 .map(|chunk| {
255 chunk
256 .iter()
257 .map(|(idx, _)| {
258 let wrapper = &mut self.parts[*idx];
259 wrapper.merging = true;
260 wrapper.part.clone()
261 })
262 .collect()
263 })
264 .collect()
265 }
266
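    /// Installs the merged outputs as new parts, removes the source parts identified by
    /// `merged_file_ids`, and returns the total number of output rows.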
267 fn install_merged_parts<I>(
270 &mut self,
271 merged_parts: I,
272 merged_file_ids: &HashSet<FileId>,
273 ) -> usize
274 where
275 I: IntoIterator<Item = MergedPart>,
276 {
277 let mut total_output_rows = 0;
278
279 for merged_part in merged_parts {
280 match merged_part {
281 MergedPart::Encoded(encoded_part) => {
282 total_output_rows += encoded_part.metadata().num_rows;
283 self.parts.push(BulkPartWrapper {
284 part: PartToMerge::Encoded {
285 part: encoded_part,
286 file_id: FileId::random(),
287 },
288 merging: false,
289 });
290 }
291 MergedPart::Multi(multi_part) => {
292 total_output_rows += multi_part.num_rows();
293 self.parts.push(BulkPartWrapper {
294 part: PartToMerge::Multi {
295 part: multi_part,
296 file_id: FileId::random(),
297 },
298 merging: false,
299 });
300 }
301 }
302 }
303
304 self.parts
305 .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));
306
307 total_output_rows
308 }
309
310 fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
313 for wrapper in &mut self.parts {
314 if file_ids.contains(&wrapper.file_id()) {
315 wrapper.merging = false;
316 }
317 }
318 }
319}
320
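/// Guard that clears the `merging` flags of the selected parts when a merge does not
/// complete successfully (i.e. [MergingFlagsGuard::mark_success] was never called).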
321struct MergingFlagsGuard<'a> {
324 bulk_parts: &'a RwLock<BulkParts>,
325 file_ids: &'a HashSet<FileId>,
326 success: bool,
327}
328
329impl<'a> MergingFlagsGuard<'a> {
330 fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
332 Self {
333 bulk_parts,
334 file_ids,
335 success: false,
336 }
337 }
338
339 fn mark_success(&mut self) {
342 self.success = true;
343 }
344}
345
346impl<'a> Drop for MergingFlagsGuard<'a> {
347 fn drop(&mut self) {
348 if !self.success
349 && let Ok(mut parts) = self.bulk_parts.write()
350 {
351 parts.reset_merging_flags(self.file_ids);
352 }
353 }
354}
355
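/// Memtable that ingests whole [BulkPart]s, exposes each part as an independent scan
/// range, and merges or encodes parts once enough of the same kind accumulate, either
/// inline or via a background [CompactDispatcher].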
356pub struct BulkMemtable {
358 id: MemtableId,
359 config: BulkMemtableConfig,
361 parts: Arc<RwLock<BulkParts>>,
362 metadata: RegionMetadataRef,
363 alloc_tracker: AllocTracker,
364 max_timestamp: AtomicI64,
365 min_timestamp: AtomicI64,
366 max_sequence: AtomicU64,
367 num_rows: AtomicUsize,
368 flat_arrow_schema: SchemaRef,
370 compactor: Arc<Mutex<MemtableCompactor>>,
372 compact_dispatcher: Option<Arc<CompactDispatcher>>,
374 append_mode: bool,
376 merge_mode: MergeMode,
378}
379
380impl std::fmt::Debug for BulkMemtable {
381 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
382 f.debug_struct("BulkMemtable")
383 .field("id", &self.id)
384 .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
385 .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
386 .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
387 .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
388 .finish()
389 }
390}
391
392impl Memtable for BulkMemtable {
393 fn id(&self) -> MemtableId {
394 self.id
395 }
396
397 fn write(&self, _kvs: &KeyValues) -> Result<()> {
398 UnsupportedOperationSnafu {
399 err_msg: "write() is not supported for bulk memtable",
400 }
401 .fail()
402 }
403
404 fn write_one(&self, _key_value: KeyValue) -> Result<()> {
405 UnsupportedOperationSnafu {
406 err_msg: "write_one() is not supported for bulk memtable",
407 }
408 .fail()
409 }
410
411 fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
412 let local_metrics = WriteMetrics {
413 key_bytes: 0,
414 value_bytes: fragment.estimated_size(),
415 min_ts: fragment.min_timestamp,
416 max_ts: fragment.max_timestamp,
417 num_rows: fragment.num_rows(),
418 max_sequence: fragment.sequence,
419 };
420
421 {
422 let mut bulk_parts = self.parts.write().unwrap();
423
424 if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
426 bulk_parts.unordered_part.push(fragment);
427
428 if bulk_parts.should_compact_unordered_part()
430 && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
431 {
432 bulk_parts.parts.push(BulkPartWrapper {
433 part: PartToMerge::Bulk {
434 part: bulk_part,
435 file_id: FileId::random(),
436 },
437 merging: false,
438 });
439 bulk_parts.unordered_part.clear();
440 }
441 } else {
442 bulk_parts.parts.push(BulkPartWrapper {
443 part: PartToMerge::Bulk {
444 part: fragment,
445 file_id: FileId::random(),
446 },
447 merging: false,
448 });
449 }
450
451 self.update_stats(local_metrics);
456 }
457
458 if self.should_compact() {
459 self.schedule_compact();
460 }
461
462 Ok(())
463 }
464
465 #[cfg(any(test, feature = "test"))]
466 fn iter(
467 &self,
468 _projection: Option<&[ColumnId]>,
469 _predicate: Option<table::predicate::Predicate>,
470 _sequence: Option<SequenceRange>,
471 ) -> Result<crate::memtable::BoxedBatchIterator> {
472 todo!()
473 }
474
475 fn ranges(
476 &self,
477 projection: Option<&[ColumnId]>,
478 options: RangesOptions,
479 ) -> Result<MemtableRanges> {
480 let predicate = options.predicate;
481 let sequence = options.sequence;
482 let mut ranges = BTreeMap::new();
483 let mut range_id = 0;
484
485 let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
487 self.metadata.clone(),
488 projection,
489 predicate.predicate().cloned(),
490 options.for_flush,
491 options.pre_filter_mode,
492 )?);
493
494 {
496 let bulk_parts = self.parts.read().unwrap();
497
498 if !bulk_parts.unordered_part.is_empty()
500 && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
501 {
502 let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
503 let range = MemtableRange::new(
504 Arc::new(MemtableRangeContext::new(
505 self.id,
506 Box::new(BulkRangeIterBuilder {
507 part: unordered_bulk_part,
508 context: context.clone(),
509 sequence,
510 }),
511 predicate.clone(),
512 )),
513 part_stats,
514 );
515 ranges.insert(range_id, range);
516 range_id += 1;
517 }
518
519 for part_wrapper in bulk_parts.parts.iter() {
521 if part_wrapper.part.num_rows() == 0 {
523 continue;
524 }
525
526 let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
527 let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
528 PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
529 part: part.clone(),
530 context: context.clone(),
531 sequence,
532 }),
533 PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
534 part: part.clone(),
535 context: context.clone(),
536 sequence,
537 }),
538 PartToMerge::Encoded { part, file_id } => {
539 Box::new(EncodedBulkRangeIterBuilder {
540 file_id: *file_id,
541 part: part.clone(),
542 context: context.clone(),
543 sequence,
544 })
545 }
546 };
547
548 let range = MemtableRange::new(
549 Arc::new(MemtableRangeContext::new(
550 self.id,
551 iter_builder,
552 predicate.clone(),
553 )),
554 part_stats,
555 );
556 ranges.insert(range_id, range);
557 range_id += 1;
558 }
559 }
560
561 Ok(MemtableRanges { ranges })
562 }
563
564 fn is_empty(&self) -> bool {
565 let bulk_parts = self.parts.read().unwrap();
566 bulk_parts.is_empty()
567 }
568
569 fn freeze(&self) -> Result<()> {
570 self.alloc_tracker.done_allocating();
571 Ok(())
572 }
573
574 fn stats(&self) -> MemtableStats {
575 let estimated_bytes = self.alloc_tracker.bytes_allocated();
576
577 if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
578 return MemtableStats {
579 estimated_bytes,
580 time_range: None,
581 num_rows: 0,
582 num_ranges: 0,
583 max_sequence: 0,
584 series_count: 0,
585 };
586 }
587
588 let ts_type = self
589 .metadata
590 .time_index_column()
591 .column_schema
592 .data_type
593 .clone()
594 .as_timestamp()
595 .expect("Timestamp column must have timestamp type");
596 let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
597 let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
598
599 let num_ranges = self.parts.read().unwrap().num_parts();
600
601 MemtableStats {
602 estimated_bytes,
603 time_range: Some((min_timestamp, max_timestamp)),
604 num_rows: self.num_rows.load(Ordering::Relaxed),
605 num_ranges,
606 max_sequence: self.max_sequence.load(Ordering::Relaxed),
607 series_count: self.estimated_series_count(),
608 }
609 }
610
611 fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
612 let flat_arrow_schema = to_flat_sst_arrow_schema(
614 metadata,
615 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
616 );
617
618 Arc::new(Self {
619 id,
620 config: self.config.clone(),
621 parts: Arc::new(RwLock::new(BulkParts::default())),
622 metadata: metadata.clone(),
623 alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
624 max_timestamp: AtomicI64::new(i64::MIN),
625 min_timestamp: AtomicI64::new(i64::MAX),
626 max_sequence: AtomicU64::new(0),
627 num_rows: AtomicUsize::new(0),
628 flat_arrow_schema,
629 compactor: Arc::new(Mutex::new(MemtableCompactor::new(
630 metadata.region_id,
631 id,
632 self.config.clone(),
633 ))),
634 compact_dispatcher: self.compact_dispatcher.clone(),
635 append_mode: self.append_mode,
636 merge_mode: self.merge_mode,
637 })
638 }
639
640 fn compact(&self, for_flush: bool) -> Result<()> {
641 let mut compactor = self.compactor.lock().unwrap();
642
643 if for_flush {
644 return Ok(());
645 }
646
647 let should_merge = self
649 .parts
650 .read()
651 .unwrap()
652 .should_merge_parts(self.config.merge_threshold);
653 if should_merge {
654 compactor.merge_parts(
655 &self.flat_arrow_schema,
656 &self.parts,
657 &self.metadata,
658 !self.append_mode,
659 self.merge_mode,
660 )?;
661 }
662
663 Ok(())
664 }
665}
666
667impl BulkMemtable {
668 pub fn new(
670 id: MemtableId,
671 config: BulkMemtableConfig,
672 metadata: RegionMetadataRef,
673 write_buffer_manager: Option<WriteBufferManagerRef>,
674 compact_dispatcher: Option<Arc<CompactDispatcher>>,
675 append_mode: bool,
676 merge_mode: MergeMode,
677 ) -> Self {
678 let flat_arrow_schema = to_flat_sst_arrow_schema(
679 &metadata,
680 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
681 );
682
683 let region_id = metadata.region_id;
684 Self {
685 id,
686 config: config.clone(),
687 parts: Arc::new(RwLock::new(BulkParts::default())),
688 metadata,
689 alloc_tracker: AllocTracker::new(write_buffer_manager),
690 max_timestamp: AtomicI64::new(i64::MIN),
691 min_timestamp: AtomicI64::new(i64::MAX),
692 max_sequence: AtomicU64::new(0),
693 num_rows: AtomicUsize::new(0),
694 flat_arrow_schema,
695 compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
696 compact_dispatcher,
697 append_mode,
698 merge_mode,
699 }
700 }
701
702 #[cfg(test)]
704 pub fn set_unordered_part_threshold(&self, threshold: usize) {
705 self.parts
706 .write()
707 .unwrap()
708 .unordered_part
709 .set_threshold(threshold);
710 }
711
712 #[cfg(test)]
714 pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
715 self.parts
716 .write()
717 .unwrap()
718 .unordered_part
719 .set_compact_threshold(compact_threshold);
720 }
721
722 fn update_stats(&self, stats: WriteMetrics) {
726 self.alloc_tracker
727 .on_allocation(stats.key_bytes + stats.value_bytes);
728
729 self.max_timestamp
730 .fetch_max(stats.max_ts, Ordering::Relaxed);
731 self.min_timestamp
732 .fetch_min(stats.min_ts, Ordering::Relaxed);
733 self.max_sequence
734 .fetch_max(stats.max_sequence, Ordering::Relaxed);
735 self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
736 }
737
738 fn estimated_series_count(&self) -> usize {
740 let bulk_parts = self.parts.read().unwrap();
741 bulk_parts
742 .parts
743 .iter()
744 .map(|part_wrapper| part_wrapper.part.series_count())
745 .sum()
746 }
747
748 fn should_compact(&self) -> bool {
750 let parts = self.parts.read().unwrap();
751 parts.should_merge_parts(self.config.merge_threshold)
752 }
753
754 fn schedule_compact(&self) {
756 if let Some(dispatcher) = &self.compact_dispatcher {
757 let task = MemCompactTask {
758 metadata: self.metadata.clone(),
759 parts: self.parts.clone(),
760 config: self.config.clone(),
761 flat_arrow_schema: self.flat_arrow_schema.clone(),
762 compactor: self.compactor.clone(),
763 append_mode: self.append_mode,
764 merge_mode: self.merge_mode,
765 };
766
767 dispatcher.dispatch_compact(task);
768 } else {
769 if let Err(e) = self.compact(false) {
                common_telemetry::error!(e; "Failed to compact memtable");
772 }
773 }
774 }
775}
776
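/// Builds record batch iterators over a single plain [BulkPart].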
777pub struct BulkRangeIterBuilder {
779 pub part: BulkPart,
780 pub context: Arc<BulkIterContext>,
781 pub sequence: Option<SequenceRange>,
782}
783
784struct MultiBulkRangeIterBuilder {
786 part: MultiBulkPart,
787 context: Arc<BulkIterContext>,
788 sequence: Option<SequenceRange>,
789}
790
791impl IterBuilder for BulkRangeIterBuilder {
792 fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
793 UnsupportedOperationSnafu {
794 err_msg: "BatchIterator is not supported for bulk memtable",
795 }
796 .fail()
797 }
798
799 fn is_record_batch(&self) -> bool {
800 true
801 }
802
803 fn build_record_batch(
804 &self,
805 metrics: Option<MemScanMetrics>,
806 ) -> Result<BoxedRecordBatchIterator> {
807 let series_count = self.part.estimated_series_count();
808 let iter = BulkPartBatchIter::from_single(
809 self.part.batch.clone(),
810 self.context.clone(),
811 self.sequence,
812 series_count,
813 metrics,
814 );
815
816 Ok(Box::new(iter))
817 }
818
819 fn encoded_range(&self) -> Option<EncodedRange> {
820 None
821 }
822}
823
824impl IterBuilder for MultiBulkRangeIterBuilder {
825 fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
826 UnsupportedOperationSnafu {
827 err_msg: "BatchIterator is not supported for multi bulk memtable",
828 }
829 .fail()
830 }
831
832 fn is_record_batch(&self) -> bool {
833 true
834 }
835
836 fn build_record_batch(
837 &self,
838 metrics: Option<MemScanMetrics>,
839 ) -> Result<BoxedRecordBatchIterator> {
840 self.part
841 .read(self.context.clone(), self.sequence, metrics)?
842 .ok_or_else(|| {
843 UnsupportedOperationSnafu {
844 err_msg: "Failed to create iterator for multi bulk part",
845 }
846 .build()
847 })
848 }
849
850 fn encoded_range(&self) -> Option<EncodedRange> {
851 None
852 }
853}
854
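/// Builds record batch iterators over an [EncodedBulkPart] and exposes its encoded
/// data through [IterBuilder::encoded_range].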
855struct EncodedBulkRangeIterBuilder {
857 file_id: FileId,
858 part: EncodedBulkPart,
859 context: Arc<BulkIterContext>,
860 sequence: Option<SequenceRange>,
861}
862
863impl IterBuilder for EncodedBulkRangeIterBuilder {
864 fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
865 UnsupportedOperationSnafu {
866 err_msg: "BatchIterator is not supported for encoded bulk memtable",
867 }
868 .fail()
869 }
870
871 fn is_record_batch(&self) -> bool {
872 true
873 }
874
875 fn build_record_batch(
876 &self,
877 metrics: Option<MemScanMetrics>,
878 ) -> Result<BoxedRecordBatchIterator> {
879 if let Some(iter) = self
880 .part
881 .read(self.context.clone(), self.sequence, metrics)?
882 {
883 Ok(iter)
884 } else {
885 Ok(Box::new(std::iter::empty()))
887 }
888 }
889
890 fn encoded_range(&self) -> Option<EncodedRange> {
891 Some(EncodedRange {
892 data: self.part.data().clone(),
893 sst_info: self.part.to_sst_info(self.file_id),
894 })
895 }
896}
897
898struct BulkPartWrapper {
899 part: PartToMerge,
901 merging: bool,
903}
904
905impl BulkPartWrapper {
906 fn file_id(&self) -> FileId {
908 self.part.file_id()
909 }
910}
911
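/// A part that can participate in a merge, tagged with a random [FileId] so it can be
/// identified when merged results are installed.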
912#[derive(Clone)]
914enum PartToMerge {
915 Bulk { part: BulkPart, file_id: FileId },
917 Multi {
919 part: MultiBulkPart,
920 file_id: FileId,
921 },
922 Encoded {
924 part: EncodedBulkPart,
925 file_id: FileId,
926 },
927}
928
929impl PartToMerge {
930 fn file_id(&self) -> FileId {
932 match self {
933 PartToMerge::Bulk { file_id, .. } => *file_id,
934 PartToMerge::Multi { file_id, .. } => *file_id,
935 PartToMerge::Encoded { file_id, .. } => *file_id,
936 }
937 }
938
939 fn min_timestamp(&self) -> i64 {
941 match self {
942 PartToMerge::Bulk { part, .. } => part.min_timestamp,
943 PartToMerge::Multi { part, .. } => part.min_timestamp(),
944 PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
945 }
946 }
947
948 fn max_timestamp(&self) -> i64 {
950 match self {
951 PartToMerge::Bulk { part, .. } => part.max_timestamp,
952 PartToMerge::Multi { part, .. } => part.max_timestamp(),
953 PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
954 }
955 }
956
957 fn num_rows(&self) -> usize {
959 match self {
960 PartToMerge::Bulk { part, .. } => part.num_rows(),
961 PartToMerge::Multi { part, .. } => part.num_rows(),
962 PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
963 }
964 }
965
966 fn max_sequence(&self) -> u64 {
968 match self {
969 PartToMerge::Bulk { part, .. } => part.sequence,
970 PartToMerge::Multi { part, .. } => part.max_sequence(),
971 PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
972 }
973 }
974
975 fn series_count(&self) -> usize {
977 match self {
978 PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
979 PartToMerge::Multi { part, .. } => part.series_count(),
980 PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
981 }
982 }
983
984 fn is_encoded(&self) -> bool {
986 matches!(self, PartToMerge::Encoded { .. })
987 }
988
989 fn estimated_size(&self) -> usize {
991 match self {
992 PartToMerge::Bulk { part, .. } => part.estimated_size(),
993 PartToMerge::Multi { part, .. } => part.estimated_size(),
994 PartToMerge::Encoded { part, .. } => part.size_bytes(),
995 }
996 }
997
998 fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1000 match self {
1001 PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
1002 PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
1003 PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
1004 }
1005 }
1006
1007 fn create_iterator(
1009 self,
1010 context: Arc<BulkIterContext>,
1011 ) -> Result<Option<BoxedRecordBatchIterator>> {
1012 match self {
1013 PartToMerge::Bulk { part, .. } => {
1014 let series_count = part.estimated_series_count();
1015 let iter = BulkPartBatchIter::from_single(
1016 part.batch,
1017 context,
                    None, // sequence
                    series_count,
                    None, // metrics
                );
1022 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1023 }
1024 PartToMerge::Multi { part, .. } => part.read(context, None, None),
1025 PartToMerge::Encoded { part, .. } => part.read(context, None, None),
1026 }
1027 }
1028}
1029
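/// Performs in-memtable compaction: merges groups of accumulated parts and optionally
/// encodes the merged output.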
1030struct MemtableCompactor {
1031 region_id: RegionId,
1032 memtable_id: MemtableId,
1033 config: BulkMemtableConfig,
1035}
1036
1037impl MemtableCompactor {
1038 fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
1040 Self {
1041 region_id,
1042 memtable_id,
1043 config,
1044 }
1045 }
1046
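    /// Collects mergeable parts, merges each group in parallel with rayon, and installs
    /// the merged results back into `bulk_parts`.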
1047 fn merge_parts(
1049 &mut self,
1050 arrow_schema: &SchemaRef,
1051 bulk_parts: &RwLock<BulkParts>,
1052 metadata: &RegionMetadataRef,
1053 dedup: bool,
1054 merge_mode: MergeMode,
1055 ) -> Result<()> {
1056 let start = Instant::now();
1057
1058 let collected = bulk_parts
1060 .write()
1061 .unwrap()
1062 .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);
1063
1064 if collected.groups.is_empty() {
1065 return Ok(());
1066 }
1067
1068 let merged_file_ids: HashSet<FileId> = collected
1070 .groups
1071 .iter()
1072 .flatten()
1073 .map(|part| part.file_id())
1074 .collect();
1075 let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);
1076
1077 let num_groups = collected.groups.len();
1078 let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
1079
1080 let encode_row_threshold = self.config.encode_row_threshold;
1081 let encode_bytes_threshold = self.config.encode_bytes_threshold;
1082
1083 let merged_parts = collected
1085 .groups
1086 .into_par_iter()
1087 .map(|group| {
1088 Self::merge_parts_group(
1089 group,
1090 arrow_schema,
1091 metadata,
1092 dedup,
1093 merge_mode,
1094 encode_row_threshold,
1095 encode_bytes_threshold,
1096 )
1097 })
1098 .collect::<Result<Vec<Option<MergedPart>>>>()?;
1099
1100 let total_output_rows = {
1102 let mut parts = bulk_parts.write().unwrap();
1103 parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
1104 };
1105
1106 guard.mark_success();
1107
1108 common_telemetry::debug!(
            "BulkMemtable {} {} concurrently compacted {} groups, {} parts, {} output rows, cost: {:?}",
1110 self.region_id,
1111 self.memtable_id,
1112 num_groups,
1113 num_parts,
1114 total_output_rows,
1115 start.elapsed()
1116 );
1117
1118 Ok(())
1119 }
1120
1121 fn merge_parts_group(
1123 parts_to_merge: Vec<PartToMerge>,
1124 arrow_schema: &SchemaRef,
1125 metadata: &RegionMetadataRef,
1126 dedup: bool,
1127 merge_mode: MergeMode,
1128 encode_row_threshold: usize,
1129 encode_bytes_threshold: usize,
1130 ) -> Result<Option<MergedPart>> {
1131 if parts_to_merge.is_empty() {
1132 return Ok(None);
1133 }
1134
1135 let min_timestamp = parts_to_merge
1137 .iter()
1138 .map(|p| p.min_timestamp())
1139 .min()
1140 .unwrap_or(i64::MAX);
1141 let max_timestamp = parts_to_merge
1142 .iter()
1143 .map(|p| p.max_timestamp())
1144 .max()
1145 .unwrap_or(i64::MIN);
1146 let max_sequence = parts_to_merge
1147 .iter()
1148 .map(|p| p.max_sequence())
1149 .max()
1150 .unwrap_or(0);
1151
1152 let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
1154 let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
1155 let estimated_series_count = parts_to_merge
1156 .iter()
1157 .map(|p| p.series_count())
1158 .max()
1159 .unwrap_or(0);
1160
1161 let context = Arc::new(BulkIterContext::new(
1162 metadata.clone(),
            None, // projection
            None, // predicate
            true,
1166 )?);
1167
1168 let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
1170 .into_iter()
1171 .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
1172 .collect();
1173
1174 if iterators.is_empty() {
1175 return Ok(None);
1176 }
1177
1178 let merged_iter =
1179 FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;
1180
1181 let boxed_iter: BoxedRecordBatchIterator = if dedup {
1182 match merge_mode {
1184 MergeMode::LastRow => {
1185 let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
1186 Box::new(dedup_iter)
1187 }
1188 MergeMode::LastNonNull => {
1189 let field_column_count =
1192 metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
1193 let total_columns = arrow_schema.fields().len();
1194 let field_column_start =
1195 total_columns - FIXED_POS_COLUMN_NUM - field_column_count;
1196
1197 let dedup_iter = FlatDedupIterator::new(
1198 merged_iter,
1199 FlatLastNonNull::new(field_column_start, false),
1200 );
1201 Box::new(dedup_iter)
1202 }
1203 }
1204 } else {
1205 Box::new(merged_iter)
1206 };
1207
1208 if estimated_total_rows > encode_row_threshold
1210 || estimated_total_bytes > encode_bytes_threshold
1211 {
1212 let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
1213 let mut metrics = BulkPartEncodeMetrics::default();
1214 let encoded_part = encoder.encode_record_batch_iter(
1215 boxed_iter,
1216 arrow_schema.clone(),
1217 min_timestamp,
1218 max_timestamp,
1219 max_sequence,
1220 &mut metrics,
1221 )?;
1222
1223 common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);
1224
1225 Ok(encoded_part.map(MergedPart::Encoded))
1226 } else {
1227 let mut batches = Vec::new();
1229 let mut actual_total_rows = 0;
1230
1231 for batch_result in boxed_iter {
1232 let batch = batch_result?;
1233 actual_total_rows += batch.num_rows();
1234 batches.push(batch);
1235 }
1236
1237 if actual_total_rows == 0 {
1238 return Ok(None);
1239 }
1240
1241 let multi_part = MultiBulkPart::new(
1242 batches,
1243 min_timestamp,
1244 max_timestamp,
1245 max_sequence,
1246 estimated_series_count,
1247 );
1248
1249 common_telemetry::trace!(
1250 "merge_parts_group created MultiBulkPart: rows={}, batches={}",
1251 actual_total_rows,
1252 multi_part.num_batches()
1253 );
1254
1255 Ok(Some(MergedPart::Multi(multi_part)))
1256 }
1257 }
1258}
1259
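/// Compaction task dispatched to a background thread by [CompactDispatcher].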
1260struct MemCompactTask {
1262 metadata: RegionMetadataRef,
1263 parts: Arc<RwLock<BulkParts>>,
1264 config: BulkMemtableConfig,
1266 flat_arrow_schema: SchemaRef,
1268 compactor: Arc<Mutex<MemtableCompactor>>,
1270 append_mode: bool,
1272 merge_mode: MergeMode,
1274}
1275
1276impl MemCompactTask {
1277 fn compact(&self) -> Result<()> {
1278 let mut compactor = self.compactor.lock().unwrap();
1279
1280 let should_merge = self
1281 .parts
1282 .read()
1283 .unwrap()
1284 .should_merge_parts(self.config.merge_threshold);
1285 if should_merge {
1286 compactor.merge_parts(
1287 &self.flat_arrow_schema,
1288 &self.parts,
1289 &self.metadata,
1290 !self.append_mode,
1291 self.merge_mode,
1292 )?;
1293 }
1294
1295 Ok(())
1296 }
1297}
1298
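/// Dispatches memtable compaction to background tasks, limiting concurrency with a
/// semaphore.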
1299#[derive(Debug)]
1301pub struct CompactDispatcher {
1302 semaphore: Arc<Semaphore>,
1303}
1304
1305impl CompactDispatcher {
1306 pub fn new(permits: usize) -> Self {
1308 Self {
1309 semaphore: Arc::new(Semaphore::new(permits)),
1310 }
1311 }
1312
1313 fn dispatch_compact(&self, task: MemCompactTask) {
1315 let semaphore = self.semaphore.clone();
1316 common_runtime::spawn_global(async move {
1317 let Ok(_permit) = semaphore.acquire().await else {
1318 return;
1319 };
1320
1321 common_runtime::spawn_blocking_global(move || {
1322 if let Err(e) = task.compact() {
1323 common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
1324 }
1325 });
1326 });
1327 }
1328}
1329
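/// Builder for [BulkMemtable].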
1330#[derive(Debug, Default)]
1332pub struct BulkMemtableBuilder {
1333 config: BulkMemtableConfig,
1335 write_buffer_manager: Option<WriteBufferManagerRef>,
1336 compact_dispatcher: Option<Arc<CompactDispatcher>>,
1337 append_mode: bool,
1338 merge_mode: MergeMode,
1339}
1340
1341impl BulkMemtableBuilder {
1342 pub fn new(
1344 write_buffer_manager: Option<WriteBufferManagerRef>,
1345 append_mode: bool,
1346 merge_mode: MergeMode,
1347 ) -> Self {
1348 Self {
1349 config: BulkMemtableConfig::default(),
1350 write_buffer_manager,
1351 compact_dispatcher: None,
1352 append_mode,
1353 merge_mode,
1354 }
1355 }
1356
1357 pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
1359 self.compact_dispatcher = Some(compact_dispatcher);
1360 self
1361 }
1362}
1363
1364impl MemtableBuilder for BulkMemtableBuilder {
1365 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
1366 Arc::new(BulkMemtable::new(
1367 id,
1368 self.config.clone(),
1369 metadata.clone(),
1370 self.write_buffer_manager.clone(),
1371 self.compact_dispatcher.clone(),
1372 self.append_mode,
1373 self.merge_mode,
1374 ))
1375 }
1376
1377 fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
1378 true
1379 }
1380}
1381
1382#[cfg(test)]
1383mod tests {
1384 use mito_codec::row_converter::build_primary_key_codec;
1385
1386 use super::*;
1387 use crate::memtable::bulk::part::BulkPartConverter;
1388 use crate::read::scan_region::PredicateGroup;
1389 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1390 use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1391
1392 fn create_bulk_part_with_converter(
1393 k0: &str,
1394 k1: u32,
1395 timestamps: Vec<i64>,
1396 values: Vec<Option<f64>>,
1397 sequence: u64,
1398 ) -> Result<BulkPart> {
1399 let metadata = metadata_for_test();
1400 let capacity = 100;
1401 let primary_key_codec = build_primary_key_codec(&metadata);
1402 let schema = to_flat_sst_arrow_schema(
1403 &metadata,
1404 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1405 );
1406
1407 let mut converter =
1408 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1409
1410 let key_values = build_key_values_with_ts_seq_values(
1411 &metadata,
1412 k0.to_string(),
1413 k1,
1414 timestamps.into_iter(),
1415 values.into_iter(),
1416 sequence,
1417 );
1418
1419 converter.append_key_values(&key_values)?;
1420 converter.convert()
1421 }
1422
1423 #[test]
1424 fn test_bulk_memtable_write_read() {
1425 let metadata = metadata_for_test();
1426 let memtable = BulkMemtable::new(
1427 999,
1428 BulkMemtableConfig::default(),
1429 metadata.clone(),
1430 None,
1431 None,
1432 false,
1433 MergeMode::LastRow,
1434 );
1435 memtable.set_unordered_part_threshold(0);
1437
1438 let test_data = [
1439 (
1440 "key_a",
1441 1u32,
1442 vec![1000i64, 2000i64],
1443 vec![Some(10.5), Some(20.5)],
1444 100u64,
1445 ),
1446 (
1447 "key_b",
1448 2u32,
1449 vec![1500i64, 2500i64],
1450 vec![Some(15.5), Some(25.5)],
1451 200u64,
1452 ),
1453 ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
1454 ];
1455
1456 for (k0, k1, timestamps, values, seq) in test_data.iter() {
1457 let part =
1458 create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
1459 .unwrap();
1460 memtable.write_bulk(part).unwrap();
1461 }
1462
1463 let stats = memtable.stats();
1464 assert_eq!(5, stats.num_rows);
1465 assert_eq!(3, stats.num_ranges);
1466 assert_eq!(300, stats.max_sequence);
1467
1468 let (min_ts, max_ts) = stats.time_range.unwrap();
1469 assert_eq!(1000, min_ts.value());
1470 assert_eq!(3000, max_ts.value());
1471
1472 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1473 let ranges = memtable
1474 .ranges(
1475 None,
1476 RangesOptions::default().with_predicate(predicate_group),
1477 )
1478 .unwrap();
1479
1480 assert_eq!(3, ranges.ranges.len());
1481 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1482 assert_eq!(5, total_rows);
1483
1484 for (_range_id, range) in ranges.ranges.iter() {
1485 assert!(range.num_rows() > 0);
1486 assert!(range.is_record_batch());
1487
1488 let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1489
1490 let mut total_rows = 0;
1491 for batch_result in record_batch_iter {
1492 let batch = batch_result.unwrap();
1493 total_rows += batch.num_rows();
1494 assert!(batch.num_rows() > 0);
1495 assert_eq!(8, batch.num_columns());
1496 }
1497 assert_eq!(total_rows, range.num_rows());
1498 }
1499 }
1500
1501 #[test]
1502 fn test_bulk_memtable_ranges_with_projection() {
1503 let metadata = metadata_for_test();
1504 let memtable = BulkMemtable::new(
1505 111,
1506 BulkMemtableConfig::default(),
1507 metadata.clone(),
1508 None,
1509 None,
1510 false,
1511 MergeMode::LastRow,
1512 );
1513
1514 let bulk_part = create_bulk_part_with_converter(
1515 "projection_test",
1516 5,
1517 vec![5000, 6000, 7000],
1518 vec![Some(50.0), Some(60.0), Some(70.0)],
1519 500,
1520 )
1521 .unwrap();
1522
1523 memtable.write_bulk(bulk_part).unwrap();
1524
1525 let projection = vec![4u32];
1526 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1527 let ranges = memtable
1528 .ranges(
1529 Some(&projection),
1530 RangesOptions::default().with_predicate(predicate_group),
1531 )
1532 .unwrap();
1533
1534 assert_eq!(1, ranges.ranges.len());
1535 let range = ranges.ranges.get(&0).unwrap();
1536
1537 assert!(range.is_record_batch());
1538 let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1539
1540 let mut total_rows = 0;
1541 for batch_result in record_batch_iter {
1542 let batch = batch_result.unwrap();
1543 assert!(batch.num_rows() > 0);
1544 assert_eq!(5, batch.num_columns());
1545 total_rows += batch.num_rows();
1546 }
1547 assert_eq!(3, total_rows);
1548 }
1549
1550 #[test]
1551 fn test_bulk_memtable_unsupported_operations() {
1552 let metadata = metadata_for_test();
1553 let memtable = BulkMemtable::new(
1554 111,
1555 BulkMemtableConfig::default(),
1556 metadata.clone(),
1557 None,
1558 None,
1559 false,
1560 MergeMode::LastRow,
1561 );
1562
1563 let key_values = build_key_values_with_ts_seq_values(
1564 &metadata,
1565 "test".to_string(),
1566 1,
1567 vec![1000].into_iter(),
1568 vec![Some(1.0)].into_iter(),
1569 1,
1570 );
1571
1572 let err = memtable.write(&key_values).unwrap_err();
1573 assert!(err.to_string().contains("not supported"));
1574
1575 let kv = key_values.iter().next().unwrap();
1576 let err = memtable.write_one(kv).unwrap_err();
1577 assert!(err.to_string().contains("not supported"));
1578 }
1579
1580 #[test]
1581 fn test_bulk_memtable_freeze() {
1582 let metadata = metadata_for_test();
1583 let memtable = BulkMemtable::new(
1584 222,
1585 BulkMemtableConfig::default(),
1586 metadata.clone(),
1587 None,
1588 None,
1589 false,
1590 MergeMode::LastRow,
1591 );
1592
1593 let bulk_part = create_bulk_part_with_converter(
1594 "freeze_test",
1595 10,
1596 vec![10000],
1597 vec![Some(100.0)],
1598 1000,
1599 )
1600 .unwrap();
1601
1602 memtable.write_bulk(bulk_part).unwrap();
1603 memtable.freeze().unwrap();
1604
1605 let stats_after_freeze = memtable.stats();
1606 assert_eq!(1, stats_after_freeze.num_rows);
1607 }
1608
1609 #[test]
1610 fn test_bulk_memtable_fork() {
1611 let metadata = metadata_for_test();
1612 let original_memtable = BulkMemtable::new(
1613 333,
1614 BulkMemtableConfig::default(),
1615 metadata.clone(),
1616 None,
1617 None,
1618 false,
1619 MergeMode::LastRow,
1620 );
1621
1622 let bulk_part =
1623 create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
1624 .unwrap();
1625
1626 original_memtable.write_bulk(bulk_part).unwrap();
1627
1628 let forked_memtable = original_memtable.fork(444, &metadata);
1629
1630 assert_eq!(forked_memtable.id(), 444);
1631 assert!(forked_memtable.is_empty());
1632 assert_eq!(0, forked_memtable.stats().num_rows);
1633
1634 assert!(!original_memtable.is_empty());
1635 assert_eq!(1, original_memtable.stats().num_rows);
1636 }
1637
1638 #[test]
1639 fn test_bulk_memtable_ranges_multiple_parts() {
1640 let metadata = metadata_for_test();
1641 let memtable = BulkMemtable::new(
1642 777,
1643 BulkMemtableConfig::default(),
1644 metadata.clone(),
1645 None,
1646 None,
1647 false,
1648 MergeMode::LastRow,
1649 );
1650 memtable.set_unordered_part_threshold(0);
1652
1653 let parts_data = vec![
1654 (
1655 "part1",
1656 1u32,
1657 vec![1000i64, 1100i64],
1658 vec![Some(10.0), Some(11.0)],
1659 100u64,
1660 ),
1661 (
1662 "part2",
1663 2u32,
1664 vec![2000i64, 2100i64],
1665 vec![Some(20.0), Some(21.0)],
1666 200u64,
1667 ),
1668 ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
1669 ];
1670
1671 for (k0, k1, timestamps, values, seq) in parts_data {
1672 let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
1673 memtable.write_bulk(part).unwrap();
1674 }
1675
1676 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1677 let ranges = memtable
1678 .ranges(
1679 None,
1680 RangesOptions::default().with_predicate(predicate_group),
1681 )
1682 .unwrap();
1683
1684 assert_eq!(3, ranges.ranges.len());
1685 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1686 assert_eq!(5, total_rows);
1688
1689 for (range_id, range) in ranges.ranges.iter() {
1690 assert!(*range_id < 3);
1691 assert!(range.num_rows() > 0);
1692 assert!(range.is_record_batch());
1693 }
1694 }
1695
1696 #[test]
1697 fn test_bulk_memtable_ranges_with_sequence_filter() {
1698 let metadata = metadata_for_test();
1699 let memtable = BulkMemtable::new(
1700 888,
1701 BulkMemtableConfig::default(),
1702 metadata.clone(),
1703 None,
1704 None,
1705 false,
1706 MergeMode::LastRow,
1707 );
1708
1709 let part = create_bulk_part_with_converter(
1710 "seq_test",
1711 1,
1712 vec![1000, 2000, 3000],
1713 vec![Some(10.0), Some(20.0), Some(30.0)],
1714 500,
1715 )
1716 .unwrap();
1717
1718 memtable.write_bulk(part).unwrap();
1719
1720 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        // All rows above were written with sequence 500, so filtering at 400 hides them all.
        let sequence_filter = Some(SequenceRange::LtEq { max: 400 });
        let ranges = memtable
1723 .ranges(
1724 None,
1725 RangesOptions::default()
1726 .with_predicate(predicate_group)
1727 .with_sequence(sequence_filter),
1728 )
1729 .unwrap();
1730
1731 assert_eq!(1, ranges.ranges.len());
1732 let range = ranges.ranges.get(&0).unwrap();
1733
1734 let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
1735 assert!(record_batch_iter.next().is_none());
1736 }
1737
1738 #[test]
1739 fn test_bulk_memtable_ranges_with_encoded_parts() {
1740 let metadata = metadata_for_test();
1741 let config = BulkMemtableConfig {
1742 merge_threshold: 8,
1743 ..Default::default()
1744 };
1745 let memtable = BulkMemtable::new(
1746 999,
1747 config,
1748 metadata.clone(),
1749 None,
1750 None,
1751 false,
1752 MergeMode::LastRow,
1753 );
1754 memtable.set_unordered_part_threshold(0);
1756
1757 for i in 0..10 {
1759 let part = create_bulk_part_with_converter(
1760 &format!("key_{}", i),
1761 i,
1762 vec![1000 + i as i64 * 100],
1763 vec![Some(i as f64 * 10.0)],
1764 100 + i as u64,
1765 )
1766 .unwrap();
1767 memtable.write_bulk(part).unwrap();
1768 }
1769
1770 memtable.compact(false).unwrap();
1771
1772 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1773 let ranges = memtable
1774 .ranges(
1775 None,
1776 RangesOptions::default().with_predicate(predicate_group),
1777 )
1778 .unwrap();
1779
1780 assert_eq!(3, ranges.ranges.len());
1782 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1783 assert_eq!(10, total_rows);
1784
1785 for (_range_id, range) in ranges.ranges.iter() {
1786 assert!(range.num_rows() > 0);
1787 assert!(range.is_record_batch());
1788
1789 let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1790 let mut total_rows = 0;
1791 for batch_result in record_batch_iter {
1792 let batch = batch_result.unwrap();
1793 total_rows += batch.num_rows();
1794 assert!(batch.num_rows() > 0);
1795 }
1796 assert_eq!(total_rows, range.num_rows());
1797 }
1798 }
1799
1800 #[test]
1801 fn test_bulk_memtable_unordered_part() {
1802 let metadata = metadata_for_test();
1803 let memtable = BulkMemtable::new(
1804 1001,
1805 BulkMemtableConfig::default(),
1806 metadata.clone(),
1807 None,
1808 None,
1809 false,
1810 MergeMode::LastRow,
1811 );
1812
1813 memtable.set_unordered_part_threshold(5);
1816 memtable.set_unordered_part_compact_threshold(10);
1818
1819 for i in 0..3 {
1821 let part = create_bulk_part_with_converter(
1822 &format!("key_{}", i),
1823 i,
1824 vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1825 vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1826 100 + i as u64,
1827 )
1828 .unwrap();
1829 assert_eq!(2, part.num_rows());
1830 memtable.write_bulk(part).unwrap();
1831 }
1832
1833 let stats = memtable.stats();
1835 assert_eq!(6, stats.num_rows);
1836
1837 for i in 3..5 {
1840 let part = create_bulk_part_with_converter(
1841 &format!("key_{}", i),
1842 i,
1843 vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
1844 vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
1845 100 + i as u64,
1846 )
1847 .unwrap();
1848 memtable.write_bulk(part).unwrap();
1849 }
1850
1851 let stats = memtable.stats();
1853 assert_eq!(10, stats.num_rows);
1854
1855 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1857 let ranges = memtable
1858 .ranges(
1859 None,
1860 RangesOptions::default().with_predicate(predicate_group),
1861 )
1862 .unwrap();
1863
1864 assert!(!ranges.ranges.is_empty());
1866 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1867 assert_eq!(10, total_rows);
1868
1869 let mut total_rows_read = 0;
1871 for (_range_id, range) in ranges.ranges.iter() {
1872 assert!(range.is_record_batch());
1873 let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1874
1875 for batch_result in record_batch_iter {
1876 let batch = batch_result.unwrap();
1877 total_rows_read += batch.num_rows();
1878 }
1879 }
1880 assert_eq!(10, total_rows_read);
1881 }
1882
1883 #[test]
1884 fn test_bulk_memtable_unordered_part_mixed_sizes() {
1885 let metadata = metadata_for_test();
1886 let memtable = BulkMemtable::new(
1887 1002,
1888 BulkMemtableConfig::default(),
1889 metadata.clone(),
1890 None,
1891 None,
1892 false,
1893 MergeMode::LastRow,
1894 );
1895
1896 memtable.set_unordered_part_threshold(4);
1898 memtable.set_unordered_part_compact_threshold(8);
1899
1900 for i in 0..2 {
1902 let part = create_bulk_part_with_converter(
1903 &format!("small_{}", i),
1904 i,
1905 vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
1906 vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
1907 10 + i as u64,
1908 )
1909 .unwrap();
1910 assert_eq!(3, part.num_rows());
1911 memtable.write_bulk(part).unwrap();
1912 }
1913
1914 let large_part = create_bulk_part_with_converter(
1916 "large_key",
1917 100,
1918 vec![5000, 6000, 7000, 8000, 9000],
1919 vec![
1920 Some(100.0),
1921 Some(101.0),
1922 Some(102.0),
1923 Some(103.0),
1924 Some(104.0),
1925 ],
1926 50,
1927 )
1928 .unwrap();
1929 assert_eq!(5, large_part.num_rows());
1930 memtable.write_bulk(large_part).unwrap();
1931
1932 let part = create_bulk_part_with_converter(
1934 "small_2",
1935 2,
1936 vec![4000, 4100],
1937 vec![Some(20.0), Some(21.0)],
1938 30,
1939 )
1940 .unwrap();
1941 memtable.write_bulk(part).unwrap();
1942
1943 let stats = memtable.stats();
        // 3 + 3 + 5 + 2 rows written above.
        assert_eq!(13, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1948 let ranges = memtable
1949 .ranges(
1950 None,
1951 RangesOptions::default().with_predicate(predicate_group),
1952 )
1953 .unwrap();
1954
1955 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1956 assert_eq!(13, total_rows);
1957
1958 let mut total_rows_read = 0;
1959 for (_range_id, range) in ranges.ranges.iter() {
1960 let record_batch_iter = range.build_record_batch_iter(None).unwrap();
1961 for batch_result in record_batch_iter {
1962 let batch = batch_result.unwrap();
1963 total_rows_read += batch.num_rows();
1964 }
1965 }
1966 assert_eq!(13, total_rows_read);
1967 }
1968
1969 #[test]
1970 fn test_bulk_memtable_unordered_part_with_ranges() {
1971 let metadata = metadata_for_test();
1972 let memtable = BulkMemtable::new(
1973 1003,
1974 BulkMemtableConfig::default(),
1975 metadata.clone(),
1976 None,
1977 None,
1978 false,
1979 MergeMode::LastRow,
1980 );
1981
1982 memtable.set_unordered_part_threshold(3);
        // The compact threshold is high enough that the unordered part is never compacted here.
        memtable.set_unordered_part_compact_threshold(100);

        // Write three single-row parts; all of them are buffered in the unordered part.
        for i in 0..3 {
1988 let part = create_bulk_part_with_converter(
1989 &format!("key_{}", i),
1990 i,
1991 vec![1000 + i as i64 * 100],
1992 vec![Some(i as f64 * 10.0)],
1993 100 + i as u64,
1994 )
1995 .unwrap();
1996 assert_eq!(1, part.num_rows());
1997 memtable.write_bulk(part).unwrap();
1998 }
1999
2000 let stats = memtable.stats();
2001 assert_eq!(3, stats.num_rows);
2002
2003 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2005 let ranges = memtable
2006 .ranges(
2007 None,
2008 RangesOptions::default().with_predicate(predicate_group),
2009 )
2010 .unwrap();
2011
2012 assert_eq!(1, ranges.ranges.len());
2014 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2015 assert_eq!(3, total_rows);
2016
2017 let range = ranges.ranges.get(&0).unwrap();
2019 let record_batch_iter = range.build_record_batch_iter(None).unwrap();
2020
2021 let mut total_rows = 0;
2022 for batch_result in record_batch_iter {
2023 let batch = batch_result.unwrap();
2024 total_rows += batch.num_rows();
2025 assert!(batch.num_rows() > 0);
2027 }
2028 assert_eq!(3, total_rows);
2029 }
2030
2031 fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
2033 BulkPartWrapper {
2034 part: PartToMerge::Bulk {
2035 part,
2036 file_id: FileId::random(),
2037 },
2038 merging: false,
2039 }
2040 }
2041
2042 #[test]
2043 fn test_should_merge_parts_below_threshold() {
2044 let mut bulk_parts = BulkParts::default();
2045
2046 for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
2048 let part = create_bulk_part_with_converter(
2049 &format!("key_{}", i),
2050 i as u32,
2051 vec![1000 + i as i64 * 100],
2052 vec![Some(i as f64 * 10.0)],
2053 100 + i as u64,
2054 )
2055 .unwrap();
2056 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2057 }
2058
2059 assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2061 }
2062
2063 #[test]
2064 fn test_should_merge_parts_at_threshold() {
2065 let mut bulk_parts = BulkParts::default();
2066 let merge_threshold = 8;
2067
2068 for i in 0..merge_threshold {
2070 let part = create_bulk_part_with_converter(
2071 &format!("key_{}", i),
2072 i as u32,
2073 vec![1000 + i as i64 * 100],
2074 vec![Some(i as f64 * 10.0)],
2075 100 + i as u64,
2076 )
2077 .unwrap();
2078 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2079 }
2080
2081 assert!(bulk_parts.should_merge_parts(merge_threshold));
2083 }
2084
2085 #[test]
2086 fn test_should_merge_parts_with_merging_flag() {
2087 let mut bulk_parts = BulkParts::default();
2088 let merge_threshold = 8;
2089
2090 for i in 0..10 {
2092 let part = create_bulk_part_with_converter(
2093 &format!("key_{}", i),
2094 i as u32,
2095 vec![1000 + i as i64 * 100],
2096 vec![Some(i as f64 * 10.0)],
2097 100 + i as u64,
2098 )
2099 .unwrap();
2100 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2101 }
2102
2103 assert!(bulk_parts.should_merge_parts(merge_threshold));
2105
2106 for wrapper in bulk_parts.parts.iter_mut().take(3) {
2108 wrapper.merging = true;
2109 }
2110
2111 assert!(!bulk_parts.should_merge_parts(merge_threshold));
2113 }
2114
2115 #[test]
2116 fn test_collect_parts_to_merge_grouping() {
2117 let mut bulk_parts = BulkParts::default();
2118
2119 for i in 0..16 {
            // Vary part sizes from 1 to 4 rows so parts have different sizes for grouping.
            let num_rows = (i % 4) + 1;
            let timestamps: Vec<i64> = (0..num_rows)
2123 .map(|j| 1000 + i as i64 * 100 + j as i64)
2124 .collect();
2125 let values: Vec<Option<f64>> =
2126 (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
2127 let part = create_bulk_part_with_converter(
2128 &format!("key_{}", i),
2129 i as u32,
2130 timestamps,
2131 values,
2132 100 + i as u64,
2133 )
2134 .unwrap();
2135 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2136 }
2137
2138 assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2140
2141 let collected =
2143 bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);
2144
2145 assert!(!collected.groups.is_empty());
2147
2148 for group in &collected.groups {
2150 assert!(!group.is_empty());
2151 }
2152
2153 let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
2155 assert_eq!(16, total_parts);
2156 }
2157
2158 #[test]
2159 fn test_bulk_memtable_ranges_with_multi_bulk_part() {
2160 let metadata = metadata_for_test();
2161 let merge_threshold = 8;
2162 let config = BulkMemtableConfig {
2163 merge_threshold,
2164 ..Default::default()
2165 };
2166 let memtable = BulkMemtable::new(
2167 2005,
2168 config,
2169 metadata.clone(),
2170 None,
2171 None,
2172 false,
2173 MergeMode::LastRow,
2174 );
2175 memtable.set_unordered_part_threshold(0);
2177
2178 for i in 0..merge_threshold {
2182 let part = create_bulk_part_with_converter(
2183 &format!("key_{}", i),
2184 i as u32,
2185 vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
2186 vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
2187 100 + i as u64,
2188 )
2189 .unwrap();
2190 memtable.write_bulk(part).unwrap();
2191 }
2192
2193 memtable.compact(false).unwrap();
2195
2196 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
2198 let ranges = memtable
2199 .ranges(
2200 None,
2201 RangesOptions::default().with_predicate(predicate_group),
2202 )
2203 .unwrap();
2204
2205 assert_eq!(1, ranges.ranges.len());
        // Each of the `merge_threshold` parts was written with 2 rows.
        let expected_rows = merge_threshold * 2;
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
2208 assert_eq!(expected_rows, total_rows);
2209
2210 let mut total_rows_read = 0;
2212 for (_range_id, range) in ranges.ranges.iter() {
2213 assert!(range.is_record_batch());
2214 let record_batch_iter = range.build_record_batch_iter(None).unwrap();
2215
2216 for batch_result in record_batch_iter {
2217 let batch = batch_result.unwrap();
2218 total_rows_read += batch.num_rows();
2219 }
2220 }
2221 assert_eq!(expected_rows, total_rows_read);
2222 }
2223}