#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{
    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, UnorderedPart,
};
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// All parts stored in a [BulkMemtable].
#[derive(Default)]
struct BulkParts {
    /// Buffer that accumulates small write fragments before they are compacted into a bulk part.
    unordered_part: UnorderedPart,
    /// Bulk parts that have not been encoded yet.
    parts: Vec<BulkPartWrapper>,
    /// Encoded parts produced by merging.
    encoded_parts: Vec<EncodedPartWrapper>,
}

impl BulkParts {
    /// Returns the number of parts, counting a non-empty unordered part as one.
    fn num_parts(&self) -> usize {
        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
        self.parts.len() + self.encoded_parts.len() + unordered_count
    }

    fn is_empty(&self) -> bool {
        self.unordered_part.is_empty() && self.parts.is_empty() && self.encoded_parts.is_empty()
    }

    /// Returns true if at least 8 bulk parts are not part of an ongoing merge.
    fn should_merge_bulk_parts(&self) -> bool {
        let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
        unmerged_count >= 8
    }

    /// Returns true if at least 8 encoded parts are not part of an ongoing merge.
    fn should_merge_encoded_parts(&self) -> bool {
        let unmerged_count = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .count();
        unmerged_count >= 8
    }

    fn should_compact_unordered_part(&self) -> bool {
        self.unordered_part.should_compact()
    }

    /// Collects all bulk parts that are not already being merged and marks them as merging.
    fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.parts {
            if !wrapper.merging {
                wrapper.merging = true;
                collected_parts.push(PartToMerge::Bulk {
                    part: wrapper.part.clone(),
                    file_id: wrapper.file_id,
                });
            }
        }
        collected_parts
    }

    /// Collects unmerged encoded parts whose size does not exceed 16x the smallest
    /// unmerged part (capped at 4 MiB), and marks them as merging.
    fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let min_size = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .map(|wrapper| wrapper.part.size_bytes())
            .min();

        let Some(min_size) = min_size else {
            return Vec::new();
        };

        let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.encoded_parts {
            if !wrapper.merging {
                let size = wrapper.part.size_bytes();
                if size <= max_allowed_size {
                    wrapper.merging = true;
                    collected_parts.push(PartToMerge::Encoded {
                        part: wrapper.part.clone(),
                        file_id: wrapper.file_id,
                    });
                }
            }
        }
        collected_parts
    }
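
    // Worked example for the size cap above (illustration only): if the smallest
    // unmerged encoded part is 512 KiB, the cap is min(512 KiB * 16, 4 MiB) = 4 MiB,
    // so an 8 MiB encoded part is skipped in this round and only becomes eligible
    // again once the smaller parts have been merged.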

    /// Installs merged output parts and removes the source parts identified by
    /// `merged_file_ids`. `merge_encoded` selects whether the sources were encoded
    /// parts or bulk parts. Returns the total number of output rows.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
        merge_encoded: bool,
    ) -> usize
    where
        I: IntoIterator<Item = EncodedBulkPart>,
    {
        let mut total_output_rows = 0;

        for encoded_part in merged_parts {
            total_output_rows += encoded_part.metadata().num_rows;
            self.encoded_parts.push(EncodedPartWrapper {
                part: encoded_part,
                file_id: FileId::random(),
                merging: false,
            });
        }

        if merge_encoded {
            self.encoded_parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        } else {
            self.parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        }

        total_output_rows
    }

    /// Clears the `merging` flag of the given parts, typically after a failed merge.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
        if merge_encoded {
            for wrapper in &mut self.encoded_parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        } else {
            for wrapper in &mut self.parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        }
    }
}

/// RAII guard that resets the `merging` flags if a merge does not complete successfully.
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    merge_encoded: bool,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    fn new(
        bulk_parts: &'a RwLock<BulkParts>,
        file_ids: &'a HashSet<FileId>,
        merge_encoded: bool,
    ) -> Self {
        Self {
            bulk_parts,
            file_ids,
            merge_encoded,
            success: false,
        }
    }

    /// Marks the merge as successful so the flags are kept on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
        }
    }
}
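
// Intended use of the guard (a sketch of the flow in `merge_bulk_parts` and
// `merge_encoded_parts` below, not additional production code):
//
//     let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
//     let merged_file_ids: HashSet<FileId> =
//         parts_to_merge.iter().map(|part| part.file_id()).collect();
//     let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);
//     // ... merge and install the parts; if this returns early with `?`,
//     // dropping the guard clears the `merging` flags again ...
//     guard.mark_success();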

/// Memtable that stores writes as immutable bulk parts instead of individual rows.
pub struct BulkMemtable {
    /// Id of this memtable.
    id: MemtableId,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Arrow schema of the flat SST format used when encoding and merging parts.
    flat_arrow_schema: SchemaRef,
    /// Compactor that merges parts inside this memtable.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Optional dispatcher that runs memtable compactions in the background.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the region is in append mode (no deduplication on merge).
    append_mode: bool,
    /// Merge mode of the region.
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();

            // Puts the fragment into the unordered part if it accepts fragments of this size.
            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
                bulk_parts.unordered_part.push(fragment);

                // Compacts the unordered part into a regular bulk part once it is large enough.
                if bulk_parts.should_compact_unordered_part()
                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
                {
                    bulk_parts.parts.push(BulkPartWrapper {
                        part: bulk_part,
                        file_id: FileId::random(),
                        merging: false,
                    });
                    bulk_parts.unordered_part.clear();
                }
            } else {
                bulk_parts.parts.push(BulkPartWrapper {
                    part: fragment,
                    file_id: FileId::random(),
                    merging: false,
                });
            }

            // Updates the memtable stats before releasing the write lock.
            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceRange>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        // Shared iteration context for all ranges of this memtable.
        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        {
            let bulk_parts = self.parts.read().unwrap();

            // Converts the unordered buffer into a bulk part so it can be scanned as one range.
            if !bulk_parts.unordered_part.is_empty()
                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
            {
                let num_rows = unordered_bulk_part.num_rows();
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: unordered_bulk_part,
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    num_rows,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            for part_wrapper in bulk_parts.parts.iter() {
                // Skips empty parts.
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_wrapper.part.num_rows(),
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
                // Skips empty parts.
                if encoded_part_wrapper.part.metadata().num_rows == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: encoded_part_wrapper.file_id,
                            part: encoded_part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    encoded_part_wrapper.part.metadata().num_rows,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        let mut stats = self.stats();
        stats.num_ranges = ranges.len();

        Ok(MemtableRanges { ranges, stats })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Rebuilds the flat schema from the (possibly updated) region metadata.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        // Merges plain bulk parts first.
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then merges encoded parts if there are enough of them.
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new [BulkMemtable].
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the row threshold of the unordered part (test only).
    #[cfg(test)]
    pub fn set_unordered_part_threshold(&self, threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_threshold(threshold);
    }

    /// Sets the compaction threshold of the unordered part (test only).
    #[cfg(test)]
    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_compact_threshold(compact_threshold);
    }

    /// Updates memtable stats after a successful write.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Estimates the number of series stored in the unencoded bulk parts.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }

    /// Whether the memtable has enough unmerged parts to compact.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
    }

    /// Schedules a compaction on the dispatcher, or compacts inline if there is no dispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else if let Err(e) = self.compact(false) {
            common_telemetry::error!(e; "Failed to compact memtable");
        }
    }
}
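
// End-to-end compaction flow inside the memtable (a summary of the code above,
// not additional behavior): `write_bulk` buffers or appends the fragment, then
// `should_compact` checks the 8-part thresholds; if a `CompactDispatcher` is
// configured the merge runs in the background, otherwise `compact(false)` runs
// inline on the writing thread.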

/// [IterBuilder] that reads a plain [BulkPart].
struct BulkRangeIterBuilder {
    part: BulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let series_count = self.part.estimated_series_count();
        let iter = BulkPartRecordBatchIter::new(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
            series_count,
            metrics,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// [IterBuilder] that reads an [EncodedBulkPart].
struct EncodedBulkRangeIterBuilder {
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self
            .part
            .read(self.context.clone(), self.sequence, metrics)?
        {
            Ok(iter)
        } else {
            // The part has nothing to read under this context, returns an empty iterator.
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

/// A bulk part with an id and a flag marking whether it is being merged.
struct BulkPartWrapper {
    part: BulkPart,
    /// Id that identifies the part inside this memtable.
    file_id: FileId,
    /// Whether the part is currently part of an ongoing merge.
    merging: bool,
}

/// An encoded bulk part with an id and a flag marking whether it is being merged.
struct EncodedPartWrapper {
    part: EncodedBulkPart,
    /// Id that identifies the part inside this memtable.
    file_id: FileId,
    /// Whether the part is currently part of an ongoing merge.
    merging: bool,
}

/// A part collected for merging, either unencoded or encoded.
#[derive(Clone)]
enum PartToMerge {
    /// An unencoded bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// An encoded bulk part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Creates a record batch iterator over the part, or `None` if it has nothing to read.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let series_count = part.estimated_series_count();
                let iter = BulkPartRecordBatchIter::new(
                    part.batch,
                    context,
                    None,
                    series_count,
                    None,
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
        }
    }
}

/// Compactor that merges parts of a [BulkMemtable] into encoded parts.
struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
}

impl MemtableCompactor {
    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
        Self {
            region_id,
            memtable_id,
        }
    }

    /// Merges unmerged bulk parts into encoded parts, processing groups of parts in parallel.
    fn merge_bulk_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects the parts to merge and marks them as merging.
        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);

        // Sorts parts by row count so parts of similar sizes end up in the same group.
        let mut sorted_parts = parts_to_merge;
        sorted_parts.sort_unstable_by_key(|part| part.num_rows());

        // Splits parts into groups that are merged in parallel.
        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs the merged parts and removes the inputs.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges smaller encoded parts into larger encoded parts, processing groups in parallel.
    fn merge_encoded_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects the encoded parts to merge and marks them as merging.
        let parts_to_merge = {
            let mut parts = bulk_parts.write().unwrap();
            parts.collect_encoded_parts_to_merge()
        };

        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);

        if parts_to_merge.len() == 1 {
            // Nothing to merge; the guard resets the merging flag on drop.
            return Ok(());
        }

        // Splits parts into groups that are merged in parallel.
        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();

        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs the merged parts and removes the inputs.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges one group of parts into a single encoded part.
    ///
    /// Returns `None` if the group produces no output.
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Option<EncodedBulkPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        // Computes the timestamp range of the merged output.
        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            None,
            None,
            true,
        )?);

        // Builds an iterator for each part; parts without data are skipped.
        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        // Applies deduplication according to the merge mode.
        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    // Field columns are laid out immediately before the fixed trailing
                    // columns in the flat schema.
                    let field_column_count =
                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
                    let total_columns = arrow_schema.fields().len();
                    let field_column_start =
                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        // Encodes the merged stream into a new part.
        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
        let mut metrics = BulkPartEncodeMetrics::default();
        let encoded_part = encoder.encode_record_batch_iter(
            boxed_iter,
            arrow_schema.clone(),
            min_timestamp,
            max_timestamp,
            &mut metrics,
        )?;

        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

        Ok(encoded_part)
    }
}

/// Task that compacts a bulk memtable in the background.
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,
    /// Arrow schema of the flat SST format.
    flat_arrow_schema: SchemaRef,
    /// Compactor shared with the memtable.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Whether the region is in append mode.
    append_mode: bool,
    /// Merge mode of the region.
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        // Merges plain bulk parts first.
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then merges encoded parts if there are enough of them.
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

/// Dispatches memtable compaction tasks to the background runtime, gated by a semaphore.
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a dispatcher with the given number of semaphore permits.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Acquires a permit and then runs the compaction task on the blocking pool.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(_permit) = semaphore.acquire().await else {
                return;
            };

            // The merge is CPU heavy, so it runs on the blocking pool. Note that the
            // permit is released when this async task finishes, i.e. once the blocking
            // task has been spawned.
            common_runtime::spawn_blocking_global(move || {
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

/// Builder for [BulkMemtable].
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new [BulkMemtableBuilder].
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the dispatcher that runs memtable compactions in the background.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}
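
// A minimal usage sketch (illustration only; `region_metadata` is assumed to be a
// `RegionMetadataRef` supplied by the caller): share one dispatcher across builders
// so the background compactions of different memtables go through the same semaphore.
//
//     let dispatcher = Arc::new(CompactDispatcher::new(4));
//     let builder = BulkMemtableBuilder::new(None, false, MergeMode::LastRow)
//         .with_compact_dispatcher(dispatcher.clone());
//     let memtable = builder.build(1, &region_metadata);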

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {

    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);
        // Disable the unordered part so every write becomes its own bulk part.
        memtable.set_unordered_part_threshold(0);

        let test_data = [
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                Some(&projection),
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable =
            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);
        // Disable the unordered part so every write becomes its own bulk part.
        memtable.set_unordered_part_threshold(0);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);
        assert_eq!(3, ranges.stats.num_ranges);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        // The part was written with sequence 500, so this filter excludes all rows.
        let sequence_filter = Some(SequenceRange::LtEq { max: 400 });
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default()
                    .with_predicate(predicate_group)
                    .with_sequence(sequence_filter),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);
        // Disable the unordered part so every write becomes its own bulk part.
        memtable.set_unordered_part_threshold(0);

        // Write enough parts to trigger an inline merge into an encoded part.
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // The first 8 parts are merged into one encoded part when the threshold is hit,
        // leaving 2 bulk parts plus 1 encoded part.
        assert_eq!(3, ranges.ranges.len());
        assert_eq!(10, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_unordered_part() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1001,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Test-only thresholds for the unordered part.
        memtable.set_unordered_part_threshold(5);
        memtable.set_unordered_part_compact_threshold(10);

        // Write three 2-row fragments.
        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(2, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(6, stats.num_rows);

        // Write two more 2-row fragments.
        for i in 3..5 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(10, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert!(!ranges.ranges.is_empty());
        assert_eq!(10, ranges.stats.num_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(10, total_rows_read);
    }

    #[test]
    fn test_bulk_memtable_unordered_part_mixed_sizes() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1002,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Test-only thresholds for the unordered part.
        memtable.set_unordered_part_threshold(4);
        memtable.set_unordered_part_compact_threshold(8);

        for i in 0..2 {
            let part = create_bulk_part_with_converter(
                &format!("small_{}", i),
                i,
                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
                10 + i as u64,
            )
            .unwrap();
            assert_eq!(3, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        // A 5-row fragment that should become a standalone bulk part.
        let large_part = create_bulk_part_with_converter(
            "large_key",
            100,
            vec![5000, 6000, 7000, 8000, 9000],
            vec![
                Some(100.0),
                Some(101.0),
                Some(102.0),
                Some(103.0),
                Some(104.0),
            ],
            50,
        )
        .unwrap();
        assert_eq!(5, large_part.num_rows());
        memtable.write_bulk(large_part).unwrap();

        // Another small fragment.
        let part = create_bulk_part_with_converter(
            "small_2",
            2,
            vec![4000, 4100],
            vec![Some(20.0), Some(21.0)],
            30,
        )
        .unwrap();
        memtable.write_bulk(part).unwrap();

        let stats = memtable.stats();
        // 3 + 3 + 5 + 2 rows in total.
        assert_eq!(13, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(13, ranges.stats.num_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(13, total_rows_read);
    }

    #[test]
    fn test_bulk_memtable_unordered_part_with_ranges() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1003,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Test-only thresholds with a high compact threshold.
        memtable.set_unordered_part_threshold(3);
        memtable.set_unordered_part_compact_threshold(100);

        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(1, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(3, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // All rows are served from a single range.
        assert_eq!(1, ranges.ranges.len());
        assert_eq!(3, ranges.stats.num_rows);

        let range = ranges.ranges.get(&0).unwrap();
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            total_rows += batch.num_rows();
            assert!(batch.num_rows() > 0);
        }
        assert_eq!(3, total_rows);
    }
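
    // A small additional sanity check (a sketch grounded in `stats()` above): an
    // empty bulk memtable reports zeroed statistics and no time range.
    #[test]
    fn test_bulk_memtable_empty_stats() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(1234, metadata, None, None, false, MergeMode::LastRow);

        assert!(memtable.is_empty());
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert_eq!(0, stats.num_ranges);
        assert_eq!(0, stats.max_sequence);
        assert!(stats.time_range.is_none());
    }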
}