#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{BulkPart, BulkPartEncodeMetrics, BulkPartEncoder};
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

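/// Parts held by a `BulkMemtable`: plain `BulkPart`s written by clients and
/// `EncodedBulkPart`s produced by merging.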
#[derive(Default)]
struct BulkParts {
    parts: Vec<BulkPartWrapper>,
    encoded_parts: Vec<EncodedPartWrapper>,
}

impl BulkParts {
    fn num_parts(&self) -> usize {
        self.parts.len() + self.encoded_parts.len()
    }

    fn is_empty(&self) -> bool {
        self.parts.is_empty() && self.encoded_parts.is_empty()
    }

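    /// Returns true if there are at least 8 bulk parts that are not being merged.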
    fn should_merge_bulk_parts(&self) -> bool {
        let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
        unmerged_count >= 8
    }

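    /// Returns true if there are at least 8 encoded parts that are not being merged.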
    fn should_merge_encoded_parts(&self) -> bool {
        let unmerged_count = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .count();
        unmerged_count >= 8
    }

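    /// Collects all bulk parts that are not already being merged and marks them as merging.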
    fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.parts {
            if !wrapper.merging {
                wrapper.merging = true;
                collected_parts.push(PartToMerge::Bulk {
                    part: wrapper.part.clone(),
                    file_id: wrapper.file_id,
                });
            }
        }
        collected_parts
    }

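    /// Collects encoded parts to merge and marks them as merging.
    ///
    /// Only unmerged parts whose size is at most 16x the smallest unmerged part
    /// (capped at 4 MiB) are selected, so parts of similar size are merged together.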
    fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let min_size = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .map(|wrapper| wrapper.part.size_bytes())
            .min();

        let Some(min_size) = min_size else {
            return Vec::new();
        };

        let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.encoded_parts {
            if !wrapper.merging {
                let size = wrapper.part.size_bytes();
                if size <= max_allowed_size {
                    wrapper.merging = true;
                    collected_parts.push(PartToMerge::Encoded {
                        part: wrapper.part.clone(),
                        file_id: wrapper.file_id,
                    });
                }
            }
        }
        collected_parts
    }

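    /// Installs merged output parts and removes the input parts identified by `merged_file_ids`.
    ///
    /// When `merge_encoded` is true the inputs were encoded parts, otherwise plain bulk parts.
    /// Returns the total number of rows in the installed outputs.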
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
        merge_encoded: bool,
    ) -> usize
    where
        I: IntoIterator<Item = EncodedBulkPart>,
    {
        let mut total_output_rows = 0;

        for encoded_part in merged_parts {
            total_output_rows += encoded_part.metadata().num_rows;
            self.encoded_parts.push(EncodedPartWrapper {
                part: encoded_part,
                file_id: FileId::random(),
                merging: false,
            });
        }

        if merge_encoded {
            self.encoded_parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        } else {
            self.parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        }

        total_output_rows
    }

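    /// Clears the `merging` flag for the given file ids so they can be collected again.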
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
        if merge_encoded {
            for wrapper in &mut self.encoded_parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        } else {
            for wrapper in &mut self.parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        }
    }
}

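/// Guard that resets the `merging` flags of the collected parts on drop
/// unless the merge was marked successful.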
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    merge_encoded: bool,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    fn new(
        bulk_parts: &'a RwLock<BulkParts>,
        file_ids: &'a HashSet<FileId>,
        merge_encoded: bool,
    ) -> Self {
        Self {
            bulk_parts,
            file_ids,
            merge_encoded,
            success: false,
        }
    }

    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
        }
    }
}

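/// Memtable that stores written fragments as bulk parts and compacts them by
/// merging small parts into encoded parts.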
pub struct BulkMemtable {
    id: MemtableId,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    flat_arrow_schema: SchemaRef,
    compactor: Arc<Mutex<MemtableCompactor>>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();
            bulk_parts.parts.push(BulkPartWrapper {
                part: fragment,
                file_id: FileId::random(),
                merging: false,
            });

            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceRange>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        {
            let bulk_parts = self.parts.read().unwrap();

            for part_wrapper in bulk_parts.parts.iter() {
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_wrapper.part.num_rows(),
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
                if encoded_part_wrapper.part.metadata().num_rows == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: encoded_part_wrapper.file_id,
                            part: encoded_part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    encoded_part_wrapper.part.metadata().num_rows,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        let mut stats = self.stats();
        stats.num_ranges = ranges.len();

        Ok(MemtableRanges { ranges, stats })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

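    /// Updates memtable statistics (allocated bytes, timestamp range, max sequence, row count)
    /// after a successful write.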
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

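    /// Estimates the series count by summing the estimates of the plain (unencoded) bulk parts.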
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }

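    /// Returns true if there are enough unmerged parts to trigger a compaction.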
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
    }

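    /// Dispatches a compaction task if a dispatcher is configured, otherwise compacts inline.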
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else if let Err(e) = self.compact(false) {
            common_telemetry::error!(e; "Failed to compact memtable");
        }
    }
}

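/// `IterBuilder` that reads record batches from a plain bulk part.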
struct BulkRangeIterBuilder {
    part: BulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let iter = BulkPartRecordBatchIter::new(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

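/// `IterBuilder` that reads record batches from an encoded bulk part.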
struct EncodedBulkRangeIterBuilder {
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self.part.read(self.context.clone(), self.sequence)? {
            Ok(iter)
        } else {
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

struct BulkPartWrapper {
    part: BulkPart,
    file_id: FileId,
    merging: bool,
}

struct EncodedPartWrapper {
    part: EncodedBulkPart,
    file_id: FileId,
    merging: bool,
}

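/// A part collected for merging, either a plain bulk part or an already encoded part.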
#[derive(Clone)]
enum PartToMerge {
    Bulk { part: BulkPart, file_id: FileId },
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let iter = BulkPartRecordBatchIter::new(part.batch, context, None);
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Encoded { part, .. } => part.read(context, None),
        }
    }
}

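/// Merges the parts of a `BulkMemtable` into encoded parts.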
struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
}

impl MemtableCompactor {
    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
        Self {
            region_id,
            memtable_id,
        }
    }

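    /// Merges unmerged plain bulk parts into encoded parts.
    ///
    /// Parts are sorted by row count, split into groups of up to 16 parts,
    /// and the groups are merged in parallel.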
    fn merge_bulk_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);

        let mut sorted_parts = parts_to_merge;
        sorted_parts.sort_unstable_by_key(|part| part.num_rows());

        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

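    /// Merges small encoded parts into larger encoded parts, in parallel groups of up to 16 parts.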
    fn merge_encoded_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        let parts_to_merge = {
            let mut parts = bulk_parts.write().unwrap();
            parts.collect_encoded_parts_to_merge()
        };

        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);

        if parts_to_merge.len() == 1 {
            return Ok(());
        }

        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();

        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

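    /// Merges one group of parts into a single encoded part.
    ///
    /// Deduplicates rows according to `merge_mode` when `dedup` is true.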
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Option<EncodedBulkPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);

        let context = Arc::new(BulkIterContext::new(metadata.clone(), None, None, true)?);

        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    let field_column_count =
                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
                    let total_columns = arrow_schema.fields().len();
                    let field_column_start =
                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
        let mut metrics = BulkPartEncodeMetrics::default();
        let encoded_part = encoder.encode_record_batch_iter(
            boxed_iter,
            arrow_schema.clone(),
            min_timestamp,
            max_timestamp,
            &mut metrics,
        )?;

        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

        Ok(encoded_part)
    }
}

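/// Task that compacts a memtable's parts on a background worker.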
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,
    flat_arrow_schema: SchemaRef,
    compactor: Arc<Mutex<MemtableCompactor>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

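/// Dispatches memtable compaction tasks, limiting concurrency with a semaphore.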
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(permit) = semaphore.acquire_owned().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                // Hold the permit until the blocking compaction finishes so the
                // semaphore actually bounds the number of concurrent merges.
                let _permit = permit;
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

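/// Builder for `BulkMemtable`s.
///
/// Usage sketch (illustrative; assumes a `RegionMetadataRef` named `metadata` is
/// available from elsewhere):
///
/// ```ignore
/// let builder = BulkMemtableBuilder::new(None, false, MergeMode::LastRow)
///     .with_compact_dispatcher(Arc::new(CompactDispatcher::new(4)));
/// let memtable = builder.build(1, &metadata);
/// ```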
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {

    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        let test_data = [
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                Some(&projection),
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable =
            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);
        assert_eq!(3, ranges.stats.num_ranges);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        // All rows were written at sequence 500, so this filter excludes everything.
        let sequence_filter = Some(SequenceRange::LtEq { max: 400 });
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default()
                    .with_predicate(predicate_group)
                    .with_sequence(sequence_filter),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(10, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }
}