1pub(crate) mod chunk_reader;
18pub mod context;
19pub mod part;
20pub mod part_reader;
21mod row_group_reader;
22
23use std::collections::{BTreeMap, HashSet};
24use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
25use std::sync::{Arc, LazyLock, Mutex, RwLock};
26use std::time::Instant;
27
/// Reads `name` from the process environment and parses it as a `usize`.
///
/// Falls back to `default` when the variable is unset, is not valid Unicode,
/// or does not parse as a `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name).map(|raw| raw.parse::<usize>()) {
        Ok(Ok(value)) => value,
        _ => default,
    }
}
35
36use common_time::Timestamp;
37use datatypes::arrow::datatypes::SchemaRef;
38use mito_codec::key_values::KeyValue;
39use rayon::prelude::*;
40use serde::{Deserialize, Serialize};
41use serde_with::{DisplayFromStr, serde_as};
42use store_api::metadata::RegionMetadataRef;
43use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
44use tokio::sync::Semaphore;
45
46use crate::error::{Result, UnsupportedOperationSnafu};
47use crate::flush::WriteBufferManagerRef;
48use crate::memtable::bulk::context::BulkIterContext;
49use crate::memtable::bulk::part::{
50 BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
51 should_prune_bulk_part,
52};
53use crate::memtable::bulk::part_reader::BulkPartBatchIter;
54use crate::memtable::stats::WriteMetrics;
55use crate::memtable::{
56 AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
57 IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
58 MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
59};
60use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
61use crate::read::flat_merge::FlatMergeIterator;
62use crate::region::options::MergeMode;
63use crate::sst::parquet::flat_format::field_column_start;
64use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
65use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
66
67const DEFAULT_MERGE_THRESHOLD: usize = 16;
69
70static MERGE_THRESHOLD: LazyLock<usize> =
72 LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));
73
74const DEFAULT_MAX_MERGE_GROUPS: usize = 32;
76
77static MAX_MERGE_GROUPS: LazyLock<usize> =
79 LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));
80
81pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
84 env_usize(
85 "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
86 10 * DEFAULT_ROW_GROUP_SIZE,
87 )
88});
89
90const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;
92
93static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
96 env_usize(
97 "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
98 DEFAULT_ENCODE_BYTES_THRESHOLD,
99 )
100});
101
/// Tuning knobs for [`BulkMemtable`] part merging and encoding.
///
/// Every field is (de)serialized as a string (`DisplayFromStr`). Because of
/// `#[serde(default)]`, missing fields are filled from `BulkMemtableConfig::default()`.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BulkMemtableConfig {
    /// Number of not-yet-merging parts of one kind (bulk or encoded) that triggers a
    /// merge. Also the maximum number of parts placed in one merge group.
    #[serde_as(as = "DisplayFromStr")]
    pub merge_threshold: usize,
    /// A merged group whose estimated row count exceeds this is encoded into an
    /// `EncodedBulkPart` instead of being kept as a `MultiBulkPart`.
    #[serde_as(as = "DisplayFromStr")]
    pub encode_row_threshold: usize,
    /// Like `encode_row_threshold`, but compared against the group's estimated byte size.
    #[serde_as(as = "DisplayFromStr")]
    pub encode_bytes_threshold: usize,
    /// Upper bound on the number of merge groups formed per part kind in one merge round.
    #[serde_as(as = "DisplayFromStr")]
    pub max_merge_groups: usize,
}
120
121impl Default for BulkMemtableConfig {
122 fn default() -> Self {
123 Self {
124 merge_threshold: *MERGE_THRESHOLD,
125 encode_row_threshold: *ENCODE_ROW_THRESHOLD,
126 encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
127 max_merge_groups: *MAX_MERGE_GROUPS,
128 }
129 .sanitize()
130 }
131}
132
133impl BulkMemtableConfig {
134 fn sanitize(mut self) -> Self {
135 if self.merge_threshold == 0 {
136 self.merge_threshold = DEFAULT_MERGE_THRESHOLD;
137 }
138 self
139 }
140}
141
/// Output of merging one group of parts.
enum MergedPart {
    /// Merged batches kept in memory as a [`MultiBulkPart`].
    Multi(MultiBulkPart),
    /// Merged rows encoded into an [`EncodedBulkPart`].
    Encoded(EncodedBulkPart),
}
149
/// Parts selected by `BulkParts::collect_parts_to_merge`; they are already flagged as merging.
struct CollectedParts {
    /// Each inner vector is merged on its own into (at most) one output part.
    groups: Vec<Vec<PartToMerge>>,
}
155
/// Mutable contents of a [`BulkMemtable`]; the memtable keeps it behind an `RwLock`.
#[derive(Default)]
struct BulkParts {
    /// Buffer for fragments accepted by `UnorderedPart::should_accept`. It is later
    /// converted into a single bulk part and moved into `parts`.
    unordered_part: UnorderedPart,
    /// Bulk, multi and encoded parts, each tagged with whether it is currently being merged.
    parts: Vec<BulkPartWrapper>,
}
164
165impl BulkParts {
166 fn num_parts(&self) -> usize {
168 let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
169 self.parts.len() + unordered_count
170 }
171
172 fn is_empty(&self) -> bool {
174 self.unordered_part.is_empty() && self.parts.is_empty()
175 }
176
177 fn should_merge_parts(&self, merge_threshold: usize) -> bool {
180 let mut bulk_count = 0;
181 let mut encoded_count = 0;
182
183 for wrapper in &self.parts {
184 if wrapper.merging {
185 continue;
186 }
187
188 if wrapper.part.is_encoded() {
189 encoded_count += 1;
190 } else {
191 bulk_count += 1;
192 }
193
194 if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
196 return true;
197 }
198 }
199
200 false
201 }
202
203 fn should_compact_unordered_part(&self) -> bool {
205 self.unordered_part.should_compact()
206 }
207
208 fn collect_parts_to_merge(
212 &mut self,
213 merge_threshold: usize,
214 max_merge_groups: usize,
215 ) -> CollectedParts {
216 let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
218 let mut encoded_indices: Vec<(usize, usize)> = Vec::new();
219
220 for (idx, wrapper) in self.parts.iter().enumerate() {
221 if wrapper.merging {
222 continue;
223 }
224 let num_rows = wrapper.part.num_rows();
225 if wrapper.part.is_encoded() {
226 encoded_indices.push((idx, num_rows));
227 } else {
228 bulk_indices.push((idx, num_rows));
229 }
230 }
231
232 let mut groups = Vec::new();
233
234 if bulk_indices.len() >= merge_threshold {
236 groups.extend(self.collect_and_group_parts(
237 bulk_indices,
238 merge_threshold,
239 max_merge_groups,
240 ));
241 }
242
243 if encoded_indices.len() >= merge_threshold {
245 groups.extend(self.collect_and_group_parts(
246 encoded_indices,
247 merge_threshold,
248 max_merge_groups,
249 ));
250 }
251
252 CollectedParts { groups }
253 }
254
255 fn collect_and_group_parts(
257 &mut self,
258 mut indices: Vec<(usize, usize)>,
259 merge_threshold: usize,
260 max_merge_groups: usize,
261 ) -> Vec<Vec<PartToMerge>> {
262 if indices.is_empty() {
263 return Vec::new();
264 }
265
266 indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);
268
269 indices
271 .chunks(merge_threshold)
272 .take(max_merge_groups)
273 .map(|chunk| {
274 chunk
275 .iter()
276 .map(|(idx, _)| {
277 let wrapper = &mut self.parts[*idx];
278 wrapper.merging = true;
279 wrapper.part.clone()
280 })
281 .collect()
282 })
283 .collect()
284 }
285
286 fn install_merged_parts<I>(
289 &mut self,
290 merged_parts: I,
291 merged_file_ids: &HashSet<FileId>,
292 ) -> usize
293 where
294 I: IntoIterator<Item = MergedPart>,
295 {
296 let mut total_output_rows = 0;
297
298 for merged_part in merged_parts {
299 match merged_part {
300 MergedPart::Encoded(encoded_part) => {
301 total_output_rows += encoded_part.metadata().num_rows;
302 self.parts.push(BulkPartWrapper {
303 part: PartToMerge::Encoded {
304 part: encoded_part,
305 file_id: FileId::random(),
306 },
307 merging: false,
308 });
309 }
310 MergedPart::Multi(multi_part) => {
311 total_output_rows += multi_part.num_rows();
312 self.parts.push(BulkPartWrapper {
313 part: PartToMerge::Multi {
314 part: multi_part,
315 file_id: FileId::random(),
316 },
317 merging: false,
318 });
319 }
320 }
321 }
322
323 self.parts
324 .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));
325
326 total_output_rows
327 }
328
329 fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
332 for wrapper in &mut self.parts {
333 if file_ids.contains(&wrapper.file_id()) {
334 wrapper.merging = false;
335 }
336 }
337 }
338}
339
340struct MergingFlagsGuard<'a> {
343 bulk_parts: &'a RwLock<BulkParts>,
344 file_ids: &'a HashSet<FileId>,
345 success: bool,
346}
347
348impl<'a> MergingFlagsGuard<'a> {
349 fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
351 Self {
352 bulk_parts,
353 file_ids,
354 success: false,
355 }
356 }
357
358 fn mark_success(&mut self) {
361 self.success = true;
362 }
363}
364
365impl<'a> Drop for MergingFlagsGuard<'a> {
366 fn drop(&mut self) {
367 if !self.success
368 && let Ok(mut parts) = self.bulk_parts.write()
369 {
370 parts.reset_merging_flags(self.file_ids);
371 }
372 }
373}
374
/// Memtable that only accepts bulk writes ([`BulkPart`]s) and merges them in the background.
pub struct BulkMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Merge/encode tuning, sanitized at construction.
    config: BulkMemtableConfig,
    /// Stored parts. Wrapped in `Arc` so background compaction tasks can share it.
    parts: Arc<RwLock<BulkParts>>,
    /// Region metadata used to build schemas, contexts and stats.
    metadata: RegionMetadataRef,
    /// Tracks bytes written to this memtable.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far (starts at `i64::MIN`).
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far (starts at `i64::MAX`).
    min_timestamp: AtomicI64,
    /// Largest sequence written so far.
    max_sequence: AtomicU64,
    /// Number of rows written. It only grows; merges do not decrease it.
    num_rows: AtomicUsize,
    /// Flat SST arrow schema derived from `metadata`, used when merging parts.
    flat_arrow_schema: SchemaRef,
    /// Compactor shared with background tasks. Its mutex serializes merge rounds.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Background dispatcher. When `None`, compaction runs inline on the writer.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// When true, merges do not deduplicate rows.
    append_mode: bool,
    /// Deduplication strategy applied by merges when not in append mode.
    merge_mode: MergeMode,
}
398
399impl std::fmt::Debug for BulkMemtable {
400 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401 f.debug_struct("BulkMemtable")
402 .field("id", &self.id)
403 .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
404 .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
405 .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
406 .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
407 .finish()
408 }
409}
410
impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Always fails: this memtable only accepts bulk writes through `write_bulk`.
    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    /// Always fails: this memtable only accepts bulk writes through `write_bulk`.
    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    /// Stores a bulk fragment and schedules a merge if enough parts have accumulated.
    ///
    /// If `UnorderedPart::should_accept` (decided from the fragment's row count) accepts
    /// the fragment, it is buffered in the unordered part. Once that buffer reports
    /// `should_compact`, it is converted into one `BulkPart`, appended to the part list,
    /// and cleared. Any other fragment is appended to the part list as-is.
    ///
    /// # Errors
    /// Returns an error if converting the unordered buffer into a `BulkPart` fails.
    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        // Capture metrics before `fragment` is moved into the part list.
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();

            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
                bulk_parts.unordered_part.push(fragment);

                // NOTE(review): if `to_bulk_part()` fails, the fragment has already been
                // pushed but `update_stats` below is skipped. Stats then under-count the
                // buffered rows. Confirm whether that is acceptable.
                if bulk_parts.should_compact_unordered_part()
                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
                {
                    bulk_parts.parts.push(BulkPartWrapper {
                        part: PartToMerge::Bulk {
                            part: bulk_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                    bulk_parts.unordered_part.clear();
                }
            } else {
                bulk_parts.parts.push(BulkPartWrapper {
                    part: PartToMerge::Bulk {
                        part: fragment,
                        file_id: FileId::random(),
                    },
                    merging: false,
                });
            }

            // Stats are updated while the write lock is still held.
            self.update_stats(local_metrics);
        }

        // Runs after the write guard is released, because `should_compact` takes a read
        // lock on the same `RwLock`.
        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    /// Builds one [`MemtableRange`] per non-empty stored part.
    ///
    /// The unordered buffer (if any) is converted to a bulk part and becomes range 0;
    /// stored parts follow in list order. Parts currently being merged are still listed,
    /// because they are only removed when the merge output is installed. Bulk, multi and
    /// encoded parts each get their own iterator builder, all sharing one
    /// `BulkIterContext`.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        {
            // Holds the read lock while the range list is built.
            let bulk_parts = self.parts.read().unwrap();

            if !bulk_parts.unordered_part.is_empty()
                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
            {
                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: unordered_bulk_part,
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            for part_wrapper in bulk_parts.parts.iter() {
                // Empty parts produce no range.
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
                let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
                    PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Encoded { part, file_id } => {
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: *file_id,
                            part: part.clone(),
                            context: context.clone(),
                            sequence,
                        })
                    }
                };

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        iter_builder,
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        Ok(MemtableRanges { ranges })
    }

    /// True when the memtable holds no parts and nothing is buffered.
    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    /// Marks allocation as finished in the allocation tracker.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    /// Returns size, time range, row count and related statistics.
    ///
    /// When nothing has been allocated or no rows were written, returns zeroed stats
    /// with no time range.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    /// Creates an empty memtable with a new id and metadata.
    ///
    /// The new memtable reuses this memtable's config, write buffer manager, compact
    /// dispatcher, append mode and merge mode.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            config: self.config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(
                metadata.region_id,
                id,
                self.config.clone(),
            ))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    /// Runs one merge round on the calling thread if the merge threshold is reached.
    ///
    /// Does nothing when `for_flush` is true; the compactor lock is still taken first,
    /// so the call waits for any in-flight merge. Rows are deduplicated unless the
    /// memtable is in append mode.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        // The read guard is released at the end of this statement, before merging.
        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}
