mito2/memtable/bulk.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation for bulk load.
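//!
//! # Example
//!
//! A minimal usage sketch (assumes a `RegionMetadataRef`, a [BulkPart] and a
//! `PredicateGroup` built elsewhere, e.g. by the `BulkPartConverter` used in
//! the tests below):
//!
//! ```ignore
//! let memtable = BulkMemtable::new(0, metadata, None);
//! // Each write ingests a whole part instead of individual rows.
//! memtable.write_bulk(part)?;
//! // Scans are exposed as ranges, one per non-empty part.
//! let ranges = memtable.ranges(None, predicate_group, None)?;
//! ```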

#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::BTreeMap;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, IterBuilder,
    KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup,
};
use crate::sst::file::FileId;
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};

/// All parts in a bulk memtable.
#[derive(Default)]
struct BulkParts {
    /// Raw parts.
    parts: Vec<BulkPartWrapper>,
    /// Parts encoded as Parquet files.
    encoded_parts: Vec<EncodedPartWrapper>,
}

impl BulkParts {
    /// Total number of parts (raw + encoded).
    fn num_parts(&self) -> usize {
        self.parts.len() + self.encoded_parts.len()
    }

    /// Returns true if there are no parts.
    fn is_empty(&self) -> bool {
        self.parts.is_empty() && self.encoded_parts.is_empty()
    }
}

/// Memtable that ingests and scans parts directly.
pub struct BulkMemtable {
    id: MemtableId,
    /// Parts ingested into this memtable.
    parts: Arc<RwLock<BulkParts>>,
    metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    /// Maximum timestamp (in the time index unit) among all rows.
    max_timestamp: AtomicI64,
    /// Minimum timestamp (in the time index unit) among all rows.
    min_timestamp: AtomicI64,
    /// Maximum sequence number among all written rows.
    max_sequence: AtomicU64,
    /// Total number of rows.
    num_rows: AtomicUsize,
    /// Cached flat SST arrow schema for memtable compaction.
    #[allow(dead_code)]
    flat_arrow_schema: SchemaRef,
}

impl std::fmt::Debug for BulkMemtable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BulkMemtable")
            .field("id", &self.id)
            .field("num_rows", &self.num_rows.load(Ordering::Relaxed))
            .field("min_timestamp", &self.min_timestamp.load(Ordering::Relaxed))
            .field("max_timestamp", &self.max_timestamp.load(Ordering::Relaxed))
            .field("max_sequence", &self.max_sequence.load(Ordering::Relaxed))
            .finish()
    }
}

impl Memtable for BulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "write_one() is not supported for bulk memtable",
        }
        .fail()
    }

    fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_ts,
            max_ts: fragment.max_ts,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };

        {
            let mut bulk_parts = self.parts.write().unwrap();
            bulk_parts.parts.push(BulkPartWrapper {
                part: fragment,
                file_id: FileId::random(),
            });

            // Since this operation should be fast, we do it inside the parts lock scope.
            // This ensures the statistics in `ranges()` are correct. It also guarantees
            // that no rows fall outside the recorded time range, so the iterator of a
            // MemtableRange doesn't need to prune rows by time range again.
            self.update_stats(local_metrics);
        }

        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        _sequence: Option<SequenceNumber>,
    ) -> Result<crate::memtable::BoxedBatchIterator> {
        todo!()
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        let context = Arc::new(BulkIterContext::new(
            self.metadata.clone(),
            &projection,
            predicate.predicate().cloned(),
        ));

        // Adds ranges for regular parts and encoded parts.
        {
            let bulk_parts = self.parts.read().unwrap();

            // Adds ranges for regular parts.
            for part_wrapper in bulk_parts.parts.iter() {
                // Skips empty parts.
                if part_wrapper.part.num_rows() == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(BulkRangeIterBuilder {
                            part: part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    part_wrapper.part.num_rows(),
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }

            // Adds ranges for encoded parts.
            for encoded_part_wrapper in bulk_parts.encoded_parts.iter() {
                // Skips empty parts.
                if encoded_part_wrapper.part.metadata().num_rows == 0 {
                    continue;
                }

                let range = MemtableRange::new(
                    Arc::new(MemtableRangeContext::new(
                        self.id,
                        Box::new(EncodedBulkRangeIterBuilder {
                            file_id: encoded_part_wrapper.file_id,
                            part: encoded_part_wrapper.part.clone(),
                            context: context.clone(),
                            sequence,
                        }),
                        predicate.clone(),
                    )),
                    encoded_part_wrapper.part.metadata().num_rows,
                );
                ranges.insert(range_id, range);
                range_id += 1;
            }
        }

        let mut stats = self.stats();
        stats.num_ranges = ranges.len();

        // TODO(yingwen): Support per-range stats.
        Ok(MemtableRanges { ranges, stats })
    }

    fn is_empty(&self) -> bool {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 || self.num_rows.load(Ordering::Relaxed) == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));

        let num_ranges = self.parts.read().unwrap().num_parts();

        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: self.estimated_series_count(),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Computes the new flat schema based on the new metadata.
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Arc::new(Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata: metadata.clone(),
            alloc_tracker: AllocTracker::new(self.alloc_tracker.write_buffer_manager()),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
        })
    }
}

impl BulkMemtable {
    /// Creates a new [BulkMemtable].
    pub fn new(
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
            metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
        }
    }

    /// Updates memtable stats.
    ///
    /// Must be called inside the parts write lock scope.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);

        self.max_timestamp
            .fetch_max(stats.max_ts, Ordering::Relaxed);
        self.min_timestamp
            .fetch_min(stats.min_ts, Ordering::Relaxed);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::Relaxed);
        self.num_rows.fetch_add(stats.num_rows, Ordering::Relaxed);
    }

    /// Returns the estimated time series count.
    ///
    /// Note that only raw parts are counted; encoded parts are not included.
    fn estimated_series_count(&self) -> usize {
        let bulk_parts = self.parts.read().unwrap();
        bulk_parts
            .parts
            .iter()
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }
}

/// Iterator builder for a bulk range.
struct BulkRangeIterBuilder {
    part: BulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for BulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let iter = BulkPartRecordBatchIter::new(
            self.part.batch.clone(),
            self.context.clone(),
            self.sequence,
        );

        Ok(Box::new(iter))
    }
}

/// Iterator builder for an encoded bulk range.
struct EncodedBulkRangeIterBuilder {
    #[allow(dead_code)]
    file_id: FileId,
    part: EncodedBulkPart,
    context: Arc<BulkIterContext>,
    sequence: Option<SequenceNumber>,
}

impl IterBuilder for EncodedBulkRangeIterBuilder {
    fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        UnsupportedOperationSnafu {
            err_msg: "BatchIterator is not supported for encoded bulk memtable",
        }
        .fail()
    }

    fn is_record_batch(&self) -> bool {
        true
    }

    fn build_record_batch(
        &self,
        _metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        if let Some(iter) = self.part.read(self.context.clone(), self.sequence)? {
            Ok(iter)
        } else {
            // Returns an empty iterator if there is no data to read.
            Ok(Box::new(std::iter::empty()))
        }
    }
}

struct BulkPartWrapper {
    part: BulkPart,
    /// The unique file id for this part in the memtable.
    #[allow(dead_code)]
    file_id: FileId,
}

struct EncodedPartWrapper {
    part: EncodedBulkPart,
    /// The unique file id for this part in the memtable.
    #[allow(dead_code)]
    file_id: FileId,
}

/// Builder to build a [BulkMemtable].
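///
/// A minimal usage sketch (assumes a `RegionMetadataRef` provided by the caller):
///
/// ```ignore
/// let builder = BulkMemtableBuilder::new(None);
/// let memtable = builder.build(1, &metadata);
/// assert!(memtable.is_empty());
/// ```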
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
}

impl BulkMemtableBuilder {
    /// Creates a new builder with the given `write_buffer_manager`.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
        Self {
            write_buffer_manager,
        }
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(BulkMemtable::new(
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
        ))
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {

    use mito_codec::row_converter::build_primary_key_codec;

    use super::*;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::read::scan_region::PredicateGroup;
    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn create_bulk_part_with_converter(
        k0: &str,
        k1: u32,
        timestamps: Vec<i64>,
        values: Vec<Option<f64>>,
        sequence: u64,
    ) -> Result<BulkPart> {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            timestamps.into_iter(),
            values.into_iter(),
            sequence,
        );

        converter.append_key_values(&key_values)?;
        converter.convert()
    }

    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(999, metadata.clone(), None);

        let test_data = vec![
            (
                "key_a",
                1u32,
                vec![1000i64, 2000i64],
                vec![Some(10.5), Some(20.5)],
                100u64,
            ),
            (
                "key_b",
                2u32,
                vec![1500i64, 2500i64],
                vec![Some(15.5), Some(25.5)],
                200u64,
            ),
            ("key_c", 3u32, vec![3000i64], vec![Some(30.5)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in test_data.iter() {
            let part =
                create_bulk_part_with_converter(k0, *k1, timestamps.clone(), values.clone(), *seq)
                    .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let stats = memtable.stats();
        assert_eq!(5, stats.num_rows);
        assert_eq!(3, stats.num_ranges);
        assert_eq!(300, stats.max_sequence);

        let (min_ts, max_ts) = stats.time_range.unwrap();
        assert_eq!(1000, min_ts.value());
        assert_eq!(3000, max_ts.value());

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable.ranges(None, predicate_group, None).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();

            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
                assert_eq!(8, batch.num_columns());
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(111, metadata.clone(), None);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
            5,
            vec![5000, 6000, 7000],
            vec![Some(50.0), Some(60.0), Some(70.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();

        let projection = vec![4u32];
        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable
            .ranges(Some(&projection), predicate_group, None)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        assert!(range.is_record_batch());
        let record_batch_iter = range.build_record_batch_iter(None).unwrap();

        let mut total_rows = 0;
        for batch_result in record_batch_iter {
            let batch = batch_result.unwrap();
            assert!(batch.num_rows() > 0);
            assert_eq!(5, batch.num_columns());
            total_rows += batch.num_rows();
        }
        assert_eq!(3, total_rows);
    }

    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(111, metadata.clone(), None);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            "test".to_string(),
            1,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let err = memtable.write(&key_values).unwrap_err();
        assert!(err.to_string().contains("not supported"));

        let kv = key_values.iter().next().unwrap();
        let err = memtable.write_one(kv).unwrap_err();
        assert!(err.to_string().contains("not supported"));
    }

    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(222, metadata.clone(), None);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
            10,
            vec![10000],
            vec![Some(100.0)],
            1000,
        )
        .unwrap();

        memtable.write_bulk(bulk_part).unwrap();
        memtable.freeze().unwrap();

        let stats_after_freeze = memtable.stats();
        assert_eq!(1, stats_after_freeze.num_rows);
    }

    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable = BulkMemtable::new(333, metadata.clone(), None);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
                .unwrap();

        original_memtable.write_bulk(bulk_part).unwrap();

        let forked_memtable = original_memtable.fork(444, &metadata);

        assert_eq!(forked_memtable.id(), 444);
        assert!(forked_memtable.is_empty());
        assert_eq!(0, forked_memtable.stats().num_rows);

        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }
}