pub(crate) mod chunk_reader;
pub mod context;
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Mutex, RwLock};
use std::time::Instant;

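/// Reads a `usize` from the environment variable `name`, falling back to
/// `default` when the variable is unset or cannot be parsed.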
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{
    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
    should_prune_bulk_part,
};
use crate::memtable::bulk::part_reader::BulkPartBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::flat_format::field_column_start;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

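/// Default number of pending parts of one kind that triggers a merge.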
const DEFAULT_MERGE_THRESHOLD: usize = 16;

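/// Merge threshold, overridable via the `GREPTIME_BULK_MERGE_THRESHOLD`
/// environment variable.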
static MERGE_THRESHOLD: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));

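/// Default maximum number of part groups merged in a single compaction run.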
const DEFAULT_MAX_MERGE_GROUPS: usize = 32;

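/// Maximum number of merge groups, overridable via the
/// `GREPTIME_BULK_MAX_MERGE_GROUPS` environment variable.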
static MAX_MERGE_GROUPS: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));

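/// Row count threshold above which a merged part is encoded as Parquet
/// instead of being kept as in-memory record batches, overridable via the
/// `GREPTIME_BULK_ENCODE_ROW_THRESHOLD` environment variable.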
pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
        10 * DEFAULT_ROW_GROUP_SIZE,
    )
});

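/// Default byte size threshold above which a merged part is encoded as Parquet.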
const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;

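/// Byte size threshold for encoding merged parts, overridable via the
/// `GREPTIME_BULK_ENCODE_BYTES_THRESHOLD` environment variable.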
static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
        DEFAULT_ENCODE_BYTES_THRESHOLD,
    )
});

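/// Configuration of a [BulkMemtable].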
#[derive(Debug, Clone)]
pub struct BulkMemtableConfig {
    /// Number of pending parts of one kind that triggers a merge.
    pub merge_threshold: usize,
    /// Row count threshold above which merged output is encoded as Parquet.
    pub encode_row_threshold: usize,
    /// Byte size threshold above which merged output is encoded as Parquet.
    pub encode_bytes_threshold: usize,
    /// Maximum number of part groups merged in a single compaction run.
    pub max_merge_groups: usize,
}

impl Default for BulkMemtableConfig {
    fn default() -> Self {
        Self {
            merge_threshold: *MERGE_THRESHOLD,
            encode_row_threshold: *ENCODE_ROW_THRESHOLD,
            encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
            max_merge_groups: *MAX_MERGE_GROUPS,
        }
    }
}

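/// Result of merging a group of parts.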
enum MergedPart {
    /// Merged part kept in memory as record batches.
    Multi(MultiBulkPart),
    /// Merged part encoded as Parquet bytes.
    Encoded(EncodedBulkPart),
}

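/// Parts collected for one merge run.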
struct CollectedParts {
    /// Each inner vector is one group of parts that are merged together.
    groups: Vec<Vec<PartToMerge>>,
}

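/// All data parts in a [BulkMemtable].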
#[derive(Default)]
struct BulkParts {
    /// Buffer for small write fragments, compacted into a part once large enough.
    unordered_part: UnorderedPart,
    /// Written and merged parts.
    parts: Vec<BulkPartWrapper>,
}

impl BulkParts {
    /// Returns the number of parts, counting the unordered buffer as one part.
    fn num_parts(&self) -> usize {
        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
        self.parts.len() + unordered_count
    }

    fn is_empty(&self) -> bool {
        self.unordered_part.is_empty() && self.parts.is_empty()
    }

    /// Returns true if the number of unmerged bulk parts or encoded parts
    /// reaches `merge_threshold`. Parts already being merged are skipped.
    fn should_merge_parts(&self, merge_threshold: usize) -> bool {
        let mut bulk_count = 0;
        let mut encoded_count = 0;

        for wrapper in &self.parts {
            if wrapper.merging {
                continue;
            }

            if wrapper.part.is_encoded() {
                encoded_count += 1;
            } else {
                bulk_count += 1;
            }

            if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
                return true;
            }
        }

        false
    }

    /// Returns true if the unordered buffer should be compacted into a part.
    fn should_compact_unordered_part(&self) -> bool {
        self.unordered_part.should_compact()
    }

    /// Collects parts eligible for merging and marks them as merging.
    ///
    /// Bulk parts and encoded parts are grouped separately; a kind is only
    /// collected once it has at least `merge_threshold` unmerged parts.
    fn collect_parts_to_merge(
        &mut self,
        merge_threshold: usize,
        max_merge_groups: usize,
    ) -> CollectedParts {
        let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
        let mut encoded_indices: Vec<(usize, usize)> = Vec::new();

        for (idx, wrapper) in self.parts.iter().enumerate() {
            if wrapper.merging {
                continue;
            }
            let num_rows = wrapper.part.num_rows();
            if wrapper.part.is_encoded() {
                encoded_indices.push((idx, num_rows));
            } else {
                bulk_indices.push((idx, num_rows));
            }
        }

        let mut groups = Vec::new();

        if bulk_indices.len() >= merge_threshold {
            groups.extend(self.collect_and_group_parts(
                bulk_indices,
                merge_threshold,
                max_merge_groups,
            ));
        }

        if encoded_indices.len() >= merge_threshold {
            groups.extend(self.collect_and_group_parts(
                encoded_indices,
                merge_threshold,
                max_merge_groups,
            ));
        }

        CollectedParts { groups }
    }

    /// Sorts parts by row count and splits them into groups of at most
    /// `merge_threshold` parts, returning at most `max_merge_groups` groups.
    /// Collected parts are flagged as merging.
    fn collect_and_group_parts(
        &mut self,
        mut indices: Vec<(usize, usize)>,
        merge_threshold: usize,
        max_merge_groups: usize,
    ) -> Vec<Vec<PartToMerge>> {
        if indices.is_empty() {
            return Vec::new();
        }

        // Sorting by row count groups parts of similar sizes together.
        indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);

        indices
            .chunks(merge_threshold)
            .take(max_merge_groups)
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|(idx, _)| {
                        let wrapper = &mut self.parts[*idx];
                        wrapper.merging = true;
                        wrapper.part.clone()
                    })
                    .collect()
            })
            .collect()
    }

    /// Installs merged parts and removes the input parts that were merged.
    ///
    /// Returns the total number of rows in the merged output.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
    ) -> usize
    where
        I: IntoIterator<Item = MergedPart>,
    {
        let mut total_output_rows = 0;

        for merged_part in merged_parts {
            match merged_part {
                MergedPart::Encoded(encoded_part) => {
                    total_output_rows += encoded_part.metadata().num_rows;
                    self.parts.push(BulkPartWrapper {
                        part: PartToMerge::Encoded {
                            part: encoded_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                }
                MergedPart::Multi(multi_part) => {
                    total_output_rows += multi_part.num_rows();
                    self.parts.push(BulkPartWrapper {
                        part: PartToMerge::Multi {
                            part: multi_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                }
            }
        }

        self.parts
            .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));

        total_output_rows
    }

    /// Clears the merging flag of parts whose file ids are in `file_ids`.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
        for wrapper in &mut self.parts {
            if file_ids.contains(&wrapper.file_id()) {
                wrapper.merging = false;
            }
        }
    }
}

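/// Guard that resets the `merging` flags of the collected parts when a merge
/// fails, making them available for future merges.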
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    /// Whether the merge succeeded; flags are only reset on failure.
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
        Self {
            bulk_parts,
            file_ids,
            success: false,
        }
    }

    /// Marks the merge as successful so the guard won't reset the flags.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids);
        }
    }
}

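/// Memtable that stores bulk-written parts directly and merges them in the
/// background.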
pub struct BulkMemtable {
    id: MemtableId,
    config: BulkMemtableConfig,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Cached arrow schema of the flat SST format, used when merging parts.
    flat_arrow_schema: SchemaRef,
    /// Compactor that merges the parts of this memtable.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Optional dispatcher that runs compaction in the background.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the region is in append mode (no dedup on merge).
    append_mode: bool,
    /// Strategy for deduplicating rows with the same key and timestamp.
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();

            // Small fragments are buffered in the unordered part first.
            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
                bulk_parts.unordered_part.push(fragment);

                // Compacts the unordered buffer into a proper part once it is
                // large enough.
                if bulk_parts.should_compact_unordered_part()
                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
                {
                    bulk_parts.parts.push(BulkPartWrapper {
                        part: PartToMerge::Bulk {
                            part: bulk_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                    bulk_parts.unordered_part.clear();
                }
            } else {
                bulk_parts.parts.push(BulkPartWrapper {
                    part: PartToMerge::Bulk {
                        part: fragment,
                        file_id: FileId::random(),
                    },
                    merging: false,
                });
            }

            // Updates stats while still holding the lock so they stay
            // consistent with the parts.
            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        {
            let bulk_parts = self.parts.read().unwrap();

            // The unordered buffer is exposed as a single range.
            if !bulk_parts.unordered_part.is_empty()
                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
            {
                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: unordered_bulk_part,
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            for part_wrapper in bulk_parts.parts.iter() {
                // Skips empty parts.
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
                let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
                    PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Encoded { part, file_id } => {
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: *file_id,
                            part: part.clone(),
                            context: context.clone(),
                            sequence,
                        })
                    }
                };

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        iter_builder,
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        Ok(MemtableRanges { ranges })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Rebuilds the schema since the region metadata may have changed.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            config: self.config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(
                metadata.region_id,
                id,
                self.config.clone(),
            ))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        // Merging is skipped when compacting for a flush.
        if for_flush {
            return Ok(());
        }

        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new [BulkMemtable].
    pub fn new(
        id: MemtableId,
        config: BulkMemtableConfig,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            config: config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the row count threshold of the unordered buffer (test only).
    #[cfg(test)]
    pub fn set_unordered_part_threshold(&self, threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_threshold(threshold);
    }

    /// Sets the compact threshold of the unordered buffer (test only).
    #[cfg(test)]
    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_compact_threshold(compact_threshold);
    }

    /// Updates memtable stats after a successful write.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Estimates the series count by summing the series counts of all parts.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.series_count())
            .sum()
    }

    /// Returns true if there are enough unmerged parts to compact.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_parts(self.config.merge_threshold)
    }

    /// Schedules a compaction via the dispatcher if present, otherwise
    /// compacts synchronously on the current thread.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                config: self.config.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else if let Err(e) = self.compact(false) {
            common_telemetry::error!(e; "Failed to compact memtable");
        }
    }
}

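/// Iterator builder for a [BulkPart] range.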
pub struct BulkRangeIterBuilder {
    pub part: BulkPart,
    pub context: Arc<BulkIterContext>,
    pub sequence: Option<SequenceRange>,
}

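/// Iterator builder for a [MultiBulkPart] range.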
struct MultiBulkRangeIterBuilder {
    part: MultiBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let metadata = self.context.read_format().metadata();
        if should_prune_bulk_part(&self.part.batch, &self.context, metadata) {
            return Ok(Box::new(std::iter::empty()));
        }

        let series_count = self.part.estimated_series_count();
        let iter = BulkPartBatchIter::from_single(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
            series_count,
            metrics,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

impl IterBuilder for MultiBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for multi bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        match self
            .part
            .read(self.context.clone(), self.sequence, metrics)?
        {
            Some(iter) => Ok(iter),
            // All batches are pruned.
            None => Ok(Box::new(std::iter::empty())),
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

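/// Iterator builder for an [EncodedBulkPart] range.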
struct EncodedBulkRangeIterBuilder {
    /// Id that identifies the in-memory encoded part.
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self
            .part
            .read(self.context.clone(), self.sequence, metrics)?
        {
            Ok(iter)
        } else {
            // All row groups are pruned.
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

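/// A part and the flag that marks it as being merged.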
struct BulkPartWrapper {
    part: PartToMerge,
    /// Whether the part is currently being merged by a compaction task.
    merging: bool,
}

impl BulkPartWrapper {
    fn file_id(&self) -> FileId {
        self.part.file_id()
    }
}

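/// A part that can participate in a merge.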
#[derive(Clone)]
enum PartToMerge {
    /// A plain bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// A part that holds multiple merged record batches.
    Multi {
        part: MultiBulkPart,
        file_id: FileId,
    },
    /// A part encoded as Parquet bytes.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Multi { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Multi { part, .. } => part.min_timestamp(),
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Multi { part, .. } => part.max_timestamp(),
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Multi { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    fn max_sequence(&self) -> u64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.sequence,
            PartToMerge::Multi { part, .. } => part.max_sequence(),
            PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
        }
    }

    fn series_count(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
            PartToMerge::Multi { part, .. } => part.series_count(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
        }
    }

    fn is_encoded(&self) -> bool {
        matches!(self, PartToMerge::Encoded { .. })
    }

    fn estimated_size(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.estimated_size(),
            PartToMerge::Multi { part, .. } => part.estimated_size(),
            PartToMerge::Encoded { part, .. } => part.size_bytes(),
        }
    }

    fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        match self {
            PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
            PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
            PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
        }
    }

    /// Creates an iterator that reads all rows of the part, or `None` when
    /// there is nothing to read.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let series_count = part.estimated_series_count();
                let iter = BulkPartBatchIter::from_single(
                    part.batch,
                    context,
                    None, // no sequence filter
                    series_count,
                    None, // no metrics
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Multi { part, .. } => part.read(context, None, None),
            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
        }
    }
}

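/// Compactor that merges the parts of a [BulkMemtable].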
struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
    /// Config of the memtable that owns this compactor.
    config: BulkMemtableConfig,
}

impl MemtableCompactor {
    fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
        Self {
            region_id,
            memtable_id,
            config,
        }
    }

    /// Collects mergeable parts, merges each group in parallel with rayon,
    /// and installs the merged outputs.
    fn merge_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        let collected = bulk_parts
            .write()
            .unwrap()
            .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);

        if collected.groups.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> = collected
            .groups
            .iter()
            .flatten()
            .map(|part| part.file_id())
            .collect();
        // Resets the merging flags if the merge fails.
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);

        let num_groups = collected.groups.len();
        let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();

        let encode_row_threshold = self.config.encode_row_threshold;
        let encode_bytes_threshold = self.config.encode_bytes_threshold;

        let merged_parts = collected
            .groups
            .into_par_iter()
            .map(|group| {
                Self::merge_parts_group(
                    group,
                    arrow_schema,
                    metadata,
                    dedup,
                    merge_mode,
                    encode_row_threshold,
                    encode_bytes_threshold,
                )
            })
            .collect::<Result<Vec<Option<MergedPart>>>>()?;

        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            num_groups,
            num_parts,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges one group of parts into a single output part.
    ///
    /// Returns an encoded part when the output exceeds the row or byte
    /// thresholds, an in-memory multi part otherwise, or `None` when the
    /// group produces no rows.
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
        encode_row_threshold: usize,
        encode_bytes_threshold: usize,
    ) -> Result<Option<MergedPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        // Computes the stats of the merged output from the input parts.
        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);
        let max_sequence = parts_to_merge
            .iter()
            .map(|p| p.max_sequence())
            .max()
            .unwrap_or(0);

        let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
        let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
        let estimated_series_count = parts_to_merge
            .iter()
            .map(|p| p.series_count())
            .max()
            .unwrap_or(0);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            None, // no projection
            None, // no predicate
            true, // for_flush
        )?);

        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            // Applies the dedup strategy of the region.
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    let field_column_start =
                        field_column_start(metadata, arrow_schema.fields().len());

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        if estimated_total_rows > encode_row_threshold
            || estimated_total_bytes > encode_bytes_threshold
        {
            let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
            let mut metrics = BulkPartEncodeMetrics::default();
            let encoded_part = encoder.encode_record_batch_iter(
                boxed_iter,
                arrow_schema.clone(),
                min_timestamp,
                max_timestamp,
                max_sequence,
                &mut metrics,
            )?;

            common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

            Ok(encoded_part.map(MergedPart::Encoded))
        } else {
            // Keeps the merged batches in memory.
            let mut batches = Vec::new();
            let mut actual_total_rows = 0;

            for batch_result in boxed_iter {
                let batch = batch_result?;
                actual_total_rows += batch.num_rows();
                batches.push(batch);
            }

            if actual_total_rows == 0 {
                return Ok(None);
            }

            let multi_part = MultiBulkPart::new(
                batches,
                min_timestamp,
                max_timestamp,
                max_sequence,
                estimated_series_count,
                metadata,
            );

            common_telemetry::trace!(
                "merge_parts_group created MultiBulkPart: rows={}, batches={}",
                actual_total_rows,
                multi_part.num_batches()
            );

            Ok(Some(MergedPart::Multi(multi_part)))
        }
    }
}

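/// Background task that compacts the parts of one memtable.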
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,
    config: BulkMemtableConfig,
    flat_arrow_schema: SchemaRef,
    compactor: Arc<Mutex<MemtableCompactor>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

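/// Dispatcher that runs memtable compaction tasks on background runtimes,
/// using a semaphore to bound the number of concurrent compactions.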
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a dispatcher that allows at most `permits` concurrent compactions.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Dispatches a compaction task once a permit is available.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            // Acquires an owned permit and holds it for the duration of the
            // blocking task; otherwise the permit would be released as soon as
            // the task is spawned and the semaphore would not bound the number
            // of concurrent compactions.
            let Ok(permit) = semaphore.acquire_owned().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                let _permit = permit;
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

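/// Builder for [BulkMemtable].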
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    config: BulkMemtableConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new [BulkMemtableBuilder].
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            config: BulkMemtableConfig::default(),
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Attaches a dispatcher so built memtables compact in the background.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            self.config.clone(),
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            999,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        // Disables the unordered buffer so each write becomes its own part.
        memtable.set_unordered_part_threshold(0);

        let test_data = [
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(5, total_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            111,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                Some(&projection),
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            111,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            222,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable = BulkMemtable::new(
            333,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            777,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        // Disables the unordered buffer so each write becomes its own part.
        memtable.set_unordered_part_threshold(0);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(5, total_rows);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            888,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        // All rows have sequence 500, so filtering by sequence <= 400 yields
        // nothing.
        let sequence_filter = Some(SequenceRange::LtEq { max: 400 });
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default()
                    .with_predicate(predicate_group)
                    .with_sequence(sequence_filter),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let config = BulkMemtableConfig {
            merge_threshold: 8,
            ..Default::default()
        };
        let memtable = BulkMemtable::new(
            999,
            config,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        // Disables the unordered buffer so each write becomes its own part.
        memtable.set_unordered_part_threshold(0);

        // Writes more parts than the merge threshold.
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // The first 8 parts were merged into one when the threshold was
        // reached; the last 2 writes remain unmerged.
        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(10, total_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_unordered_part() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1001,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Small fragments go to the unordered buffer; the buffer is compacted
        // into a part once it holds 10 rows.
        memtable.set_unordered_part_threshold(5);
        memtable.set_unordered_part_compact_threshold(10);

        // Writes 3 small parts (2 rows each) into the unordered buffer.
        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(2, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(6, stats.num_rows);

        // Two more small parts push the buffer to the compact threshold.
        for i in 3..5 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(10, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert!(!ranges.ranges.is_empty());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(10, total_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(10, total_rows_read);
    }

    #[test]
    fn test_bulk_memtable_unordered_part_mixed_sizes() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1002,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        memtable.set_unordered_part_threshold(4);
        memtable.set_unordered_part_compact_threshold(8);

        // Small parts (3 rows each) go to the unordered buffer.
        for i in 0..2 {
            let part = create_bulk_part_with_converter(
                &format!("small_{}", i),
                i,
                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
                10 + i as u64,
            )
            .unwrap();
            assert_eq!(3, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        // A large part (5 rows) bypasses the unordered buffer.
        let large_part = create_bulk_part_with_converter(
            "large_key",
            100,
            vec![5000, 6000, 7000, 8000, 9000],
            vec![
                Some(100.0),
                Some(101.0),
                Some(102.0),
                Some(103.0),
                Some(104.0),
            ],
            50,
        )
        .unwrap();
        assert_eq!(5, large_part.num_rows());
        memtable.write_bulk(large_part).unwrap();

        // Another small part goes to the unordered buffer.
        let part = create_bulk_part_with_converter(
            "small_2",
            2,
            vec![4000, 4100],
            vec![Some(20.0), Some(21.0)],
            30,
        )
        .unwrap();
        memtable.write_bulk(part).unwrap();

        let stats = memtable.stats();
        // 3 + 3 + 5 + 2 rows in total.
        assert_eq!(13, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(13, total_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(13, total_rows_read);
    }

    #[test]
    fn test_bulk_memtable_unordered_part_with_ranges() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1003,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // A high compact threshold keeps all rows in the unordered buffer.
        memtable.set_unordered_part_threshold(3);
        memtable.set_unordered_part_compact_threshold(100);

        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(1, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(3, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // The unordered buffer is exposed as a single range.
        assert_eq!(1, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(3, total_rows);

        let range = ranges.ranges.get(&0).unwrap();
        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            total_rows += batch.num_rows();
            assert!(batch.num_rows() > 0);
        }
        assert_eq!(3, total_rows);
    }

    fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
        BulkPartWrapper {
            part: PartToMerge::Bulk {
                part,
                file_id: FileId::random(),
            },
            merging: false,
        }
    }

    #[test]
    fn test_should_merge_parts_below_threshold() {
        let mut bulk_parts = BulkParts::default();

        for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            bulk_parts.parts.push(create_bulk_part_wrapper(part));
        }

        assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
    }

    #[test]
    fn test_should_merge_parts_at_threshold() {
        let mut bulk_parts = BulkParts::default();
        let merge_threshold = 8;

        for i in 0..merge_threshold {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            bulk_parts.parts.push(create_bulk_part_wrapper(part));
        }

        assert!(bulk_parts.should_merge_parts(merge_threshold));
    }

    #[test]
    fn test_should_merge_parts_with_merging_flag() {
        let mut bulk_parts = BulkParts::default();
        let merge_threshold = 8;

        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            bulk_parts.parts.push(create_bulk_part_wrapper(part));
        }

        assert!(bulk_parts.should_merge_parts(merge_threshold));

        // Marking 3 parts as merging leaves only 7 mergeable parts, which is
        // below the threshold.
        for wrapper in bulk_parts.parts.iter_mut().take(3) {
            wrapper.merging = true;
        }

        assert!(!bulk_parts.should_merge_parts(merge_threshold));
    }

    #[test]
    fn test_collect_parts_to_merge_grouping() {
        let mut bulk_parts = BulkParts::default();

        // Creates 16 parts with varying row counts.
        for i in 0..16 {
            let num_rows = (i % 4) + 1;
            let timestamps: Vec<i64> = (0..num_rows)
                .map(|j| 1000 + i as i64 * 100 + j as i64)
                .collect();
            let values: Vec<Option<f64>> =
                (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                timestamps,
                values,
                100 + i as u64,
            )
            .unwrap();
            bulk_parts.parts.push(create_bulk_part_wrapper(part));
        }

        assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));

        let collected =
            bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);

        assert!(!collected.groups.is_empty());

        for group in &collected.groups {
            assert!(!group.is_empty());
        }

        // All 16 parts are collected for merging.
        let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
        assert_eq!(16, total_parts);
    }

    #[test]
    fn test_bulk_memtable_ranges_with_multi_bulk_part() {
        let metadata = metadata_for_test();
        let merge_threshold = 8;
        let config = BulkMemtableConfig {
            merge_threshold,
            ..Default::default()
        };
        let memtable = BulkMemtable::new(
            2005,
            config,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        // Disables the unordered buffer so each write becomes its own part.
        memtable.set_unordered_part_threshold(0);

        // Writes exactly `merge_threshold` parts so that compaction merges
        // them into a single in-memory multi part.
        for i in 0..merge_threshold {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        // Each part has 2 rows.
        let expected_rows = merge_threshold * 2;
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(expected_rows, total_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_multi_bulk_range_iter_builder_all_pruned() {
        let metadata = metadata_for_test();
        let merge_threshold = 8;
        let config = BulkMemtableConfig {
            merge_threshold,
            ..Default::default()
        };
        let memtable = BulkMemtable::new(
            2006,
            config,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        memtable.set_unordered_part_threshold(0);

        for i in 0..merge_threshold {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }
        memtable.compact(false).unwrap();

        // A filter that matches no primary key prunes every batch.
        let filter = datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent"));
        let predicate_group = PredicateGroup::new(&metadata, &[filter]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
            let total_rows: usize = record_batch_iter.map(|r| r.unwrap().num_rows()).sum();
            assert_eq!(0, total_rows);
        }
    }
}