675
676impl BulkMemtable {
677 pub fn new(
679 id: MemtableId,
680 config: BulkMemtableConfig,
681 metadata: RegionMetadataRef,
682 write_buffer_manager: Option<WriteBufferManagerRef>,
683 compact_dispatcher: Option<Arc<CompactDispatcher>>,
684 append_mode: bool,
685 merge_mode: MergeMode,
686 ) -> Self {
687 let config = config.sanitize();
688 let flat_arrow_schema = to_flat_sst_arrow_schema(
689 &metadata,
690 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
691 );
692
693 let region_id = metadata.region_id;
694 Self {
695 id,
696 config: config.clone(),
697 parts: Arc::new(RwLock::new(BulkParts::default())),
698 metadata,
699 alloc_tracker: AllocTracker::new(write_buffer_manager),
700 max_timestamp: AtomicI64::new(i64::MIN),
701 min_timestamp: AtomicI64::new(i64::MAX),
702 max_sequence: AtomicU64::new(0),
703 num_rows: AtomicUsize::new(0),
704 flat_arrow_schema,
705 compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
706 compact_dispatcher,
707 append_mode,
708 merge_mode,
709 }
710 }
711
712 #[cfg(test)]
714 pub fn set_unordered_part_threshold(&self, threshold: usize) {
715 self.parts
716 .write()
717 .unwrap()
718 .unordered_part
719 .set_threshold(threshold);
720 }
721
722 #[cfg(test)]
724 pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
725 self.parts
726 .write()
727 .unwrap()
728 .unordered_part
729 .set_compact_threshold(compact_threshold);
730 }
731
732 fn update_stats(&self, stats: WriteMetrics) {
736 self.alloc_tracker
737 .on_allocation(stats.key_bytes + stats.value_bytes);
738
739 self.max_timestamp
740 .fetch_max(stats.max_ts, Ordering::Relaxed);
741 self.min_timestamp
742 .fetch_min(stats.min_ts, Ordering::Relaxed);
743 self.max_sequence
744 .fetch_max(stats.max_sequence, Ordering::Relaxed);
745 self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
746 }
747
748 fn estimated_series_count(&self) -> usize {
750 let bulk_parts = self.parts.read().unwrap();
751 bulk_parts
752 .parts
753 .iter()
754 .map(|part_wrapper| part_wrapper.part.series_count())
755 .sum()
756 }
757
758 fn should_compact(&self) -> bool {
760 let parts = self.parts.read().unwrap();
761 parts.should_merge_parts(self.config.merge_threshold)
762 }
763
764 fn schedule_compact(&self) {
766 if let Some(dispatcher) = &self.compact_dispatcher {
767 let task = MemCompactTask {
768 metadata: self.metadata.clone(),
769 parts: self.parts.clone(),
770 config: self.config.clone(),
771 flat_arrow_schema: self.flat_arrow_schema.clone(),
772 compactor: self.compactor.clone(),
773 append_mode: self.append_mode,
774 merge_mode: self.merge_mode,
775 };
776
777 dispatcher.dispatch_compact(task);
778 } else {
779 if let Err(e) = self.compact(false) {
781 common_telemetry::error!(e; "Failed to compact table");
782 }
783 }
784 }
785}
786
/// Builds record batch iterators over a single unmerged [`BulkPart`].
pub struct BulkRangeIterBuilder {
    /// The part to read.
    pub part: BulkPart,
    /// Shared read context (projection, predicate, read format).
    pub context: Arc<BulkIterContext>,
    /// Optional sequence range passed to the part iterator.
    pub sequence: Option<SequenceRange>,
}
793
/// Builds record batch iterators over a merged [`MultiBulkPart`].
struct MultiBulkRangeIterBuilder {
    /// The part to read.
    part: MultiBulkPart,
    /// Shared read context.
    context: Arc<BulkIterContext>,
    /// Optional sequence range passed to `MultiBulkPart::read`.
    sequence: Option<SequenceRange>,
}
800
801impl IterBuilder for BulkRangeIterBuilder {
802 fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
803 UnsupportedOperationSnafu {
804 err_msg: "BatchIterator is not supported for bulk memtable",
805 }
806 .fail()
807 }
808
809 fn is_record_batch(&self) -> bool {
810 true
811 }
812
813 fn build_record_batch(
814 &self,
815 _time_range: Option<(Timestamp, Timestamp)>,
816 metrics: Option<MemScanMetrics>,
817 ) -> Result<BoxedRecordBatchIterator> {
818 let metadata = self.context.read_format().metadata();
819 if should_prune_bulk_part(&self.part.batch, &self.context, metadata) {
820 return Ok(Box::new(std::iter::empty()));
821 }
822
823 let series_count = self.part.estimated_series_count();
824 let iter = BulkPartBatchIter::from_single(
825 self.part.batch.clone(),
826 self.context.clone(),
827 self.sequence,
828 series_count,
829 metrics,
830 );
831
832 Ok(Box::new(iter))
833 }
834
835 fn encoded_range(&self) -> Option<EncodedRange> {
836 None
837 }
838}
839
840impl IterBuilder for MultiBulkRangeIterBuilder {
841 fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
842 UnsupportedOperationSnafu {
843 err_msg: "BatchIterator is not supported for multi bulk memtable",
844 }
845 .fail()
846 }
847
848 fn is_record_batch(&self) -> bool {
849 true
850 }
851
852 fn build_record_batch(
853 &self,
854 _time_range: Option<(Timestamp, Timestamp)>,
855 metrics: Option<MemScanMetrics>,
856 ) -> Result<BoxedRecordBatchIterator> {
857 match self
858 .part
859 .read(self.context.clone(), self.sequence, metrics)?
860 {
861 Some(iter) => Ok(iter),
862 None => Ok(Box::new(std::iter::empty())),
864 }
865 }
866
867 fn encoded_range(&self) -> Option<EncodedRange> {
868 None
869 }
870}
871
/// Builds record batch iterators over an [`EncodedBulkPart`].
struct EncodedBulkRangeIterBuilder {
    /// File id of the part; passed to `to_sst_info` when exposing the encoded range.
    file_id: FileId,
    /// The part to read.
    part: EncodedBulkPart,
    /// Shared read context.
    context: Arc<BulkIterContext>,
    /// Optional sequence range passed to `EncodedBulkPart::read`.
    sequence: Option<SequenceRange>,
}
879
880impl IterBuilder for EncodedBulkRangeIterBuilder {
881 fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
882 UnsupportedOperationSnafu {
883 err_msg: "BatchIterator is not supported for encoded bulk memtable",
884 }
885 .fail()
886 }
887
888 fn is_record_batch(&self) -> bool {
889 true
890 }
891
892 fn build_record_batch(
893 &self,
894 _time_range: Option<(Timestamp, Timestamp)>,
895 metrics: Option<MemScanMetrics>,
896 ) -> Result<BoxedRecordBatchIterator> {
897 if let Some(iter) = self
898 .part
899 .read(self.context.clone(), self.sequence, metrics)?
900 {
901 Ok(iter)
902 } else {
903 Ok(Box::new(std::iter::empty()))
905 }
906 }
907
908 fn encoded_range(&self) -> Option<EncodedRange> {
909 Some(EncodedRange {
910 data: self.part.data().clone(),
911 sst_info: self.part.to_sst_info(self.file_id),
912 })
913 }
914}
915
916struct BulkPartWrapper {
917 part: PartToMerge,
919 merging: bool,
921}
922
923impl BulkPartWrapper {
924 fn file_id(&self) -> FileId {
926 self.part.file_id()
927 }
928}
929
/// A part held by the memtable, in one of three representations.
///
/// Each variant carries the `FileId` generated (`FileId::random()`) when the part was
/// stored. The id is used to track merges and to build SST info for encoded ranges.
#[derive(Clone)]
enum PartToMerge {
    /// An unmerged bulk part: a written fragment or a compacted unordered buffer.
    Bulk { part: BulkPart, file_id: FileId },
    /// Merge output kept as in-memory batches.
    Multi {
        part: MultiBulkPart,
        file_id: FileId,
    },
    /// Merge output that was encoded.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}
946
947impl PartToMerge {
948 fn file_id(&self) -> FileId {
950 match self {
951 PartToMerge::Bulk { file_id, .. } => *file_id,
952 PartToMerge::Multi { file_id, .. } => *file_id,
953 PartToMerge::Encoded { file_id, .. } => *file_id,
954 }
955 }
956
957 fn min_timestamp(&self) -> i64 {
959 match self {
960 PartToMerge::Bulk { part, .. } => part.min_timestamp,
961 PartToMerge::Multi { part, .. } => part.min_timestamp(),
962 PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
963 }
964 }
965
966 fn max_timestamp(&self) -> i64 {
968 match self {
969 PartToMerge::Bulk { part, .. } => part.max_timestamp,
970 PartToMerge::Multi { part, .. } => part.max_timestamp(),
971 PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
972 }
973 }
974
975 fn num_rows(&self) -> usize {
977 match self {
978 PartToMerge::Bulk { part, .. } => part.num_rows(),
979 PartToMerge::Multi { part, .. } => part.num_rows(),
980 PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
981 }
982 }
983
984 fn max_sequence(&self) -> u64 {
986 match self {
987 PartToMerge::Bulk { part, .. } => part.sequence,
988 PartToMerge::Multi { part, .. } => part.max_sequence(),
989 PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
990 }
991 }
992
993 fn series_count(&self) -> usize {
995 match self {
996 PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
997 PartToMerge::Multi { part, .. } => part.series_count(),
998 PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
999 }
1000 }
1001
1002 fn is_encoded(&self) -> bool {
1004 matches!(self, PartToMerge::Encoded { .. })
1005 }
1006
1007 fn estimated_size(&self) -> usize {
1009 match self {
1010 PartToMerge::Bulk { part, .. } => part.estimated_size(),
1011 PartToMerge::Multi { part, .. } => part.estimated_size(),
1012 PartToMerge::Encoded { part, .. } => part.size_bytes(),
1013 }
1014 }
1015
1016 fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1018 match self {
1019 PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
1020 PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
1021 PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
1022 }
1023 }
1024
1025 fn create_iterator(
1027 self,
1028 context: Arc<BulkIterContext>,
1029 ) -> Result<Option<BoxedRecordBatchIterator>> {
1030 match self {
1031 PartToMerge::Bulk { part, .. } => {
1032 let series_count = part.estimated_series_count();
1033 let iter = BulkPartBatchIter::from_single(
1034 part.batch,
1035 context,
1036 None, series_count,
1038 None, );
1040 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1041 }
1042 PartToMerge::Multi { part, .. } => part.read(context, None, None),
1043 PartToMerge::Encoded { part, .. } => part.read(context, None, None),
1044 }
1045 }
1046}
1047
/// Runs merge rounds for one memtable.
///
/// Callers keep it behind `Arc<Mutex<_>>` and lock it before merging, so at most one
/// merge round runs per memtable at a time.
struct MemtableCompactor {
    /// Region of the owning memtable (used in logs).
    region_id: RegionId,
    /// Id of the owning memtable (used in logs).
    memtable_id: MemtableId,
    /// Thresholds used to select and encode merge groups.
    config: BulkMemtableConfig,
}
1054
1055impl MemtableCompactor {
1056 fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
1058 Self {
1059 region_id,
1060 memtable_id,
1061 config,
1062 }
1063 }
1064
1065 fn merge_parts(
1067 &mut self,
1068 arrow_schema: &SchemaRef,
1069 bulk_parts: &RwLock<BulkParts>,
1070 metadata: &RegionMetadataRef,
1071 dedup: bool,
1072 merge_mode: MergeMode,
1073 ) -> Result<()> {
1074 let start = Instant::now();
1075
1076 let collected = bulk_parts
1078 .write()
1079 .unwrap()
1080 .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);
1081
1082 if collected.groups.is_empty() {
1083 return Ok(());
1084 }
1085
1086 let merged_file_ids: HashSet<FileId> = collected
1088 .groups
1089 .iter()
1090 .flatten()
1091 .map(|part| part.file_id())
1092 .collect();
1093 let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);
1094
1095 let num_groups = collected.groups.len();
1096 let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
1097
1098 let encode_row_threshold = self.config.encode_row_threshold;
1099 let encode_bytes_threshold = self.config.encode_bytes_threshold;
1100
1101 let merged_parts = collected
1103 .groups
1104 .into_par_iter()
1105 .map(|group| {
1106 Self::merge_parts_group(
1107 group,
1108 arrow_schema,
1109 metadata,
1110 dedup,
1111 merge_mode,
1112 encode_row_threshold,
1113 encode_bytes_threshold,
1114 )
1115 })
1116 .collect::<Result<Vec<Option<MergedPart>>>>()?;
1117
1118 let total_output_rows = {
1120 let mut parts = bulk_parts.write().unwrap();
1121 parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
1122 };
1123
1124 guard.mark_success();
1125
1126 common_telemetry::debug!(
1127 "BulkMemtable {} {} concurrent compact {} groups, {} parts, {} rows, cost: {:?}",
1128 self.region_id,
1129 self.memtable_id,
1130 num_groups,
1131 num_parts,
1132 total_output_rows,
1133 start.elapsed()
1134 );
1135
1136 Ok(())
1137 }
1138
1139 fn merge_parts_group(
1141 parts_to_merge: Vec<PartToMerge>,
1142 arrow_schema: &SchemaRef,
1143 metadata: &RegionMetadataRef,
1144 dedup: bool,
1145 merge_mode: MergeMode,
1146 encode_row_threshold: usize,
1147 encode_bytes_threshold: usize,
1148 ) -> Result<Option<MergedPart>> {
1149 if parts_to_merge.is_empty() {
1150 return Ok(None);
1151 }
1152
1153 let min_timestamp = parts_to_merge
1155 .iter()
1156 .map(|p| p.min_timestamp())
1157 .min()
1158 .unwrap_or(i64::MAX);
1159 let max_timestamp = parts_to_merge
1160 .iter()
1161 .map(|p| p.max_timestamp())
1162 .max()
1163 .unwrap_or(i64::MIN);
1164 let max_sequence = parts_to_merge
1165 .iter()
1166 .map(|p| p.max_sequence())
1167 .max()
1168 .unwrap_or(0);
1169
1170 let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
1172 let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
1173 let estimated_series_count = parts_to_merge
1174 .iter()
1175 .map(|p| p.series_count())
1176 .max()
1177 .unwrap_or(0);
1178
1179 let context = Arc::new(BulkIterContext::new(
1180 metadata.clone(),
1181 None, None, true,
1184 )?);
1185
1186 let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
1188 .into_iter()
1189 .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
1190 .collect();
1191
1192 if iterators.is_empty() {
1193 return Ok(None);
1194 }
1195
1196 let merged_iter =
1197 FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;
1198
1199 let boxed_iter: BoxedRecordBatchIterator = if dedup {
1200 match merge_mode {
1202 MergeMode::LastRow => {
1203 let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
1204 Box::new(dedup_iter)
1205 }
1206 MergeMode::LastNonNull => {
1207 let field_column_start =
1208 field_column_start(metadata, arrow_schema.fields().len());
1209
1210 let dedup_iter = FlatDedupIterator::new(
1211 merged_iter,
1212 FlatLastNonNull::new(field_column_start, false),
1213 );
1214 Box::new(dedup_iter)
1215 }
1216 }
1217 } else {
1218 Box::new(merged_iter)
1219 };
1220
1221 if estimated_total_rows > encode_row_threshold
1223 || estimated_total_bytes > encode_bytes_threshold
1224 {
1225 let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
1226 let mut metrics = BulkPartEncodeMetrics::default();
1227 let encoded_part = encoder.encode_record_batch_iter(
1228 boxed_iter,
1229 arrow_schema.clone(),
1230 min_timestamp,
1231 max_timestamp,
1232 max_sequence,
1233 &mut metrics,
1234 )?;
1235
1236 common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);
1237
1238 Ok(encoded_part.map(MergedPart::Encoded))
1239 } else {
1240 let mut batches = Vec::new();
1242 let mut actual_total_rows = 0;
1243
1244 for batch_result in boxed_iter {
1245 let batch = batch_result?;
1246 actual_total_rows += batch.num_rows();
1247 batches.push(batch);
1248 }
1249
1250 if actual_total_rows == 0 {
1251 return Ok(None);
1252 }
1253
1254 let multi_part = MultiBulkPart::new(
1255 batches,
1256 min_timestamp,
1257 max_timestamp,
1258 max_sequence,
1259 estimated_series_count,
1260 metadata,
1261 );
1262
1263 common_telemetry::trace!(
1264 "merge_parts_group created MultiBulkPart: rows={}, batches={}",
1265 actual_total_rows,
1266 multi_part.num_batches()
1267 );
1268
1269 Ok(Some(MergedPart::Multi(multi_part)))
1270 }
1271 }
1272}
1273
/// A background merge request carrying shared handles to one memtable's state.
struct MemCompactTask {
    /// Region metadata of the memtable.
    metadata: RegionMetadataRef,
    /// The memtable's parts, shared with the memtable.
    parts: Arc<RwLock<BulkParts>>,
    /// Copy of the memtable's config (its merge threshold gates the merge).
    config: BulkMemtableConfig,
    /// Flat SST arrow schema used for merging.
    flat_arrow_schema: SchemaRef,
    /// The memtable's compactor; locked for the duration of the task.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// When true, the merge does not deduplicate.
    append_mode: bool,
    /// Dedup strategy used when not in append mode.
    merge_mode: MergeMode,
}
1289
1290impl MemCompactTask {
1291 fn compact(&self) -> Result<()> {
1292 let mut compactor = self.compactor.lock().unwrap();
1293
1294 let should_merge = self
1295 .parts
1296 .read()
1297 .unwrap()
1298 .should_merge_parts(self.config.merge_threshold);
1299 if should_merge {
1300 compactor.merge_parts(
1301 &self.flat_arrow_schema,
1302 &self.parts,
1303 &self.metadata,
1304 !self.append_mode,
1305 self.merge_mode,
1306 )?;
1307 }
1308
1309 Ok(())
1310 }
1311}
1312
/// Dispatches memtable compactions to the global runtime.
///
/// A dispatched task must acquire a permit from `semaphore` before it starts.
#[derive(Debug)]
pub struct CompactDispatcher {
    /// Permits gating dispatched compaction tasks.
    semaphore: Arc<Semaphore>,
}
1318
1319impl CompactDispatcher {
1320 pub fn new(permits: usize) -> Self {
1322 Self {
1323 semaphore: Arc::new(Semaphore::new(permits)),
1324 }
1325 }
1326
1327 fn dispatch_compact(&self, task: MemCompactTask) {
1329 let semaphore = self.semaphore.clone();
1330 common_runtime::spawn_global(async move {
1331 let Ok(_permit) = semaphore.acquire().await else {
1332 return;
1333 };
1334
1335 common_runtime::spawn_blocking_global(move || {
1336 if let Err(e) = task.compact() {
1337 common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
1338 }
1339 });
1340 });
1341 }
1342}
1343
/// Builds [`BulkMemtable`]s that share one config, write buffer manager and dispatcher.
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    /// Config passed to every memtable built.
    config: BulkMemtableConfig,
    /// Optional write buffer manager for allocation tracking.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Optional background compaction dispatcher. When absent, merges run inline.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether built memtables skip deduplication when merging.
    append_mode: bool,
    /// Dedup strategy for built memtables.
    merge_mode: MergeMode,
}
1354
1355impl BulkMemtableBuilder {
1356 pub fn new(
1358 write_buffer_manager: Option<WriteBufferManagerRef>,
1359 append_mode: bool,
1360 merge_mode: MergeMode,
1361 ) -> Self {
1362 Self {
1363 config: BulkMemtableConfig::default(),
1364 write_buffer_manager,
1365 compact_dispatcher: None,
1366 append_mode,
1367 merge_mode,
1368 }
1369 }
1370
1371 pub fn with_config(mut self, config: BulkMemtableConfig) -> Self {
1373 self.config = config;
1374 self
1375 }
1376
1377 pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
1379 self.compact_dispatcher = Some(compact_dispatcher);
1380 self
1381 }
1382
1383 #[cfg(test)]
1384 pub(crate) fn config(&self) -> &BulkMemtableConfig {
1385 &self.config
1386 }
1387}
1388
1389impl MemtableBuilder for BulkMemtableBuilder {
1390 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
1391 Arc::new(BulkMemtable::new(
1392 id,
1393 self.config.clone(),
1394 metadata.clone(),
1395 self.write_buffer_manager.clone(),
1396 self.compact_dispatcher.clone(),
1397 self.append_mode,
1398 self.merge_mode,
1399 ))
1400 }
1401
1402 fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
1403 true
1404 }
1405}
1406
1407#[cfg(test)]
1408mod tests {
1409 use mito_codec::row_converter::build_primary_key_codec;
1410
1411 use super::*;
1412 use crate::memtable::bulk::part::BulkPartConverter;
1413 use crate::read::scan_region::PredicateGroup;
1414 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1415 use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1416
1417 fn create_bulk_part_with_converter(
1418 k0: &str,
1419 k1: u32,
1420 timestamps: Vec<i64>,
1421 values: Vec<Option<f64>>,
1422 sequence: u64,
1423 ) -> Result<BulkPart> {
1424 let metadata = metadata_for_test();
1425 let capacity = 100;
1426 let primary_key_codec = build_primary_key_codec(&metadata);
1427 let schema = to_flat_sst_arrow_schema(
1428 &metadata,
1429 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1430 );
1431
1432 let mut converter =
1433 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1434
1435 let key_values = build_key_values_with_ts_seq_values(
1436 &metadata,
1437 k0.to_string(),
1438 k1,
1439 timestamps.into_iter(),
1440 values.into_iter(),
1441 sequence,
1442 );
1443
1444 converter.append_key_values(&key_values)?;
1445 converter.convert()
1446 }
1447
1448 #[test]
1449 fn test_bulk_memtable_sanitizes_zero_merge_threshold() {
1450 let metadata = metadata_for_test();
1451 let config = BulkMemtableConfig {
1452 merge_threshold: 0,
1453 ..Default::default()
1454 };
1455
1456 let memtable =
1457 BulkMemtable::new(999, config, metadata, None, None, false, MergeMode::LastRow);
1458
1459 assert_eq!(DEFAULT_MERGE_THRESHOLD, memtable.config.merge_threshold);
1460 assert_eq!(
1461 DEFAULT_MERGE_THRESHOLD,
1462 memtable.compactor.lock().unwrap().config.merge_threshold
1463 );
1464 }
1465
    /// Writes three bulk parts (5 rows in total) and checks memtable stats, range planning,
    /// and the record batches read back from every range.
    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            999,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        // Unordered-part threshold 0: the assertions below expect each write to surface as
        // its own range (presumably 0 disables buffering writes in the unordered part).
        memtable.set_unordered_part_threshold(0);

        // (k0, k1, timestamps, values, sequence) per part.
        let test_data = [
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        // Stats aggregate all parts: row count, one range per part, the largest sequence,
        // and the overall time range.
        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(5, total_rows);

        // Reading each range must yield exactly the rows its stats advertise.
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                // Full (unprojected) flat schema of `metadata_for_test()`.
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }
1543
    /// Checks that a projection passed to `ranges` narrows the columns of the record batches
    /// while all rows are still returned.
    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            111,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // One part with 3 rows for a single primary key.
        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        // Project column 4 only (presumably a column id of `metadata_for_test()`).
        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                Some(&projection),
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // The single write is served by exactly one range, with id 0.
        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            // NOTE(review): 5 columns presumably = the projected column plus the columns the
            // flat format always carries (time index, primary key, sequence, op type);
            // confirm against `to_flat_sst_arrow_schema`.
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }
1592
1593 #[test]
1594 fn test_bulk_memtable_unsupported_operations() {
1595 let metadata = metadata_for_test();
1596 let memtable = BulkMemtable::new(
1597 111,
1598 BulkMemtableConfig::default(),
1599 metadata.clone(),
1600 None,
1601 None,
1602 false,
1603 MergeMode::LastRow,
1604 );
1605
1606 let key_values = build_key_values_with_ts_seq_values(
1607 &metadata,
1608 "test".to_string(),
1609 1,
1610 vec![1000].into_iter(),
1611 vec![Some(1.0)].into_iter(),
1612 1,
1613 );
1614
1615 let err = memtable.write(&key_values).unwrap_err();
1616 assert!(err.to_string().contains("not supported"));
1617
1618 let kv = key_values.iter().next().unwrap();
1619 let err = memtable.write_one(kv).unwrap_err();
1620 assert!(err.to_string().contains("not supported"));
1621 }
1622
1623 #[test]
1624 fn test_bulk_memtable_freeze() {
1625 let metadata = metadata_for_test();
1626 let memtable = BulkMemtable::new(
1627 222,
1628 BulkMemtableConfig::default(),
1629 metadata.clone(),
1630 None,
1631 None,
1632 false,
1633 MergeMode::LastRow,
1634 );
1635
1636 let bulk_part = create_bulk_part_with_converter(
1637 "freeze_test",
1638 10,
1639 vec![10000],
1640 vec![Some(100.0)],
1641 1000,
1642 )
1643 .unwrap();
1644
1645 memtable.write_bulk(bulk_part).unwrap();
1646 memtable.freeze().unwrap();
1647
1648 let stats_after_freeze = memtable.stats();
1649 assert_eq!(1, stats_after_freeze.num_rows);
1650 }
1651
1652 #[test]
1653 fn test_bulk_memtable_fork() {
1654 let metadata = metadata_for_test();
1655 let original_memtable = BulkMemtable::new(
1656 333,
1657 BulkMemtableConfig::default(),
1658 metadata.clone(),
1659 None,
1660 None,
1661 false,
1662 MergeMode::LastRow,
1663 );
1664
1665 let bulk_part =
1666 create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
1667 .unwrap();
1668
1669 original_memtable.write_bulk(bulk_part).unwrap();
1670
1671 let forked_memtable = original_memtable.fork(444, &metadata);
1672
1673 assert_eq!(forked_memtable.id(), 444);
1674 assert!(forked_memtable.is_empty());
1675 assert_eq!(0, forked_memtable.stats().num_rows);
1676
1677 assert!(!original_memtable.is_empty());
1678 assert_eq!(1, original_memtable.stats().num_rows);
1679 }
1680
1681 #[test]
1682 fn test_bulk_memtable_ranges_multiple_parts() {
1683 let metadata = metadata_for_test();
1684 let memtable = BulkMemtable::new(
1685 777,
1686 BulkMemtableConfig::default(),
1687 metadata.clone(),
1688 None,
1689 None,
1690 false,
1691 MergeMode::LastRow,
1692 );
1693 memtable.set_unordered_part_threshold(0);
1695
1696 let parts_data = vec![
1697 (
1698 "part1",
1699 1u32,
1700 vec![1000i64, 1100i64],
1701 vec![Some(10.0), Some(11.0)],
1702 100u64,
1703 ),
1704 (
1705 "part2",
1706 2u32,
1707 vec![2000i64, 2100i64],
1708 vec![Some(20.0), Some(21.0)],
1709 200u64,
1710 ),
1711 ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
1712 ];
1713
1714 for (k0, k1, timestamps, values, seq) in parts_data {
1715 let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
1716 memtable.write_bulk(part).unwrap();
1717 }
1718
1719 let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
1720 let ranges = memtable
1721 .ranges(
1722 None,
1723 RangesOptions::default().with_predicate(predicate_group),
1724 )
1725 .unwrap();
1726
1727 assert_eq!(3, ranges.ranges.len());
1728 let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
1729 assert_eq!(5, total_rows);
1730 assert_eq!(3, ranges.ranges.len());
1731
1732 for (range_id, range) in ranges.ranges.iter() {
1733 assert!(*range_id < 3);
1734 assert!(range.num_rows() > 0);
1735 assert!(range.is_record_batch());
1736 }
1737 }
1738
    /// A sequence filter whose upper bound is below the part's sequence must hide every row.
    /// The range is still planned, but reading it yields nothing.
    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            888,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Three rows built with sequence 500, above the filter's max of 400 below.
        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        // Only sequences <= 400 are visible to this read.
        let sequence_filter = Some(SequenceRange::LtEq { max: 400 });
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default()
                    .with_predicate(predicate_group)
                    .with_sequence(sequence_filter),
            )
            .unwrap();

        // The range exists at planning time; the sequence filter applies when it is read.
        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }
