//! Memtable implementation that ingests pre-built [`BulkPart`]s and merges them
//! into encoded parts.

#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceNumber};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{BulkPart, BulkPartEncodeMetrics, BulkPartEncoder};
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

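/// Collection of parts stored by a [`BulkMemtable`]: raw bulk parts as written
/// and parts that have already been merged and encoded.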
#[derive(Default)]
struct BulkParts {
    /// Raw bulk parts as written by `write_bulk`.
    parts: Vec<BulkPartWrapper>,
    /// Encoded parts produced by merging.
    encoded_parts: Vec<EncodedPartWrapper>,
}

impl BulkParts {
    /// Returns the total number of parts (raw and encoded).
    fn num_parts(&self) -> usize {
        self.parts.len() + self.encoded_parts.len()
    }

    /// Returns true if there is no part at all.
    fn is_empty(&self) -> bool {
        self.parts.is_empty() && self.encoded_parts.is_empty()
    }

    /// Returns true if there are enough raw bulk parts that are not being merged.
    fn should_merge_bulk_parts(&self) -> bool {
        let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
        unmerged_count >= 8
    }

    /// Returns true if there are enough encoded parts that are not being merged.
    fn should_merge_encoded_parts(&self) -> bool {
        let unmerged_count = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .count();
        unmerged_count >= 8
    }

    /// Collects all raw bulk parts that are not being merged and marks them as merging.
    fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.parts {
            if !wrapper.merging {
                wrapper.merging = true;
                collected_parts.push(PartToMerge::Bulk {
                    part: wrapper.part.clone(),
                    file_id: wrapper.file_id,
                });
            }
        }
        collected_parts
    }

    /// Collects encoded parts to merge and marks them as merging.
    ///
    /// Only unmerged parts whose size is within 16x of the smallest unmerged part
    /// (capped at 4 MiB) are collected.
    fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let min_size = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .map(|wrapper| wrapper.part.size_bytes())
            .min();

        let Some(min_size) = min_size else {
            return Vec::new();
        };

        let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.encoded_parts {
            if !wrapper.merging {
                let size = wrapper.part.size_bytes();
                if size <= max_allowed_size {
                    wrapper.merging = true;
                    collected_parts.push(PartToMerge::Encoded {
                        part: wrapper.part.clone(),
                        file_id: wrapper.file_id,
                    });
                }
            }
        }
        collected_parts
    }

    /// Installs merged output parts and removes the source parts identified by
    /// `merged_file_ids`. Returns the total number of output rows.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
        merge_encoded: bool,
    ) -> usize
    where
        I: IntoIterator<Item = EncodedBulkPart>,
    {
        let mut total_output_rows = 0;

        for encoded_part in merged_parts {
            total_output_rows += encoded_part.metadata().num_rows;
            self.encoded_parts.push(EncodedPartWrapper {
                part: encoded_part,
                file_id: FileId::random(),
                merging: false,
            });
        }

        if merge_encoded {
            self.encoded_parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        } else {
            self.parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        }

        total_output_rows
    }

    /// Clears the merging flag of the parts in `file_ids`.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
        if merge_encoded {
            for wrapper in &mut self.encoded_parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        } else {
            for wrapper in &mut self.parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        }
    }
}

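/// RAII guard that resets the `merging` flags of the collected parts when a merge
/// is abandoned before completion.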
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    merge_encoded: bool,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    fn new(
        bulk_parts: &'a RwLock<BulkParts>,
        file_ids: &'a HashSet<FileId>,
        merge_encoded: bool,
    ) -> Self {
        Self {
            bulk_parts,
            file_ids,
            merge_encoded,
            success: false,
        }
    }

    /// Marks the merge as successful so the guard won't reset the merging flags on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
        }
    }
}

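/// Memtable that stores ingested [`BulkPart`]s and merges them into encoded parts.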
pub struct BulkMemtable {
    id: MemtableId,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Arrow schema of the flat SST format for this region.
    #[allow(dead_code)]
    flat_arrow_schema: SchemaRef,
    /// Compactor that merges parts of this memtable.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Optional dispatcher that runs compaction tasks in the background.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the region is in append mode (no deduplication).
    append_mode: bool,
    /// Strategy used to merge duplicate rows.
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();
            bulk_parts.parts.push(BulkPartWrapper {
                part: fragment,
                file_id: FileId::random(),
                merging: false,
            });

            self.update_stats(local_metrics);
        }

        // Checks whether to compact after releasing the parts lock.
        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceNumber>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
        for_flush: bool,
    ) -> Result<MemtableRanges> {
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        let context = Arc::new(BulkIterContext::new(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            for_flush,
        )?);

        {
            let bulk_parts = self.parts.read().unwrap();

            // Creates a range for each non-empty raw bulk part.
            for part_wrapper in bulk_parts.parts.iter() {
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_wrapper.part.num_rows(),
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            // Creates a range for each non-empty encoded part.
            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
                if encoded_part_wrapper.part.metadata().num_rows == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: encoded_part_wrapper.file_id,
                            part: encoded_part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    encoded_part_wrapper.part.metadata().num_rows,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        let mut stats = self.stats();
        stats.num_ranges = ranges.len();

        Ok(MemtableRanges { ranges, stats })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        // Merges raw bulk parts first.
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then merges encoded parts if there are enough of them.
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new [`BulkMemtable`].
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Updates memtable stats and the allocation tracker after a write.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Estimates the number of series from the raw bulk parts.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }

    /// Returns true if the memtable has enough parts to merge.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
    }

    /// Schedules a compaction via the dispatcher, or compacts synchronously if
    /// there is no dispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else if let Err(e) = self.compact(false) {
            common_telemetry::error!(e; "Failed to compact memtable");
        }
    }
}

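/// [`IterBuilder`] that creates record batch iterators over a raw [`BulkPart`].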
struct BulkRangeIterBuilder {
    part: BulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let iter = BulkPartRecordBatchIter::new(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

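/// [`IterBuilder`] that creates record batch iterators over an [`EncodedBulkPart`].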
struct EncodedBulkRangeIterBuilder {
    #[allow(dead_code)]
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self.part.read(self.context.clone(), self.sequence)? {
            Ok(iter)
        } else {
            // Nothing to read from this part, returns an empty iterator.
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

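/// Wrapper of a raw [`BulkPart`] with merge bookkeeping.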
struct BulkPartWrapper {
    part: BulkPart,
    /// Id used to identify this part when installing merge results.
    #[allow(dead_code)]
    file_id: FileId,
    /// Whether the part is currently being merged.
    merging: bool,
}

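/// Wrapper of an [`EncodedBulkPart`] with merge bookkeeping.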
struct EncodedPartWrapper {
    part: EncodedBulkPart,
    /// Id used to identify this part when installing merge results.
    #[allow(dead_code)]
    file_id: FileId,
    /// Whether the part is currently being merged.
    merging: bool,
}

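/// A part collected for merging, either a raw bulk part or an encoded part.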
#[derive(Clone)]
enum PartToMerge {
    /// A raw bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// An encoded part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    /// Returns the file id of the part.
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    /// Returns the minimal timestamp of the part.
    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    /// Returns the maximal timestamp of the part.
    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    /// Returns the number of rows in the part.
    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Creates a record batch iterator over the part, if there is anything to read.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let iter = BulkPartRecordBatchIter::new(part.batch, context, None);
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Encoded { part, .. } => part.read(context, None),
        }
    }
}

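/// Compactor that merges parts of a [`BulkMemtable`] into encoded parts.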
struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
}

impl MemtableCompactor {
    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
        Self {
            region_id,
            memtable_id,
        }
    }

    /// Merges raw bulk parts into encoded parts.
    fn merge_bulk_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);

        // Sorts parts by row count before grouping them.
        let mut sorted_parts = parts_to_merge;
        sorted_parts.sort_unstable_by_key(|part| part.num_rows());

        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
        // Merges each group in parallel.
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges encoded parts into larger encoded parts.
    fn merge_encoded_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        let parts_to_merge = {
            let mut parts = bulk_parts.write().unwrap();
            parts.collect_encoded_parts_to_merge()
        };

        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);

        if parts_to_merge.len() == 1 {
            // A single part does not need merging; the guard resets its merging flag on drop.
            return Ok(());
        }

        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();

        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges a group of parts into a single encoded part.
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Option<EncodedBulkPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);

        // Reads all columns without a predicate when merging.
        let context = Arc::new(BulkIterContext::new(metadata.clone(), None, None, true)?);

        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    // Field columns are laid out right before the trailing
                    // fixed-position columns in the flat schema.
                    let field_column_count =
                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
                    let total_columns = arrow_schema.fields().len();
                    let field_column_start =
                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
        let mut metrics = BulkPartEncodeMetrics::default();
        let encoded_part = encoder.encode_record_batch_iter(
            boxed_iter,
            arrow_schema.clone(),
            min_timestamp,
            max_timestamp,
            &mut metrics,
        )?;

        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

        Ok(encoded_part)
    }
}

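/// Background task that merges the parts of a [`BulkMemtable`].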
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,
    /// Arrow schema of the flat SST format for the region.
    flat_arrow_schema: SchemaRef,
    /// Compactor shared with the memtable.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Whether the region is in append mode (no deduplication).
    append_mode: bool,
    /// Strategy used to merge duplicate rows.
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        // Merges raw bulk parts first.
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then merges encoded parts if there are enough of them.
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

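/// Dispatcher that runs memtable compaction tasks on background runtimes, gated
/// by a semaphore.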
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a dispatcher with the given number of semaphore permits.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Waits for a semaphore permit, then runs the compaction task on a blocking task.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(_permit) = semaphore.acquire().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

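/// Builder to create [`BulkMemtable`]s.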
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new builder.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the dispatcher used to run compactions in the background.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {

    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        let test_data = [
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(Some(&projection), predicate_group, None, false)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable =
            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);
        assert_eq!(3, ranges.stats.num_ranges);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        // Filter sequence 400 is below the part's sequence (500), so all rows are filtered out.
        let sequence_filter = Some(400u64);
        let ranges = memtable
            .ranges(None, predicate_group, sequence_filter, false)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        // Writes enough parts to trigger an in-memtable merge into an encoded part.
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();

        // The remaining raw parts plus the merged encoded part.
        assert_eq!(3, ranges.ranges.len());
        assert_eq!(10, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }
}