pub(crate) mod chunk_reader;
pub mod context;
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Mutex, RwLock};
use std::time::Instant;

/// Reads a `usize` from the environment variable `name`, falling back to
/// `default` when the variable is unset or cannot be parsed.
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{
    BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, MultiBulkPart, UnorderedPart,
};
use crate::memtable::bulk::part_reader::BulkPartBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::flat_format::field_column_start;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// Default number of pending parts that triggers a merge.
const DEFAULT_MERGE_THRESHOLD: usize = 16;

/// Merge threshold, overridable via `GREPTIME_BULK_MERGE_THRESHOLD`.
static MERGE_THRESHOLD: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MERGE_THRESHOLD", DEFAULT_MERGE_THRESHOLD));

/// Default maximum number of groups merged in one compaction run.
const DEFAULT_MAX_MERGE_GROUPS: usize = 32;

/// Maximum merge groups, overridable via `GREPTIME_BULK_MAX_MERGE_GROUPS`.
static MAX_MERGE_GROUPS: LazyLock<usize> =
    LazyLock::new(|| env_usize("GREPTIME_BULK_MAX_MERGE_GROUPS", DEFAULT_MAX_MERGE_GROUPS));

/// Row-count threshold above which merged output is encoded,
/// overridable via `GREPTIME_BULK_ENCODE_ROW_THRESHOLD`.
pub(crate) static ENCODE_ROW_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_ROW_THRESHOLD",
        10 * DEFAULT_ROW_GROUP_SIZE,
    )
});

/// Default byte-size threshold above which merged output is encoded.
const DEFAULT_ENCODE_BYTES_THRESHOLD: usize = 64 * 1024 * 1024;

/// Byte-size threshold, overridable via `GREPTIME_BULK_ENCODE_BYTES_THRESHOLD`.
static ENCODE_BYTES_THRESHOLD: LazyLock<usize> = LazyLock::new(|| {
    env_usize(
        "GREPTIME_BULK_ENCODE_BYTES_THRESHOLD",
        DEFAULT_ENCODE_BYTES_THRESHOLD,
    )
});

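/// Tuning knobs for [`BulkMemtable`] merging and encoding.
///
/// A minimal sketch of overriding a single knob while keeping the
/// environment-derived defaults for the rest:
///
/// ```ignore
/// let config = BulkMemtableConfig {
///     merge_threshold: 8,
///     ..Default::default()
/// };
/// ```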
#[derive(Debug, Clone)]
pub struct BulkMemtableConfig {
    /// Number of pending parts of the same kind that triggers a merge.
    pub merge_threshold: usize,
    /// Row count above which a merged group is encoded instead of kept in memory.
    pub encode_row_threshold: usize,
    /// Byte size above which a merged group is encoded instead of kept in memory.
    pub encode_bytes_threshold: usize,
    /// Maximum number of groups to merge in one compaction run.
    pub max_merge_groups: usize,
}

impl Default for BulkMemtableConfig {
    fn default() -> Self {
        Self {
            merge_threshold: *MERGE_THRESHOLD,
            encode_row_threshold: *ENCODE_ROW_THRESHOLD,
            encode_bytes_threshold: *ENCODE_BYTES_THRESHOLD,
            max_merge_groups: *MAX_MERGE_GROUPS,
        }
    }
}

/// Output of merging a group of parts.
enum MergedPart {
    /// Merged batches kept in memory.
    Multi(MultiBulkPart),
    /// Merged output in encoded form.
    Encoded(EncodedBulkPart),
}

/// Parts collected for merging, grouped by kind.
struct CollectedParts {
    groups: Vec<Vec<PartToMerge>>,
}

/// Container of all parts in a [`BulkMemtable`].
#[derive(Default)]
struct BulkParts {
    /// Buffer that collects small write fragments before they become a part.
    unordered_part: UnorderedPart,
    /// Immutable parts, each tracked with a merging flag.
    parts: Vec<BulkPartWrapper>,
}

impl BulkParts {
    /// Number of parts, counting a non-empty unordered part as one.
    fn num_parts(&self) -> usize {
        let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
        self.parts.len() + unordered_count
    }

    fn is_empty(&self) -> bool {
        self.unordered_part.is_empty() && self.parts.is_empty()
    }

    /// Returns true if the number of unencoded or encoded parts that are not
    /// already being merged reaches `merge_threshold`.
    fn should_merge_parts(&self, merge_threshold: usize) -> bool {
        let mut bulk_count = 0;
        let mut encoded_count = 0;

        for wrapper in &self.parts {
            if wrapper.merging {
                continue;
            }

            if wrapper.part.is_encoded() {
                encoded_count += 1;
            } else {
                bulk_count += 1;
            }

            if bulk_count >= merge_threshold || encoded_count >= merge_threshold {
                return true;
            }
        }

        false
    }

    fn should_compact_unordered_part(&self) -> bool {
        self.unordered_part.should_compact()
    }

    /// Collects mergeable parts into groups and marks them as merging.
    ///
    /// Unencoded and encoded parts are grouped separately, and each kind is
    /// only grouped when at least `merge_threshold` parts are available.
    fn collect_parts_to_merge(
        &mut self,
        merge_threshold: usize,
        max_merge_groups: usize,
    ) -> CollectedParts {
        let mut bulk_indices: Vec<(usize, usize)> = Vec::new();
        let mut encoded_indices: Vec<(usize, usize)> = Vec::new();

        for (idx, wrapper) in self.parts.iter().enumerate() {
            if wrapper.merging {
                continue;
            }
            let num_rows = wrapper.part.num_rows();
            if wrapper.part.is_encoded() {
                encoded_indices.push((idx, num_rows));
            } else {
                bulk_indices.push((idx, num_rows));
            }
        }

        let mut groups = Vec::new();

        if bulk_indices.len() >= merge_threshold {
            groups.extend(self.collect_and_group_parts(
                bulk_indices,
                merge_threshold,
                max_merge_groups,
            ));
        }

        if encoded_indices.len() >= merge_threshold {
            groups.extend(self.collect_and_group_parts(
                encoded_indices,
                merge_threshold,
                max_merge_groups,
            ));
        }

        CollectedParts { groups }
    }

    /// Sorts parts by row count and splits them into chunks of
    /// `merge_threshold`, marking every chosen part as merging.
    fn collect_and_group_parts(
        &mut self,
        mut indices: Vec<(usize, usize)>,
        merge_threshold: usize,
        max_merge_groups: usize,
    ) -> Vec<Vec<PartToMerge>> {
        if indices.is_empty() {
            return Vec::new();
        }

        // Sort by row count so parts of similar sizes are merged together.
        indices.sort_unstable_by_key(|(_, num_rows)| *num_rows);

        indices
            .chunks(merge_threshold)
            .take(max_merge_groups)
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|(idx, _)| {
                        let wrapper = &mut self.parts[*idx];
                        wrapper.merging = true;
                        wrapper.part.clone()
                    })
                    .collect()
            })
            .collect()
    }

    /// Installs merged parts and removes the source parts they replace.
    ///
    /// Returns the total number of output rows installed.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
    ) -> usize
    where
        I: IntoIterator<Item = MergedPart>,
    {
        let mut total_output_rows = 0;

        for merged_part in merged_parts {
            match merged_part {
                MergedPart::Encoded(encoded_part) => {
                    total_output_rows += encoded_part.metadata().num_rows;
                    self.parts.push(BulkPartWrapper {
                        part: PartToMerge::Encoded {
                            part: encoded_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                }
                MergedPart::Multi(multi_part) => {
                    total_output_rows += multi_part.num_rows();
                    self.parts.push(BulkPartWrapper {
                        part: PartToMerge::Multi {
                            part: multi_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                }
            }
        }

        self.parts
            .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id()));

        total_output_rows
    }

    /// Clears the merging flag of parts whose file ids are in `file_ids`.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>) {
        for wrapper in &mut self.parts {
            if file_ids.contains(&wrapper.file_id()) {
                wrapper.merging = false;
            }
        }
    }
}

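/// Guard that clears the `merging` flags of the given parts on drop unless
/// the merge completed successfully.
///
/// A usage sketch of the RAII pattern this implements (`run_merge` and `ids`
/// are hypothetical names for illustration):
///
/// ```ignore
/// let mut guard = MergingFlagsGuard::new(&bulk_parts, &ids);
/// run_merge()?;          // on early return, Drop resets the flags
/// guard.mark_success();  // merge succeeded: leave the flags as installed
/// ```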
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    fn new(bulk_parts: &'a RwLock<BulkParts>, file_ids: &'a HashSet<FileId>) -> Self {
        Self {
            bulk_parts,
            file_ids,
            success: false,
        }
    }

    /// Marks the merge as successful so the flags are left untouched on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids);
        }
    }
}

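/// Memtable that ingests pre-built [`BulkPart`]s instead of row-by-row writes.
///
/// A minimal construction sketch; `metadata` stands for a `RegionMetadataRef`
/// obtained elsewhere:
///
/// ```ignore
/// let memtable = BulkMemtable::new(
///     1,
///     BulkMemtableConfig::default(),
///     metadata,
///     None,               // no write buffer manager
///     None,               // compact on the write path instead of dispatching
///     false,              // append_mode
///     MergeMode::LastRow,
/// );
/// ```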
pub struct BulkMemtable {
    id: MemtableId,
    config: BulkMemtableConfig,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
    /// Flat SST arrow schema shared by merge iterators.
    flat_arrow_schema: SchemaRef,
    /// Compactor that merges parts; the mutex serializes compactions.
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Optional dispatcher that runs compactions in the background.
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();

            // Small fragments are buffered in the unordered part until it is
            // large enough to compact into a regular part.
            if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
                bulk_parts.unordered_part.push(fragment);

                if bulk_parts.should_compact_unordered_part()
                    && let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
                {
                    bulk_parts.parts.push(BulkPartWrapper {
                        part: PartToMerge::Bulk {
                            part: bulk_part,
                            file_id: FileId::random(),
                        },
                        merging: false,
                    });
                    bulk_parts.unordered_part.clear();
                }
            } else {
                bulk_parts.parts.push(BulkPartWrapper {
                    part: PartToMerge::Bulk {
                        part: fragment,
                        file_id: FileId::random(),
                    },
                    merging: false,
                });
            }

            // Update stats while still holding the write lock.
            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
            self.metadata.clone(),
            projection,
            predicate.predicate().cloned(),
            options.for_flush,
            options.pre_filter_mode,
        )?);

        {
            let bulk_parts = self.parts.read().unwrap();

            // The unordered part, if any, becomes its own range.
            if !bulk_parts.unordered_part.is_empty()
                && let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
            {
                let part_stats = unordered_bulk_part.to_memtable_stats(&self.metadata);
                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: unordered_bulk_part,
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            for part_wrapper in bulk_parts.parts.iter() {
                // Skip empty parts.
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let part_stats = part_wrapper.part.to_memtable_stats(&self.metadata);
                let iter_builder: Box<dyn IterBuilder> = match &part_wrapper.part {
                    PartToMerge::Bulk { part, .. } => Box::new(BulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Multi { part, .. } => Box::new(MultiBulkRangeIterBuilder {
                        part: part.clone(),
                        context: context.clone(),
                        sequence,
                    }),
                    PartToMerge::Encoded { part, file_id } => {
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: *file_id,
                            part: part.clone(),
                            context: context.clone(),
                            sequence,
                        })
                    }
                };

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        iter_builder,
                        predicate.clone(),
                    )),
                    part_stats,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        Ok(MemtableRanges { ranges })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            config: self.config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(
                metadata.region_id,
                id,
                self.config.clone(),
            ))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        // Taking the compactor lock first means a flush-triggered call only
        // returns after any in-flight merge completes.
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
    /// Creates a new [`BulkMemtable`].
    pub fn new(
        id: MemtableId,
        config: BulkMemtableConfig,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            config: config.clone(),
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id, config))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

    #[cfg(test)]
    pub fn set_unordered_part_threshold(&self, threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_threshold(threshold);
    }

    #[cfg(test)]
    pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
        self.parts
            .write()
            .unwrap()
            .unordered_part
            .set_compact_threshold(compact_threshold);
    }

    /// Updates allocation and timestamp/sequence/row statistics after a write.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Sums the estimated series count over all parts.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.series_count())
            .sum()
    }

    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_parts(self.config.merge_threshold)
    }

    /// Compacts in the background when a dispatcher is available, otherwise
    /// compacts inline on the write path.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                config: self.config.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else if let Err(e) = self.compact(false) {
            common_telemetry::error!(e; "Failed to compact memtable");
        }
    }
}

/// [`IterBuilder`] for a plain [`BulkPart`] range.
pub struct BulkRangeIterBuilder {
    pub part: BulkPart,
    pub context: Arc<BulkIterContext>,
    pub sequence: Option<SequenceRange>,
}

/// [`IterBuilder`] for a [`MultiBulkPart`] range.
struct MultiBulkRangeIterBuilder {
    part: MultiBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let series_count = self.part.estimated_series_count();
        let iter = BulkPartBatchIter::from_single(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
            series_count,
            metrics,
        );

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

impl IterBuilder for MultiBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for multi bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.part
            .read(self.context.clone(), self.sequence, metrics)?
            .ok_or_else(|| {
                UnsupportedOperationSnafu {
                    err_msg: "Failed to create iterator for multi bulk part",
                }
                .build()
            })
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// [`IterBuilder`] for an [`EncodedBulkPart`] range.
struct EncodedBulkRangeIterBuilder {
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceRange>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _time_range: Option<(Timestamp, Timestamp)>,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self
            .part
            .read(self.context.clone(), self.sequence, metrics)?
        {
            Ok(iter)
        } else {
            // Everything is filtered out; return an empty iterator.
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

/// A part together with the flag marking whether it is currently under merge.
struct BulkPartWrapper {
    part: PartToMerge,
    /// True while a compaction has claimed this part.
    merging: bool,
}

impl BulkPartWrapper {
    fn file_id(&self) -> FileId {
        self.part.file_id()
    }
}

/// Parts that can participate in a merge, tagged with a transient file id
/// used to track them across the merge.
#[derive(Clone)]
enum PartToMerge {
    /// Unencoded part from a single bulk write.
    Bulk { part: BulkPart, file_id: FileId },
    /// Unencoded part holding multiple merged batches.
    Multi {
        part: MultiBulkPart,
        file_id: FileId,
    },
    /// Part already in encoded form.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Multi { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Multi { part, .. } => part.min_timestamp(),
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Multi { part, .. } => part.max_timestamp(),
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Multi { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    fn max_sequence(&self) -> u64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.sequence,
            PartToMerge::Multi { part, .. } => part.max_sequence(),
            PartToMerge::Encoded { part, .. } => part.metadata().max_sequence,
        }
    }

    fn series_count(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.estimated_series_count(),
            PartToMerge::Multi { part, .. } => part.series_count(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_series as usize,
        }
    }

    fn is_encoded(&self) -> bool {
        matches!(self, PartToMerge::Encoded { .. })
    }

    fn estimated_size(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.estimated_size(),
            PartToMerge::Multi { part, .. } => part.estimated_size(),
            PartToMerge::Encoded { part, .. } => part.size_bytes(),
        }
    }

    fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        match self {
            PartToMerge::Bulk { part, .. } => part.to_memtable_stats(region_metadata),
            PartToMerge::Multi { part, .. } => part.to_memtable_stats(region_metadata),
            PartToMerge::Encoded { part, .. } => part.to_memtable_stats(),
        }
    }

    /// Creates an iterator over this part, or `None` if there is nothing to read.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let series_count = part.estimated_series_count();
                let iter = BulkPartBatchIter::from_single(
                    part.batch,
                    context,
                    None, // no sequence filter while merging
                    series_count,
                    None, // no scan metrics
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Multi { part, .. } => part.read(context, None, None),
            PartToMerge::Encoded { part, .. } => part.read(context, None, None),
        }
    }
}

/// Compactor that merges a memtable's parts.
struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
    config: BulkMemtableConfig,
}

impl MemtableCompactor {
    fn new(region_id: RegionId, memtable_id: MemtableId, config: BulkMemtableConfig) -> Self {
        Self {
            region_id,
            memtable_id,
            config,
        }
    }

    /// Collects mergeable groups, merges them in parallel and installs the
    /// merged parts back into `bulk_parts`.
    fn merge_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        let collected = bulk_parts
            .write()
            .unwrap()
            .collect_parts_to_merge(self.config.merge_threshold, self.config.max_merge_groups);

        if collected.groups.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> = collected
            .groups
            .iter()
            .flatten()
            .map(|part| part.file_id())
            .collect();
        // The guard resets the merging flags if anything below fails.
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids);

        let num_groups = collected.groups.len();
        let num_parts: usize = collected.groups.iter().map(|g| g.len()).sum();

        let encode_row_threshold = self.config.encode_row_threshold;
        let encode_bytes_threshold = self.config.encode_bytes_threshold;

        // Merge groups in parallel on the rayon pool.
        let merged_parts = collected
            .groups
            .into_par_iter()
            .map(|group| {
                Self::merge_parts_group(
                    group,
                    arrow_schema,
                    metadata,
                    dedup,
                    merge_mode,
                    encode_row_threshold,
                    encode_bytes_threshold,
                )
            })
            .collect::<Result<Vec<Option<MergedPart>>>>()?;

        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            num_groups,
            num_parts,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges one group of parts into a single merged part.
    ///
    /// Returns an encoded part when the merged output exceeds the row or byte
    /// thresholds, an in-memory multi part otherwise, and `None` when there is
    /// nothing to merge.
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
        encode_row_threshold: usize,
        encode_bytes_threshold: usize,
    ) -> Result<Option<MergedPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);
        let max_sequence = parts_to_merge
            .iter()
            .map(|p| p.max_sequence())
            .max()
            .unwrap_or(0);

        let estimated_total_rows: usize = parts_to_merge.iter().map(|p| p.num_rows()).sum();
        let estimated_total_bytes: usize = parts_to_merge.iter().map(|p| p.estimated_size()).sum();
        let estimated_series_count = parts_to_merge
            .iter()
            .map(|p| p.series_count())
            .max()
            .unwrap_or(0);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            None, // no projection
            None, // no predicate
            true, // for_flush
        )?);

        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            // Deduplicate rows according to the region's merge mode.
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    let field_column_start =
                        field_column_start(metadata, arrow_schema.fields().len());

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        if estimated_total_rows > encode_row_threshold
            || estimated_total_bytes > encode_bytes_threshold
        {
            let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
            let mut metrics = BulkPartEncodeMetrics::default();
            let encoded_part = encoder.encode_record_batch_iter(
                boxed_iter,
                arrow_schema.clone(),
                min_timestamp,
                max_timestamp,
                max_sequence,
                &mut metrics,
            )?;

            common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

            Ok(encoded_part.map(MergedPart::Encoded))
        } else {
            // Keep the merged batches in memory as a multi part.
            let mut batches = Vec::new();
            let mut actual_total_rows = 0;

            for batch_result in boxed_iter {
                let batch = batch_result?;
                actual_total_rows += batch.num_rows();
                batches.push(batch);
            }

            if actual_total_rows == 0 {
                return Ok(None);
            }

            let multi_part = MultiBulkPart::new(
                batches,
                min_timestamp,
                max_timestamp,
                max_sequence,
                estimated_series_count,
            );

            common_telemetry::trace!(
                "merge_parts_group created MultiBulkPart: rows={}, batches={}",
                actual_total_rows,
                multi_part.num_batches()
            );

            Ok(Some(MergedPart::Multi(multi_part)))
        }
    }
}

/// Background compaction task over one memtable's parts.
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,
    config: BulkMemtableConfig,
    flat_arrow_schema: SchemaRef,
    compactor: Arc<Mutex<MemtableCompactor>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        let should_merge = self
            .parts
            .read()
            .unwrap()
            .should_merge_parts(self.config.merge_threshold);
        if should_merge {
            compactor.merge_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

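/// Dispatches memtable compaction tasks to background workers, with a
/// semaphore bounding the number of in-flight compactions.
///
/// A short sketch allowing at most four concurrent compactions:
///
/// ```ignore
/// let dispatcher = Arc::new(CompactDispatcher::new(4));
/// ```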
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Spawns the task once a permit is available; the blocking merge runs on
    /// the blocking thread pool.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(_permit) = semaphore.acquire().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}

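/// Builder for [`BulkMemtable`].
///
/// A minimal wiring sketch; `dispatcher` is assumed to be a shared
/// `Arc<CompactDispatcher>` created at engine start:
///
/// ```ignore
/// let builder = BulkMemtableBuilder::new(None, false, MergeMode::LastRow)
///     .with_compact_dispatcher(dispatcher);
/// ```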
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    config: BulkMemtableConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a builder with the default config.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            config: BulkMemtableConfig::default(),
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the dispatcher used to run compactions in the background.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            self.config.clone(),
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    /// Builds a [`BulkPart`] with one series (`k0`, `k1`) and the given
    /// timestamps, values and sequence.
    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            999,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        // Disable the unordered part so each write becomes its own part.
        memtable.set_unordered_part_threshold(0);

        let test_data = [
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(5, total_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            111,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                Some(&projection),
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            111,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            222,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable = BulkMemtable::new(
            333,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            777,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        // Disable the unordered part so each write becomes its own part.
        memtable.set_unordered_part_threshold(0);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(5, total_rows);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            888,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        // All rows were written at sequence 500, so this filter excludes them.
        let sequence_filter = Some(SequenceRange::LtEq { max: 400 });
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default()
                    .with_predicate(predicate_group)
                    .with_sequence(sequence_filter),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let config = BulkMemtableConfig {
            merge_threshold: 8,
            ..Default::default()
        };
        let memtable = BulkMemtable::new(
            999,
            config,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        memtable.set_unordered_part_threshold(0);

        // Write more parts than the merge threshold.
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // The 8th write merges the first 8 parts inline, so the memtable holds
        // the merged part plus the last two writes.
        assert_eq!(3, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(10, total_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_unordered_part() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1001,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        // Small fragments are buffered in the unordered part, which is
        // compacted into a regular part once it holds 10 rows.
        memtable.set_unordered_part_threshold(5);
        memtable.set_unordered_part_compact_threshold(10);

        // Write 3 parts with 2 rows each (6 rows buffered).
        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(2, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(6, stats.num_rows);

        // Two more parts reach the compact threshold of 10 rows.
        for i in 3..5 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(10, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert!(!ranges.ranges.is_empty());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(10, total_rows);

        // All rows must be readable through the ranges.
        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(10, total_rows_read);
    }

    #[test]
    fn test_bulk_memtable_unordered_part_mixed_sizes() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1002,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        memtable.set_unordered_part_threshold(4);
        memtable.set_unordered_part_compact_threshold(8);

        // Small parts (3 rows each) are buffered in the unordered part.
        for i in 0..2 {
            let part = create_bulk_part_with_converter(
                &format!("small_{}", i),
                i,
                vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
                vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
                10 + i as u64,
            )
            .unwrap();
            assert_eq!(3, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        // A part at or above the threshold bypasses the unordered part.
        let large_part = create_bulk_part_with_converter(
            "large_key",
            100,
            vec![5000, 6000, 7000, 8000, 9000],
            vec![
                Some(100.0),
                Some(101.0),
                Some(102.0),
                Some(103.0),
                Some(104.0),
            ],
            50,
        )
        .unwrap();
        assert_eq!(5, large_part.num_rows());
        memtable.write_bulk(large_part).unwrap();

        // Another small part pushes the buffer past the compact threshold.
        let part = create_bulk_part_with_converter(
            "small_2",
            2,
            vec![4000, 4100],
            vec![Some(20.0), Some(21.0)],
            30,
        )
        .unwrap();
        memtable.write_bulk(part).unwrap();

        let stats = memtable.stats();
        // 3 + 3 + 5 + 2 rows in total.
        assert_eq!(13, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(13, total_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(13, total_rows_read);
    }

    #[test]
    fn test_bulk_memtable_unordered_part_with_ranges() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(
            1003,
            BulkMemtableConfig::default(),
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );

        memtable.set_unordered_part_threshold(3);
        // A high compact threshold keeps all rows buffered in the unordered part.
        memtable.set_unordered_part_compact_threshold(100);

        for i in 0..3 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            assert_eq!(1, part.num_rows());
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(3, stats.num_rows);

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        // The buffered rows are exposed as a single range.
        assert_eq!(1, ranges.ranges.len());
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(3, total_rows);

        let range = ranges.ranges.get(&0).unwrap();
        let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            total_rows += batch.num_rows();
            assert!(batch.num_rows() > 0);
        }
        assert_eq!(3, total_rows);
    }

    /// Wraps a part into a [`BulkPartWrapper`] with a fresh file id.
    fn create_bulk_part_wrapper(part: BulkPart) -> BulkPartWrapper {
        BulkPartWrapper {
            part: PartToMerge::Bulk {
                part,
                file_id: FileId::random(),
            },
            merging: false,
        }
    }

    #[test]
    fn test_should_merge_parts_below_threshold() {
        let mut bulk_parts = BulkParts::default();

        for i in 0..DEFAULT_MERGE_THRESHOLD - 1 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            bulk_parts.parts.push(create_bulk_part_wrapper(part));
        }

        assert!(!bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));
    }

    #[test]
    fn test_should_merge_parts_at_threshold() {
        let mut bulk_parts = BulkParts::default();
        let merge_threshold = 8;

        for i in 0..merge_threshold {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            bulk_parts.parts.push(create_bulk_part_wrapper(part));
        }

        assert!(bulk_parts.should_merge_parts(merge_threshold));
    }

    #[test]
    fn test_should_merge_parts_with_merging_flag() {
        let mut bulk_parts = BulkParts::default();
        let merge_threshold = 8;

        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            bulk_parts.parts.push(create_bulk_part_wrapper(part));
        }

        assert!(bulk_parts.should_merge_parts(merge_threshold));

        // Mark 3 parts as merging; only 7 remain, below the threshold.
        for wrapper in bulk_parts.parts.iter_mut().take(3) {
            wrapper.merging = true;
        }

        assert!(!bulk_parts.should_merge_parts(merge_threshold));
    }

    #[test]
    fn test_collect_parts_to_merge_grouping() {
        let mut bulk_parts = BulkParts::default();

        for i in 0..16 {
            // Vary part sizes between 1 and 4 rows.
            let num_rows = (i % 4) + 1;
            let timestamps: Vec<i64> = (0..num_rows)
                .map(|j| 1000 + i as i64 * 100 + j as i64)
                .collect();
            let values: Vec<Option<f64>> =
                (0..num_rows).map(|j| Some((i * 10 + j) as f64)).collect();
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                timestamps,
                values,
                100 + i as u64,
            )
            .unwrap();
            bulk_parts.parts.push(create_bulk_part_wrapper(part));
        }

        assert!(bulk_parts.should_merge_parts(DEFAULT_MERGE_THRESHOLD));

        let collected =
            bulk_parts.collect_parts_to_merge(DEFAULT_MERGE_THRESHOLD, DEFAULT_MAX_MERGE_GROUPS);

        assert!(!collected.groups.is_empty());

        for group in &collected.groups {
            assert!(!group.is_empty());
        }

        // Every part is assigned to exactly one group.
        let total_parts: usize = collected.groups.iter().map(|g| g.len()).sum();
        assert_eq!(16, total_parts);
    }

    #[test]
    fn test_bulk_memtable_ranges_with_multi_bulk_part() {
        let metadata = metadata_for_test();
        let merge_threshold = 8;
        let config = BulkMemtableConfig {
            merge_threshold,
            ..Default::default()
        };
        let memtable = BulkMemtable::new(
            2005,
            config,
            metadata.clone(),
            None,
            None,
            false,
            MergeMode::LastRow,
        );
        memtable.set_unordered_part_threshold(0);

        // Write exactly `merge_threshold` parts with two rows each, so the
        // final write triggers a merge into one multi part.
        for i in 0..merge_threshold {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i as u32,
                vec![1000 + i as i64 * 100, 2000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
        let ranges = memtable
            .ranges(
                None,
                RangesOptions::default().with_predicate(predicate_group),
            )
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        // Each written part holds two rows.
        let expected_rows = merge_threshold * 2;
        let total_rows: usize = ranges.ranges.values().map(|r| r.stats().num_rows()).sum();
        assert_eq!(expected_rows, total_rows);

        let mut total_rows_read = 0;
        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.is_record_batch());
            let record_batch_iter = range.build_record_batch_iter(None, None).unwrap();

            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows_read += batch.num_rows();
            }
        }
        assert_eq!(expected_rows, total_rows_read);
    }
}