1780
    /// Writes ten single-row parts with a merge threshold of 8, compacts, and checks that
    /// the resulting ranges still expose all 10 rows.
    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        // Threshold below the number of parts written, so `compact` has parts to merge.
        let config = BulkMemtableConfig {
            merge_threshold: 8,
            ..Default::default()
        };
        let memtable = BulkMemtable::new(
            999,
            config,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        // Presumably disables the unordered-part buffer so each write becomes its own part.
        memtable.set_unordered_part_threshold(0);

        // Ten parts of one row each, with distinct keys, timestamps and sequences.
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // NOTE(review): the exact range count after compaction depends on how the compactor
        // groups parts; update this if the grouping strategy changes.
        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(10, total_rows);

        // Each range must read back exactly the rows its stats advertise.
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }
1842
    /// Exercises the unordered part with small writes and checks that stats, range stats and
    /// the rows read back all agree on the total.
    ///
    /// The thresholds are set to 5 (part size) and 10 (compaction). Every 2-row write is
    /// presumably buffered in the unordered part; confirm against
    /// `set_unordered_part_threshold` / `set_unordered_part_compact_threshold`.
    #[test]
    fn test_bulk_memtable_unordered_part() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1001,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        memtable.set_unordered_part_threshold(5);
        memtable.set_unordered_part_compact_threshold(10);

        // First three 2-row parts: 6 rows.
        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(2, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(6, stats.num_rows);

        // Two more 2-row parts bring the total to 10 rows, the compact threshold.
        for i in 3..5 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(10, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert!(!ranges.ranges.is_empty());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(10, total_rows);

        // Every buffered row must be readable through the ranges.
        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(10, total_rows_read);
    }
1925
    /// Mixes parts smaller and larger than the unordered-part threshold (4 rows) and checks
    /// that all 13 rows stay visible through stats, range stats and reads.
    #[test]
    fn test_bulk_memtable_unordered_part_mixed_sizes() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1002,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        memtable.set_unordered_part_threshold(4);
        memtable.set_unordered_part_compact_threshold(8);

        // Two 3-row parts, each below the threshold of 4.
        for i in 0..2 {
            let part = create_bulk_part_with_converter(
                &format!("small_{}", i),
                i,
                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
                10 + i as u64,
            )
            .unwrap();
            assert_eq!(3, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        // One 5-row part, above the threshold.
        let large_part = create_bulk_part_with_converter(
            "large_key",
            100,
            vec![5000, 6000, 7000, 8000, 9000],
            vec![
                Some(100.0),
                Some(101.0),
                Some(102.0),
                Some(103.0),
                Some(104.0),
            ],
            50,
        )
        .unwrap();
        assert_eq!(5, large_part.num_rows());
        memtable.write_bulk(large_part).unwrap();

        // A final 2-row part below the threshold.
        let part = create_bulk_part_with_converter(
            "small_2",
            2,
            vec![4000, 4100],
            vec![Some(20.0), Some(21.0)],
            30,
        )
        .unwrap();
        memtable.write_bulk(part).unwrap();

        // 3 + 3 + 5 + 2 rows.
        let stats = memtable.stats();
        assert_eq!(13, stats.num_rows);
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(13, total_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(13, total_rows_read);
    }
2011
    /// Writes three 1-row parts below the unordered-part threshold, with a compact threshold
    /// (100) that is never reached, and checks they are served by a single range (id 0).
    #[test]
    fn test_bulk_memtable_unordered_part_with_ranges() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1003,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        memtable.set_unordered_part_threshold(3);
        memtable.set_unordered_part_compact_threshold(100);
        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(1, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(3, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // All three writes end up in one range.
        assert_eq!(1, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(3, total_rows);

        let range = ranges.ranges.get(&0).unwrap();
        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            total_rows += batch.num_rows();
            assert!(batch.num_rows() > 0);
        }
        assert_eq!(3, total_rows);
    }
2073
2074 fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
2076 BulkPartWrapper {
2077 part: PartToMerge::Bulk {
2078 part,
2079 file_id: FileId::random(),
2080 },
2081 merging: false,
2082 }
2083 }
2084
2085 #[test]
2086 fn test_should_merge_parts_below_threshold() {
2087 let mut bulk_parts = BulkParts::default();
2088
2089 for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
2091 let part = create_bulk_part_with_converter(
2092 &format!("key_{}", i),
2093 i as u32,
2094 vec![1000 + i as i64 * 100],
2095 vec![Some(i as f64 * 10.0)],
2096 100 + i as u64,
2097 )
2098 .unwrap();
2099 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2100 }
2101
2102 assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
2104 }
2105
2106 #[test]
2107 fn test_should_merge_parts_at_threshold() {
2108 let mut bulk_parts = BulkParts::default();
2109 let merge_threshold = 8;
2110
2111 for i in 0..merge_threshold {
2113 let part = create_bulk_part_with_converter(
2114 &format!("key_{}", i),
2115 i as u32,
2116 vec![1000 + i as i64 * 100],
2117 vec![Some(i as f64 * 10.0)],
2118 100 + i as u64,
2119 )
2120 .unwrap();
2121 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2122 }
2123
2124 assert!(bulk_parts.should_merge_parts(merge_threshold));
2126 }
2127
2128 #[test]
2129 fn test_should_merge_parts_with_merging_flag() {
2130 let mut bulk_parts = BulkParts::default();
2131 let merge_threshold = 8;
2132
2133 for i in 0..10 {
2135 let part = create_bulk_part_with_converter(
2136 &format!("key_{}", i),
2137 i as u32,
2138 vec![1000 + i as i64 * 100],
2139 vec![Some(i as f64 * 10.0)],
2140 100 + i as u64,
2141 )
2142 .unwrap();
2143 bulk_parts.parts.push(create_bulk_part_wrapper(part));
2144 }
2145
2146 assert!(bulk_parts.should_merge_parts(merge_threshold));
2148
2149 for wrapper in bulk_parts.parts.iter_mut().take(3) {
2151 wrapper.merging = true;
2152 }
2153
2154 assert!(!bulk_parts.should_merge_parts(merge_threshold));
2156 }
2157
    /// Builds `DEFAULT_MERGE_THRESHOLD` (16) parts of 1 to 4 rows each and checks that
    /// `collect_parts_to_merge` places every part into some non-empty group.
    #[test]
    fn test_collect_parts_to_merge_grouping() {
        let mut bulk_parts = BulkParts::default();

        for i in 0..16 {
            // Part sizes cycle 1, 2, 3, 4 rows.
            let num_rows = (i % 4) + 1;
            let timestamps: Vec<i64> = (0..num_rows)
                .map(|j| 1000 + i as i64 * 100 + j as i64)
                .collect();
            let values: Vec<Option<f64>> =
                (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                timestamps,
                values,
                100 + i as u64,
            )
            .unwrap();
            bulk_parts.parts.push(create_bulk_part_wrapper(part));
        }

        // 16 parts reach the default threshold of 16.
        assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));

        let collected =
            bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);

        assert!(!collected.groups.is_empty());

        // No group may be empty.
        for group in &collected.groups {
            assert!(!group.is_empty());
        }

        // Every part is collected exactly once across all groups.
        let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
        assert_eq!(16, total_parts);
    }
