mito2/memtable/simple_bulk_memtable.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::{Debug, Formatter};
17use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
18use std::sync::{Arc, RwLock};
19
20use api::v1::OpType;
21use datatypes::vectors::Helper;
22use mito_codec::key_values::KeyValue;
23use snafu::{OptionExt, ResultExt};
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::{ColumnId, SequenceNumber};
26use table::predicate::Predicate;
27
28use crate::flush::WriteBufferManagerRef;
29use crate::memtable::bulk::part::BulkPart;
30use crate::memtable::stats::WriteMetrics;
31use crate::memtable::time_series::{Series, Values};
32use crate::memtable::{
33    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableId, MemtableRange,
34    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
35};
36use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
37use crate::read::dedup::LastNonNullIter;
38use crate::read::scan_region::PredicateGroup;
39use crate::read::Batch;
40use crate::region::options::MergeMode;
41use crate::{error, metrics};
42
/// A memtable that stores every row in a single [`Series`] (rows carry no primary
/// key) and accepts both row-based writes and bulk [`BulkPart`] writes.
pub struct SimpleBulkMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Metadata of the region this memtable belongs to.
    region_metadata: RegionMetadataRef,
    /// Tracks bytes written; built from an optional write buffer manager.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far (`i64::MIN` until the first write).
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far (`i64::MAX` until the first write).
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Passed to `Values::to_batch` as its dedup flag; always `false` under
    /// `MergeMode::LastNonNull`.
    dedup: bool,
    /// Merge mode applied when reading the memtable.
    merge_mode: MergeMode,
    /// Total number of rows written.
    num_rows: AtomicUsize,
    /// The single series holding all rows; compacted through a write lock on read.
    series: RwLock<Series>,
}
55
impl Drop for SimpleBulkMemtable {
    /// Decrements the active-series metric for the one series this memtable owns.
    fn drop(&mut self) {
        // NOTE(review): the matching increment is not visible in this file; it is
        // presumably done when the `Series` is created — confirm the metric stays balanced.
        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
    }
}
61
62impl SimpleBulkMemtable {
63    pub(crate) fn new(
64        id: MemtableId,
65        region_metadata: RegionMetadataRef,
66        write_buffer_manager: Option<WriteBufferManagerRef>,
67        dedup: bool,
68        merge_mode: MergeMode,
69    ) -> Self {
70        let dedup = if merge_mode == MergeMode::LastNonNull {
71            false
72        } else {
73            dedup
74        };
75        let series = RwLock::new(Series::with_capacity(&region_metadata, 1024));
76
77        Self {
78            id,
79            region_metadata,
80            alloc_tracker: AllocTracker::new(write_buffer_manager),
81            max_timestamp: AtomicI64::new(i64::MIN),
82            min_timestamp: AtomicI64::new(i64::MAX),
83            max_sequence: AtomicU64::new(0),
84            dedup,
85            merge_mode,
86            num_rows: AtomicUsize::new(0),
87            series,
88        }
89    }
90
91    fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
92        if let Some(projection) = projection {
93            projection.iter().copied().collect()
94        } else {
95            self.region_metadata
96                .field_columns()
97                .map(|c| c.column_id)
98                .collect()
99        }
100    }
101
102    fn create_iter(
103        &self,
104        projection: Option<&[ColumnId]>,
105        sequence: Option<SequenceNumber>,
106    ) -> error::Result<BatchIterBuilder> {
107        let mut series = self.series.write().unwrap();
108
109        let values = if series.is_empty() {
110            None
111        } else {
112            Some(series.compact(&self.region_metadata)?.clone())
113        };
114
115        let projection = self.build_projection(projection);
116
117        Ok(BatchIterBuilder {
118            region_metadata: self.region_metadata.clone(),
119            values,
120            projection,
121            dedup: self.dedup,
122            sequence,
123            merge_mode: self.merge_mode,
124        })
125    }
126
127    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
128        let ts = kv.timestamp();
129        let sequence = kv.sequence();
130        let op_type = kv.op_type();
131        let mut series = self.series.write().unwrap();
132        let size = series.push(ts, sequence, op_type, kv.fields());
133        stats.value_bytes += size;
134        // safety: timestamp of kv must be both present and a valid timestamp value.
135        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
136        stats.min_ts = stats.min_ts.min(ts);
137        stats.max_ts = stats.max_ts.max(ts);
138    }
139
140    /// Updates memtable stats.
141    fn update_stats(&self, stats: WriteMetrics) {
142        self.alloc_tracker
143            .on_allocation(stats.key_bytes + stats.value_bytes);
144        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
145        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
146        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
147        self.max_sequence
148            .fetch_max(stats.max_sequence, Ordering::SeqCst);
149    }
150
151    #[cfg(test)]
152    fn schema(&self) -> &RegionMetadataRef {
153        &self.region_metadata
154    }
155}
156
157impl Debug for SimpleBulkMemtable {
158    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159        f.debug_struct("SimpleBulkMemtable").finish()
160    }
161}
162
163impl Memtable for SimpleBulkMemtable {
164    fn id(&self) -> MemtableId {
165        self.id
166    }
167
168    fn write(&self, kvs: &KeyValues) -> error::Result<()> {
169        let mut stats = WriteMetrics::default();
170        let max_sequence = kvs.max_sequence();
171        for kv in kvs.iter() {
172            self.write_key_value(kv, &mut stats);
173        }
174        stats.max_sequence = max_sequence;
175        stats.num_rows = kvs.num_rows();
176        self.update_stats(stats);
177        Ok(())
178    }
179
180    fn write_one(&self, kv: KeyValue) -> error::Result<()> {
181        debug_assert_eq!(0, kv.num_primary_keys());
182        let mut stats = WriteMetrics::default();
183        self.write_key_value(kv, &mut stats);
184        stats.num_rows = 1;
185        stats.max_sequence = kv.sequence();
186        self.update_stats(stats);
187        Ok(())
188    }
189
190    fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
191        let rb = &part.batch;
192
193        let ts = Helper::try_into_vector(
194            rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
195                .with_context(|| error::InvalidRequestSnafu {
196                    region_id: self.region_metadata.region_id,
197                    reason: "Timestamp not found",
198                })?,
199        )
200        .context(error::ConvertVectorSnafu)?;
201
202        let sequence = part.sequence;
203
204        let fields: Vec<_> = self
205            .region_metadata
206            .field_columns()
207            .map(|f| {
208                let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
209                    error::InvalidRequestSnafu {
210                        region_id: self.region_metadata.region_id,
211                        reason: format!("Column {} not found", f.column_schema.name),
212                    }
213                    .build()
214                })?;
215                Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
216            })
217            .collect::<error::Result<Vec<_>>>()?;
218
219        let mut series = self.series.write().unwrap();
220        let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
221            .with_label_values(&["bulk_extend"])
222            .start_timer();
223        series.extend(ts, OpType::Put as u8, sequence, fields.into_iter())?;
224        extend_timer.observe_duration();
225
226        self.update_stats(WriteMetrics {
227            key_bytes: 0,
228            value_bytes: part.estimated_size(),
229            min_ts: part.min_ts,
230            max_ts: part.max_ts,
231            num_rows: part.num_rows(),
232            max_sequence: sequence,
233        });
234        Ok(())
235    }
236
237    fn iter(
238        &self,
239        projection: Option<&[ColumnId]>,
240        _predicate: Option<Predicate>,
241        sequence: Option<SequenceNumber>,
242    ) -> error::Result<BoxedBatchIterator> {
243        let iter = self.create_iter(projection, sequence)?.build()?;
244
245        if self.merge_mode == MergeMode::LastNonNull {
246            let iter = LastNonNullIter::new(iter);
247            Ok(Box::new(iter))
248        } else {
249            Ok(Box::new(iter))
250        }
251    }
252
253    fn ranges(
254        &self,
255        projection: Option<&[ColumnId]>,
256        predicate: PredicateGroup,
257        sequence: Option<SequenceNumber>,
258    ) -> error::Result<MemtableRanges> {
259        let builder = Box::new(self.create_iter(projection, sequence).unwrap());
260
261        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
262        Ok(MemtableRanges {
263            ranges: [(0, MemtableRange::new(context))].into(),
264            stats: self.stats(),
265        })
266    }
267
268    fn is_empty(&self) -> bool {
269        self.series.read().unwrap().is_empty()
270    }
271
272    fn freeze(&self) -> error::Result<()> {
273        self.series.write().unwrap().freeze(&self.region_metadata);
274        Ok(())
275    }
276
277    fn stats(&self) -> MemtableStats {
278        let estimated_bytes = self.alloc_tracker.bytes_allocated();
279        let num_rows = self.num_rows.load(Ordering::Relaxed);
280        if num_rows == 0 {
281            // no rows ever written
282            return MemtableStats {
283                estimated_bytes,
284                time_range: None,
285                num_rows: 0,
286                num_ranges: 0,
287                max_sequence: 0,
288                series_count: 0,
289            };
290        }
291        let ts_type = self
292            .region_metadata
293            .time_index_column()
294            .column_schema
295            .data_type
296            .clone()
297            .as_timestamp()
298            .expect("Timestamp column must have timestamp type");
299        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
300        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
301        MemtableStats {
302            estimated_bytes,
303            time_range: Some((min_timestamp, max_timestamp)),
304            num_rows,
305            num_ranges: 1,
306            max_sequence: self.max_sequence.load(Ordering::Relaxed),
307            series_count: 1,
308        }
309    }
310
311    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
312        Arc::new(Self::new(
313            id,
314            metadata.clone(),
315            self.alloc_tracker.write_buffer_manager(),
316            self.dedup,
317            self.merge_mode,
318        ))
319    }
320}
321
322#[derive(Clone)]
323struct BatchIterBuilder {
324    region_metadata: RegionMetadataRef,
325    values: Option<Values>,
326    projection: HashSet<ColumnId>,
327    sequence: Option<SequenceNumber>,
328    dedup: bool,
329    merge_mode: MergeMode,
330}
331
332impl IterBuilder for BatchIterBuilder {
333    fn build(&self) -> error::Result<BoxedBatchIterator> {
334        let Some(values) = self.values.clone() else {
335            return Ok(Box::new(Iter { batch: None }));
336        };
337
338        let maybe_batch = values
339            .to_batch(&[], &self.region_metadata, &self.projection, self.dedup)
340            .and_then(|mut b| {
341                b.filter_by_sequence(self.sequence)?;
342                Ok(b)
343            })
344            .map(Some)
345            .transpose();
346
347        let iter = Iter { batch: maybe_batch };
348
349        if self.merge_mode == MergeMode::LastNonNull {
350            Ok(Box::new(LastNonNullIter::new(iter)))
351        } else {
352            Ok(Box::new(iter))
353        }
354    }
355}
356
357struct Iter {
358    batch: Option<error::Result<Batch>>,
359}
360
361impl Iterator for Iter {
362    type Item = error::Result<Batch>;
363
364    fn next(&mut self) -> Option<Self::Item> {
365        self.batch.take()
366    }
367}
368
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::SequenceNumber;

    use super::*;
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Region with a millisecond time index `ts` (id 1) and two nullable fields:
    /// `f1` float64 (id 2) and `f2` string (id 3).
    fn new_test_metadata() -> RegionMetadataRef {
        let columns = [
            ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            },
        ];

        let mut builder = RegionMetadataBuilder::new(1.into());
        for column in columns {
            builder.push_column_metadata(column);
        }
        Arc::new(builder.build().unwrap())
    }

    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds a `Put` mutation with one row per `(ts, f1, f2)` tuple.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
    ) -> KeyValues {
        let value = |data| api::v1::Value {
            value_data: Some(data),
        };
        let rows = row_values
            .iter()
            .map(|(ts, f1, f2)| Row {
                values: vec![
                    value(ValueData::TimestampMillisecondValue(*ts)),
                    value(ValueData::F64Value(*f1)),
                    value(ValueData::StringValue(f2.clone())),
                ],
            })
            .collect();
        let schema = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence,
            rows: Some(Rows { schema, rows }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Writes the single row `(ts, f1, f2)` at `sequence`.
    fn write_row(memtable: &SimpleBulkMemtable, sequence: SequenceNumber, ts: i64, f1: f64, f2: &str) {
        let kvs = build_key_values(&memtable.region_metadata, sequence, &[(ts, f1, f2.to_string())]);
        memtable.write(&kvs).unwrap();
    }

    /// Returns the first batch yielded by `memtable.iter`.
    fn first_batch(
        memtable: &SimpleBulkMemtable,
        projection: Option<&[ColumnId]>,
        sequence: Option<SequenceNumber>,
    ) -> Batch {
        memtable
            .iter(projection, None, sequence)
            .unwrap()
            .next()
            .unwrap()
            .unwrap()
    }

    /// Views the time index of `batch` as a millisecond timestamp vector.
    fn ts_vector(batch: &Batch) -> &TimestampMillisecondVector {
        batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap()
    }

    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        write_row(&memtable, 0, 1, 1.0, "a");
        write_row(&memtable, 1, 2, 2.0, "b");

        let batch = first_batch(&memtable, None, None);
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts = ts_vector(&batch);
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts.get(1));
    }

    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        write_row(&memtable, 0, 1, 1.0, "a");

        // No projection: every field column comes back.
        let batch = first_batch(&memtable, None, None);
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        assert_eq!(
            Value::Timestamp(Timestamp::new_millisecond(1)),
            ts_vector(&batch).get(0)
        );

        // Projecting column 2 keeps only `f1`.
        let projection: &[ColumnId] = &[2];
        let batch = first_batch(&memtable, Some(projection), None);
        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len());
        assert_eq!(2, batch.fields()[0].column_id);
    }

    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        write_row(&memtable, 0, 1, 1.0, "a");
        write_row(&memtable, 1, 1, 2.0, "b");

        let batch = first_batch(&memtable, None, None);
        // Both rows share timestamp 1, so only one survives...
        assert_eq!(1, batch.num_rows());
        // ...and it is the later write.
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(&memtable.region_metadata, 0, &[(1, 1.0, "a".to_string())]);
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        assert_eq!(1, first_batch(&memtable, None, None).num_rows());
    }

    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let columns = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let batch = DfRecordBatch::try_new(arrow_schema, columns).unwrap();

        memtable
            .write_bulk(BulkPart {
                batch,
                sequence: 1,
                min_ts: 1,
                max_ts: 2,
                timestamp_index: 0,
                raw_data: None,
            })
            .unwrap();

        assert_eq!(2, first_batch(&memtable, None, None).num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        // A row-based write lands in the same series as the bulk part.
        write_row(&memtable, 2, 3, 3.0, "c");
        let batch = first_batch(&memtable, None, None);
        assert_eq!(3, batch.num_rows());
        let timestamps: Vec<_> = ts_vector(&batch)
            .iter_data()
            .map(|t| t.unwrap().0.value())
            .collect();
        assert_eq!(vec![1, 2, 3], timestamps);
    }

    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        write_row(&memtable, 0, 1, 1.0, "a");
        assert!(!memtable.is_empty());
    }

    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let empty = memtable.stats();
        assert_eq!(0, empty.num_rows);
        assert!(empty.time_range.is_none());

        write_row(&memtable, 0, 1, 1.0, "a");
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        write_row(&memtable, 0, 1, 1.0, "a");

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        write_row(&memtable, 0, 1, 1.0, "a");
        write_row(&memtable, 1, 2, 2.0, "b");

        // Reading at sequence 0 sees only the first write.
        let batch = first_batch(&memtable, None, Some(0));
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }
}