#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{
    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, UnorderedPart,
};
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// Parts stored in a [`BulkMemtable`].
#[derive(Default)]
struct BulkParts {
    /// Unordered part that buffers small write fragments until they are
    /// compacted into a [`BulkPart`].
    unordered_part: UnorderedPart,
    /// Unencoded bulk parts.
    parts: Vec<BulkPartWrapper>,
    /// Parts already encoded into parquet format.
    encoded_parts: Vec<EncodedPartWrapper>,
}

impl BulkParts {
    /// Returns the number of parts, counting the unordered part as one when it is non-empty.
    fn num_parts(&self) -> usize {
        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
        self.parts.len() + self.encoded_parts.len() + unordered_count
    }

    /// Returns true if there is no part in the memtable.
    fn is_empty(&self) -> bool {
        self.unordered_part.is_empty() && self.parts.is_empty() && self.encoded_parts.is_empty()
    }

    /// Returns true if there are enough (>= 8) unmerged bulk parts to merge.
    fn should_merge_bulk_parts(&self) -> bool {
        let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
        unmerged_count >= 8
    }

    /// Returns true if there are enough (>= 8) unmerged encoded parts to merge.
    fn should_merge_encoded_parts(&self) -> bool {
        let unmerged_count = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .count();
        unmerged_count >= 8
    }

    /// Returns true if the unordered part should be compacted into a bulk part.
    fn should_compact_unordered_part(&self) -> bool {
        self.unordered_part.should_compact()
    }

    /// Collects all unmerged bulk parts and marks them as merging.
    fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.parts {
            if !wrapper.merging {
                wrapper.merging = true;
                collected_parts.push(PartToMerge::Bulk {
                    part: wrapper.part.clone(),
                    file_id: wrapper.file_id,
                });
            }
        }
        collected_parts
    }

    /// Collects unmerged encoded parts of similar sizes and marks them as merging.
    fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let min_size = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .map(|wrapper| wrapper.part.size_bytes())
            .min();

        let Some(min_size) = min_size else {
            return Vec::new();
        };

        // Only collect parts whose size is close to the smallest unmerged part:
        // at most 16x the smallest size, capped at 4 MiB. For example, if the
        // smallest unmerged part is 100 KiB, parts up to 1600 KiB are collected.
        let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.encoded_parts {
            if !wrapper.merging {
                let size = wrapper.part.size_bytes();
                if size <= max_allowed_size {
                    wrapper.merging = true;
                    collected_parts.push(PartToMerge::Encoded {
                        part: wrapper.part.clone(),
                        file_id: wrapper.file_id,
                    });
                }
            }
        }
        collected_parts
    }

    /// Installs merged parts and removes the source parts.
    ///
    /// Returns the total number of rows in the merged output.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
        merge_encoded: bool,
    ) -> usize
    where
        I: IntoIterator<Item = EncodedBulkPart>,
    {
        let mut total_output_rows = 0;

        for encoded_part in merged_parts {
            total_output_rows += encoded_part.metadata().num_rows;
            self.encoded_parts.push(EncodedPartWrapper {
                part: encoded_part,
                file_id: FileId::random(),
                merging: false,
            });
        }

        // Removes the source parts that have been merged.
        if merge_encoded {
            self.encoded_parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        } else {
            self.parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        }

        total_output_rows
    }

    /// Resets the `merging` flag of the given parts, typically after a failed merge.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
        if merge_encoded {
            for wrapper in &mut self.encoded_parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        } else {
            for wrapper in &mut self.parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        }
    }
}

/// RAII guard that resets the `merging` flags of the collected parts if the
/// merge did not complete successfully.
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    merge_encoded: bool,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    fn new(
        bulk_parts: &'a RwLock<BulkParts>,
        file_ids: &'a HashSet<FileId>,
        merge_encoded: bool,
    ) -> Self {
        Self {
            bulk_parts,
            file_ids,
            merge_encoded,
            success: false,
        }
    }

    /// Marks the merge as successful so the flags are kept on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
        }
    }
}
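
// Usage sketch (illustrative only; `MemtableCompactor::merge_bulk_parts` below
// follows this exact pattern, given `bulk_parts: &RwLock<BulkParts>`): collect
// parts and flag them as merging, guard the flags, and only mark success once
// the merged output has been installed.
//
//     let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
//     let file_ids: HashSet<FileId> = parts_to_merge.iter().map(|p| p.file_id()).collect();
//     let mut guard = MergingFlagsGuard::new(bulk_parts, &file_ids, false);
//     // ... merge the parts and install the encoded output ...
//     guard.mark_success(); // otherwise `Drop` resets the `merging` flags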

/// Memtable that stores data as bulk parts.
pub struct BulkMemtable {
    id: MemtableId,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Cached arrow schema of the flat SST format.
    flat_arrow_schema: SchemaRef,
    /// Compactor that merges parts inside this memtable.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Optional dispatcher that runs compactions in the background.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the region is in append mode (no deduplication on merge).
    append_mode: bool,
    /// Merge mode of the region.
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();

            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
                // Small fragments are buffered in the unordered part first.
                bulk_parts.unordered_part.push(fragment);

                // Compacts the unordered part into a bulk part once it grows large enough.
                if bulk_parts.should_compact_unordered_part()
                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
                {
                    bulk_parts.parts.push(BulkPartWrapper {
                        part: bulk_part,
                        file_id: FileId::random(),
                        merging: false,
                    });
                    bulk_parts.unordered_part.clear();
                }
            } else {
                bulk_parts.parts.push(BulkPartWrapper {
                    part: fragment,
                    file_id: FileId::random(),
                    merging: false,
                });
            }

            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceRange>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        {
            // Holds the read lock while building a range for every part.
            let bulk_parts = self.parts.read().unwrap();

            if !bulk_parts.unordered_part.is_empty()
                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
            {
                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: unordered_bulk_part,
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            for part_wrapper in bulk_parts.parts.iter() {
                // Skips empty parts.
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
                // Skips empty encoded parts.
                if encoded_part_wrapper.part.metadata().num_rows == 0 {
                    continue;
                }

                let part_stats = encoded_part_wrapper.part.to_memtable_stats();
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: encoded_part_wrapper.file_id,
                            part: encoded_part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        Ok(MemtableRanges { ranges })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Rebuilds the flat arrow schema since the metadata of the forked memtable may differ.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        // Takes the compactor lock first so any in-flight merge finishes before this
        // call returns.
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new [`BulkMemtable`].
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the threshold of the unordered part (test only).
    #[cfg(test)]
    pub fn set_unordered_part_threshold(&self, threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_threshold(threshold);
    }

    /// Sets the compact threshold of the unordered part (test only).
    #[cfg(test)]
    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_compact_threshold(compact_threshold);
    }

    /// Updates memtable stats after a successful write.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Estimates the series count by summing the estimates of the unencoded bulk parts.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }

    /// Returns true if the memtable has enough parts to merge.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
    }

    /// Schedules a compaction on the dispatcher, or compacts synchronously when
    /// there is no dispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else if let Err(e) = self.compact(false) {
            common_telemetry::error!(e; "Failed to compact memtable");
        }
    }
}
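
// End-to-end sketch (illustrative; mirrors the tests at the bottom of this file,
// where `part`, `metadata` and `predicate_group` stand for caller-provided values):
// bulk fragments are written with `write_bulk`, and readers obtain per-part ranges
// that yield Arrow record batches.
//
//     let memtable = BulkMemtable::new(1, metadata, None, None, false, MergeMode::LastRow);
//     memtable.write_bulk(part)?;
//     let ranges = memtable.ranges(
//         None,
//         RangesOptions::default().with_predicate(predicate_group),
//     )?;
//     for range in ranges.ranges.values() {
//         for batch in range.build_record_batch_iter(None)? {
//             let _batch = batch?;
//         }
//     }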

/// Iterator builder that scans a [`BulkPart`] as Arrow record batches.
pub struct BulkRangeIterBuilder {
    pub part: BulkPart,
    pub context: Arc<BulkIterContext>,
    pub sequence: Option<SequenceRange>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let series_count = self.part.estimated_series_count();
        let iter = BulkPartRecordBatchIter::new(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
            series_count,
            metrics,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Iterator builder that scans an [`EncodedBulkPart`].
struct EncodedBulkRangeIterBuilder {
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self
            .part
            .read(self.context.clone(), self.sequence, metrics)?
        {
            Ok(iter)
        } else {
            // Nothing to read from this part.
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

/// A [`BulkPart`] with a unique id and merge state.
struct BulkPartWrapper {
    part: BulkPart,
    /// Unique id that identifies this part inside the memtable.
    file_id: FileId,
    /// Whether the part is being merged.
    merging: bool,
}

/// An [`EncodedBulkPart`] with a unique id and merge state.
struct EncodedPartWrapper {
    part: EncodedBulkPart,
    /// Unique id that identifies this part inside the memtable.
    file_id: FileId,
    /// Whether the part is being merged.
    merging: bool,
}

/// A part collected for merging.
#[derive(Clone)]
enum PartToMerge {
    /// An unencoded bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// A parquet-encoded part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    /// Returns the id of the part.
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    /// Returns the minimum timestamp of the part.
    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    /// Returns the maximum timestamp of the part.
    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    /// Returns the number of rows in the part.
    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Returns the maximum sequence of the part.
    fn max_sequence(&self) -> u64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.sequence,
            PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
        }
    }

    /// Creates a record batch iterator that reads the part for merging.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let series_count = part.estimated_series_count();
                let iter = BulkPartRecordBatchIter::new(
                    part.batch,
                    context,
                    None, // no sequence filter
                    series_count,
                    None, // no scan metrics
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
        }
    }
}

/// Compactor that merges the parts inside a [`BulkMemtable`].
struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
}

impl MemtableCompactor {
    /// Creates a new compactor for a memtable.
    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
        Self {
            region_id,
            memtable_id,
        }
    }

    /// Merges unencoded bulk parts into encoded parts.
    fn merge_bulk_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        // Resets the merging flags if the merge fails.
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);

        // Sorts parts by row count so parts of similar sizes land in the same group.
        let mut sorted_parts = parts_to_merge;
        sorted_parts.sort_unstable_by_key(|part| part.num_rows());

        // Merges at most 16 parts per group; groups are processed in parallel.
        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs the merged output and removes the source parts.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges encoded parts of similar sizes into larger encoded parts.
    fn merge_encoded_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        let parts_to_merge = {
            let mut parts = bulk_parts.write().unwrap();
            parts.collect_encoded_parts_to_merge()
        };

        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        // Resets the merging flags if the merge fails.
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);

        if parts_to_merge.len() == 1 {
            // Nothing to merge; the guard resets the merging flag on drop.
            return Ok(());
        }

        // Merges at most 16 parts per group; groups are processed in parallel.
        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();

        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs the merged output and removes the source parts.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges a group of parts into a single encoded part.
    ///
    /// Returns `None` if the group produces no output.
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Option<EncodedBulkPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        // Computes the time range and max sequence of the merged output.
        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);
        let max_sequence = parts_to_merge
            .iter()
            .map(|p| p.max_sequence())
            .max()
            .unwrap_or(0);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            None,
            None,
            true,
        )?);

        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    // Number of field columns = all columns - time index - primary key columns.
                    // In the flat schema they are laid out right before the trailing
                    // fixed-position columns.
                    let field_column_count =
                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
                    let total_columns = arrow_schema.fields().len();
                    let field_column_start =
                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
        let mut metrics = BulkPartEncodeMetrics::default();
        let encoded_part = encoder.encode_record_batch_iter(
            boxed_iter,
            arrow_schema.clone(),
            min_timestamp,
            max_timestamp,
            max_sequence,
            &mut metrics,
        )?;

        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

        Ok(encoded_part)
    }
}

/// Task that compacts the parts of a [`BulkMemtable`] in the background.
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,
    /// Cached arrow schema of the flat SST format.
    flat_arrow_schema: SchemaRef,
    compactor: Arc<Mutex<MemtableCompactor>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

/// Dispatcher that runs memtable compaction tasks in background tasks, bounded
/// by a semaphore.
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a dispatcher that allows at most `permits` concurrent compactions.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Spawns a background task to compact the memtable.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            // Acquires an owned permit so the blocking task can hold it, keeping the
            // number of concurrent compactions bounded by the semaphore.
            let Ok(permit) = semaphore.acquire_owned().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                // Holds the permit until the compaction finishes.
                let _permit = permit;
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

/// Builder that creates [`BulkMemtable`]s.
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new builder.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the dispatcher that runs compactions in the background.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}
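
// Usage sketch (illustrative): share one `CompactDispatcher` across builders so
// background memtable compactions are bounded by a single semaphore.
// `region_metadata` stands for whatever `RegionMetadataRef` the caller has.
//
//     let dispatcher = Arc::new(CompactDispatcher::new(4));
//     let builder = BulkMemtableBuilder::new(None, false, MergeMode::LastRow)
//         .with_compact_dispatcher(dispatcher);
//     let memtable = builder.build(1, &region_metadata);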

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);
        // Disable the unordered part so every write becomes a separate bulk part.
        memtable.set_unordered_part_threshold(0);

        let test_data = [
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(5, total_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                Some(&projection),
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable =
            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);
        // Disable the unordered part so every write becomes a separate bulk part.
        memtable.set_unordered_part_threshold(0);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(5, total_rows);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        // All rows were written with sequence 500, so this filter excludes everything.
        let sequence_filter = Some(SequenceRange::LtEq { max: 400 });
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default()
                    .with_predicate(predicate_group)
                    .with_sequence(sequence_filter),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);
        // Disable the unordered part so every write becomes a separate bulk part.
        memtable.set_unordered_part_threshold(0);

        // Writing 10 parts triggers an automatic merge once 8 unmerged parts accumulate,
        // leaving one encoded part plus two bulk parts.
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(10, total_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_unordered_part() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1001,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Route small fragments through the unordered part: per-fragment row
        // threshold 5, compact threshold 10 rows.
        memtable.set_unordered_part_threshold(5);
        memtable.set_unordered_part_compact_threshold(10);

        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(2, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(6, stats.num_rows);

        for i in 3..5 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(10, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert!(!ranges.ranges.is_empty());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(10, total_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(10, total_rows_read);
    }

    #[test]
    fn test_bulk_memtable_unordered_part_mixed_sizes() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1002,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        memtable.set_unordered_part_threshold(4);
        memtable.set_unordered_part_compact_threshold(8);

        // Small parts (3 rows each) are accepted by the unordered part.
        for i in 0..2 {
            let part = create_bulk_part_with_converter(
                &format!("small_{}", i),
                i,
                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
                10 + i as u64,
            )
            .unwrap();
            assert_eq!(3, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        // The large part (5 rows) exceeds the threshold and becomes a bulk part directly.
        let large_part = create_bulk_part_with_converter(
            "large_key",
            100,
            vec![5000, 6000, 7000, 8000, 9000],
            vec![
                Some(100.0),
                Some(101.0),
                Some(102.0),
                Some(103.0),
                Some(104.0),
            ],
            50,
        )
        .unwrap();
        assert_eq!(5, large_part.num_rows());
        memtable.write_bulk(large_part).unwrap();

        // Another small part goes back to the unordered part.
        let part = create_bulk_part_with_converter(
            "small_2",
            2,
            vec![4000, 4100],
            vec![Some(20.0), Some(21.0)],
            30,
        )
        .unwrap();
        memtable.write_bulk(part).unwrap();

        let stats = memtable.stats();
        // 2 * 3 rows + 5 rows + 2 rows.
        assert_eq!(13, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(13, total_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(13, total_rows_read);
    }

    #[test]
    fn test_bulk_memtable_unordered_part_with_ranges() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1003,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Use a high compact threshold so all parts stay in the unordered part.
        memtable.set_unordered_part_threshold(3);
        memtable.set_unordered_part_compact_threshold(100);

        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(1, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(3, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // The unordered part is exposed as a single range.
        assert_eq!(1, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(3, total_rows);

        let range = ranges.ranges.get(&0).unwrap();
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            total_rows += batch.num_rows();
            assert!(batch.num_rows() > 0);
        }
        assert_eq!(3, total_rows);
    }
}