2200
    /// Writes exactly `merge_threshold` 2-row parts and compacts them. Checks that the
    /// merged result is exposed as a single record-batch range holding all rows
    /// (presumably backed by a `MultiBulkPart`, as the test name suggests).
    #[test]
    fn test_bulk_memtable_ranges_with_multi_bulk_part() {
        let metadata = metadata_for_test();
        let merge_threshold = 8;
        let config = BulkMemtableConfig {
            merge_threshold,
            ..Default::default()
        };
        let memtable = BulkMemtable::new(
            2005,
            config,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        // Presumably disables the unordered-part buffer so each write becomes its own part.
        memtable.set_unordered_part_threshold(0);

        for i in 0..merge_threshold {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // All parts were merged into one range.
        assert_eq!(1, ranges.ranges.len());
        // Two rows per written part.
        let expected_rows = merge_threshold * 2;
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(expected_rows, total_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(expected_rows, total_rows_read);
    }
2266
    /// Same setup as `test_bulk_memtable_ranges_with_multi_bulk_part`, but the predicate
    /// matches no primary key, so every range returned must read back zero rows.
    #[test]
    fn test_multi_bulk_range_iter_builder_all_pruned() {
        let metadata = metadata_for_test();
        let merge_threshold = 8;
        let config = BulkMemtableConfig {
            merge_threshold,
            ..Default::default()
        };
        let memtable = BulkMemtable::new(
            2006,
            config,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        memtable.set_unordered_part_threshold(0);

        for i in 0..merge_threshold {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }
        memtable.compact(false).unwrap();

        // No written row has k0 == "nonexistent" (all keys are "key_{i}").
        let filter = datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent"));
        let predicate_group = PredicateGroup::new(&metadata, &[filter]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // NOTE(review): if every range is pruned at planning time this loop does not run,
        // and the test only checks that `ranges` succeeds.
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
            let total_rows: usize = record_batch_iter.map(|r| r.unwrap().num_rows()).sum();
            assert_eq!(0, total_rows);
        }
    }
2319}