#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::BTreeMap;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, IterBuilder,
    KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup,
};
use crate::sst::file::FileId;
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};

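/// Parts held by the bulk memtable: raw parts written by bulk requests and
/// parts that have already been encoded.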
#[derive(Default)]
struct BulkParts {
    parts: Vec<BulkPartWrapper>,
    encoded_parts: Vec<EncodedPartWrapper>,
}

impl BulkParts {
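    /// Returns the total number of parts, raw and encoded.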
    fn num_parts(&self) -> usize {
        self.parts.len() + self.encoded_parts.len()
    }

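    /// Returns true if there are no parts of any kind.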
    fn is_empty(&self) -> bool {
        self.parts.is_empty() && self.encoded_parts.is_empty()
    }
}

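/// Memtable that stores ingested data as whole bulk parts instead of
/// individual rows.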
pub struct BulkMemtable {
    id: MemtableId,
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    num_rows: AtomicUsize,
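    /// Flat SST Arrow schema derived from the region metadata.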
    #[allow(dead_code)]
    flat_arrow_schema: SchemaRef,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_ts,
            max_ts: fragment.max_ts,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();
            bulk_parts.parts.push(BulkPartWrapper {
                part: fragment,
                file_id: FileId::random(),
            });

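            // Update stats while still holding the write lock.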
            self.update_stats(local_metrics);
        }

        Ok(())
    }

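    /// Test-only iterator API; the bulk memtable does not implement it yet.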
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceNumber>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        let context = Arc::new(BulkIterContext::new(
            self.metadata.clone(),
            &projection,
            predicate.predicate().cloned(),
        ));

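        // Hold the read lock while building a range for every part.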
        {
            let bulk_parts = self.parts.read().unwrap();

            for part_wrapper in bulk_parts.parts.iter() {
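                // Skip empty parts.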
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_wrapper.part.num_rows(),
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

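            // Build ranges for encoded parts as well.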
            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
                if encoded_part_wrapper.part.metadata().num_rows == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: encoded_part_wrapper.file_id,
                            part: encoded_part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    encoded_part_wrapper.part.metadata().num_rows,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        let mut stats = self.stats();
        stats.num_ranges = ranges.len();

        Ok(MemtableRanges { ranges, stats })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
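        // Rebuild the flat schema since the region metadata may have changed.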
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
        })
    }
}

impl BulkMemtable {
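    /// Creates a new bulk memtable with the given id and region metadata.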
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
        }
    }

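    /// Updates the timestamp range, max sequence, row count and allocation
    /// tracker after a bulk write.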
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

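    /// Estimates the series count by summing the estimates of all raw parts.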
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }
}

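/// Iterator builder that reads a raw bulk part as record batches.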
struct BulkRangeIterBuilder {
    part: BulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let iter = BulkPartRecordBatchIter::new(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
        );

        Ok(Box::new(iter))
    }
}

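/// Iterator builder that reads an encoded bulk part as record batches.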
struct EncodedBulkRangeIterBuilder {
    #[allow(dead_code)]
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self.part.read(self.context.clone(), self.sequence)? {
            Ok(iter)
        } else {
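            // No data to read from this part; return an empty iterator.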
            Ok(Box::new(std::iter::empty()))
        }
    }
}

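/// A raw bulk part and the file id assigned to it.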
struct BulkPartWrapper {
    part: BulkPart,
    #[allow(dead_code)]
    file_id: FileId,
}

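/// An encoded bulk part and the file id assigned to it.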
struct EncodedPartWrapper {
    part: EncodedBulkPart,
    #[allow(dead_code)]
    file_id: FileId,
}

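/// Builder for [`BulkMemtable`].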
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
}

impl BulkMemtableBuilder {
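    /// Creates a new builder with an optional write buffer manager.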
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
        Self {
            write_buffer_manager,
        }
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

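    /// Builds a `BulkPart` holding a single series from the given keys,
    /// timestamps and values.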
    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(999, metadata.clone(), None);

        let test_data = vec![
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable.ranges(None, predicate_group, None).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(111, metadata.clone(), None);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable
            .ranges(Some(&projection), predicate_group, None)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(111, metadata.clone(), None);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(222, metadata.clone(), None);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable = BulkMemtable::new(333, metadata.clone(), None);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }
}