#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{BulkPart, BulkPartEncodeMetrics, BulkPartEncoder};
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::file::FileId;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// Parts buffered in a [BulkMemtable].
#[derive(Default)]
struct BulkParts {
    /// Unencoded bulk parts.
    parts: Vec<BulkPartWrapper>,
    /// Parts that have already been encoded.
    encoded_parts: Vec<EncodedPartWrapper>,
}
63
64impl BulkParts {
65 fn num_parts(&self) -> usize {
67 self.parts.len() + self.encoded_parts.len()
68 }
69
70 fn is_empty(&self) -> bool {
72 self.parts.is_empty() && self.encoded_parts.is_empty()
73 }
74
75 fn should_merge_bulk_parts(&self) -> bool {
77 let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
78 unmerged_count >= 8
80 }
81
82 fn should_merge_encoded_parts(&self) -> bool {
84 let unmerged_count = self
85 .encoded_parts
86 .iter()
87 .filter(|wrapper| !wrapper.merging)
88 .count();
89 unmerged_count >= 8
91 }
92
93 fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
96 let mut collected_parts = Vec::new();
97
98 for wrapper in &mut self.parts {
99 if !wrapper.merging {
100 wrapper.merging = true;
101 collected_parts.push(PartToMerge::Bulk {
102 part: wrapper.part.clone(),
103 file_id: wrapper.file_id,
104 });
105 }
106 }
107 collected_parts
108 }
109
110 fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
113 let min_size = self
115 .encoded_parts
116 .iter()
117 .filter(|wrapper| !wrapper.merging)
118 .map(|wrapper| wrapper.part.size_bytes())
119 .min();
120
121 let Some(min_size) = min_size else {
122 return Vec::new();
123 };
124
125 let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
126 let mut collected_parts = Vec::new();
127
128 for wrapper in &mut self.encoded_parts {
129 if !wrapper.merging {
130 let size = wrapper.part.size_bytes();
131 if size <= max_allowed_size {
132 wrapper.merging = true;
133 collected_parts.push(PartToMerge::Encoded {
134 part: wrapper.part.clone(),
135 file_id: wrapper.file_id,
136 });
137 }
138 }
139 }
140 collected_parts
141 }
142
    /// Installs merged output parts and removes the source parts, returning the total
    /// number of rows in the merged output.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
        merge_encoded: bool,
    ) -> usize
    where
        I: IntoIterator<Item = EncodedBulkPart>,
    {
        let mut total_output_rows = 0;

        for encoded_part in merged_parts {
            total_output_rows += encoded_part.metadata().num_rows;
            self.encoded_parts.push(EncodedPartWrapper {
                part: encoded_part,
                file_id: FileId::random(),
                merging: false,
            });
        }

        if merge_encoded {
            self.encoded_parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        } else {
            self.parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        }

        total_output_rows
    }

    /// Clears the `merging` flag of the given parts.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
        if merge_encoded {
            for wrapper in &mut self.encoded_parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        } else {
            for wrapper in &mut self.parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        }
    }
}

/// Guard that resets the `merging` flags of the collected parts if the merge does not
/// complete successfully.
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    merge_encoded: bool,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    fn new(
        bulk_parts: &'a RwLock<BulkParts>,
        file_ids: &'a HashSet<FileId>,
        merge_encoded: bool,
    ) -> Self {
        Self {
            bulk_parts,
            file_ids,
            merge_encoded,
            success: false,
        }
    }

    /// Marks the merge as successful so the flags are left as-is on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
        }
    }
}

/// Memtable that buffers writes as whole [BulkPart]s and merges them into encoded
/// parts in the background.
pub struct BulkMemtable {
    id: MemtableId,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    #[allow(dead_code)]
    flat_arrow_schema: SchemaRef,
    /// Compactor that merges parts of this memtable.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Optional dispatcher that runs compactions in background tasks.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}
impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();
            bulk_parts.parts.push(BulkPartWrapper {
                part: fragment,
                file_id: FileId::random(),
                merging: false,
            });

            // Update stats while still holding the write lock.
            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceNumber>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        let context = Arc::new(BulkIterContext::new(
            self.metadata.clone(),
            &projection,
            predicate.predicate().cloned(),
        ));

        {
            let bulk_parts = self.parts.read().unwrap();

            for part_wrapper in bulk_parts.parts.iter() {
                // Skip empty parts.
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_wrapper.part.num_rows(),
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
                // Skip empty parts.
                if encoded_part_wrapper.part.metadata().num_rows == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: encoded_part_wrapper.file_id,
                            part: encoded_part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    encoded_part_wrapper.part.metadata().num_rows,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        let mut stats = self.stats();
        stats.num_ranges = ranges.len();

        Ok(MemtableRanges { ranges, stats })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            // Nothing to merge when compacting for a flush.
            return Ok(());
        }

        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new [BulkMemtable].
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    /// Updates memtable stats and the allocation tracker after a write.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Estimates the series count from the buffered bulk parts.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }

    /// Returns true if any group of parts is large enough to merge.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
    }

    /// Schedules a compaction via the dispatcher, or compacts inline when no dispatcher is set.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else if let Err(e) = self.compact(false) {
            common_telemetry::error!(e; "Failed to compact memtable");
        }
    }
}

/// Iterator builder for a range over an unencoded bulk part.
struct BulkRangeIterBuilder {
    part: BulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let iter = BulkPartRecordBatchIter::new(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Iterator builder for a range over an encoded bulk part.
struct EncodedBulkRangeIterBuilder {
    #[allow(dead_code)]
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self.part.read(self.context.clone(), self.sequence)? {
            Ok(iter)
        } else {
            // Nothing to read from this part.
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

/// Wrapper of an unencoded [BulkPart] with its merge bookkeeping.
struct BulkPartWrapper {
    part: BulkPart,
    #[allow(dead_code)]
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}

/// Wrapper of an [EncodedBulkPart] with its merge bookkeeping.
struct EncodedPartWrapper {
    part: EncodedBulkPart,
    #[allow(dead_code)]
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}

/// A part collected for merging, either an unencoded bulk part or an encoded part.
#[derive(Clone)]
enum PartToMerge {
    Bulk { part: BulkPart, file_id: FileId },
    Encoded { part: EncodedBulkPart, file_id: FileId },
}

impl PartToMerge {
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Creates a record batch iterator over this part, or `None` if there is nothing to read.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let iter = BulkPartRecordBatchIter::new(part.batch, context, None);
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Encoded { part, .. } => part.read(context, None),
        }
    }
}

/// Compactor that merges parts of a [BulkMemtable] into larger encoded parts.
struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
}

impl MemtableCompactor {
    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
        Self {
            region_id,
            memtable_id,
        }
    }

    /// Merges unencoded bulk parts into encoded parts.
    fn merge_bulk_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        // The guard resets the `merging` flags if the merge does not complete.
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);

        // Sort by row count so that parts of similar size end up in the same group.
        let mut sorted_parts = parts_to_merge;
        sorted_parts.sort_unstable_by_key(|part| part.num_rows());

        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges small encoded parts into larger encoded parts.
    fn merge_encoded_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        let parts_to_merge = {
            let mut parts = bulk_parts.write().unwrap();
            parts.collect_encoded_parts_to_merge()
        };

        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        // The guard resets the `merging` flags if the merge does not complete.
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);

        if parts_to_merge.len() == 1 {
            // A single part does not need merging; the guard releases its flag.
            return Ok(());
        }

        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();

        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges one group of parts into a single encoded part.
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Option<EncodedBulkPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            // No projection: all columns are read for merging.
            &None,
            // No predicate.
            None,
        ));

        // Ignore parts that fail to produce an iterator or have nothing to read.
        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    // Field columns are every column except the time index and primary key
                    // columns; they sit right before the fixed-position columns at the end
                    // of the flat schema.
                    let field_column_count =
                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
                    let total_columns = arrow_schema.fields().len();
                    let field_column_start =
                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
        let mut metrics = BulkPartEncodeMetrics::default();
        let encoded_part = encoder.encode_record_batch_iter(
            boxed_iter,
            arrow_schema.clone(),
            min_timestamp,
            max_timestamp,
            &mut metrics,
        )?;

        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

        Ok(encoded_part)
    }
}

/// Task that runs memtable compaction off the write path.
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,
    flat_arrow_schema: SchemaRef,
    compactor: Arc<Mutex<MemtableCompactor>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

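/// Dispatcher that schedules memtable compaction tasks on background workers, gated
/// by a semaphore.
///
/// A usage sketch (hypothetical wiring; `region_metadata` stands for any available
/// `RegionMetadataRef`, and the permit count of 4 is only an illustration):
///
/// ```ignore
/// let dispatcher = Arc::new(CompactDispatcher::new(4));
/// let builder = BulkMemtableBuilder::new(None, false, MergeMode::LastRow)
///     .with_compact_dispatcher(dispatcher);
/// let memtable = builder.build(0, &region_metadata);
/// ```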
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a dispatcher that allows at most `permits` concurrent compactions.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Spawns the compaction task once a permit becomes available.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(permit) = semaphore.acquire_owned().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                // Hold the permit until the blocking compaction finishes so at most
                // `permits` compactions run at once.
                let _permit = permit;
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

/// Builder of [BulkMemtable]s.
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new builder.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the dispatcher used to run compactions in the background.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {

    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    /// Builds a [BulkPart] holding one series with the given timestamps and values.
    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        let test_data = vec![
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable.ranges(None, predicate_group, None).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable
            .ranges(Some(&projection), predicate_group, None)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

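    // A sketch of the merge-threshold behavior: `should_merge_bulk_parts` requires at
    // least 8 unmerged bulk parts, so writing 7 parts must not trigger any merging.
    // This assumes the inline compaction path (no `CompactDispatcher` configured).
    #[test]
    fn test_bulk_memtable_below_merge_threshold() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(555, metadata, None, None, false, MergeMode::LastRow);

        for i in 0..7u32 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64],
                vec![Some(i as f64)],
                i as u64 + 1,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        // Below the threshold nothing is merged: each written part is its own range.
        assert!(!memtable.should_compact());
        assert_eq!(7, memtable.stats().num_ranges);
    }
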
    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable =
            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable.ranges(None, predicate_group, None).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);
        assert_eq!(3, ranges.stats.num_ranges);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        // The part was written with sequence 500, so a filter at 400 hides all rows.
        let sequence_filter = Some(400u64);
        let ranges = memtable
            .ranges(None, predicate_group, sequence_filter)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        // Write enough parts to cross the merge threshold.
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable.ranges(None, predicate_group, None).unwrap();

        // The first 8 parts were merged into one encoded part; the last 2 remain bulk.
        assert_eq!(3, ranges.ranges.len());
        assert_eq!(10, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }
}