mito2/memtable/simple_bulk_memtable.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::{Debug, Formatter};
17use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
18use std::sync::{Arc, RwLock};
19
20use api::v1::OpType;
21use datatypes::vectors::Helper;
22use snafu::{OptionExt, ResultExt};
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::{ColumnId, SequenceNumber};
25use table::predicate::Predicate;
26
27use crate::flush::WriteBufferManagerRef;
28use crate::memtable::bulk::part::BulkPart;
29use crate::memtable::key_values::KeyValue;
30use crate::memtable::stats::WriteMetrics;
31use crate::memtable::time_series::{Series, Values};
32use crate::memtable::{
33    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableId, MemtableRange,
34    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
35};
36use crate::read::dedup::LastNonNullIter;
37use crate::read::scan_region::PredicateGroup;
38use crate::read::Batch;
39use crate::region::options::MergeMode;
40use crate::{error, metrics};
41
/// A memtable whose rows all live in a single [`Series`] and are read back with an
/// empty primary key. It accepts row-based writes (`write`/`write_one`) as well as
/// bulk Arrow parts (`write_bulk`).
pub struct SimpleBulkMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Metadata of the region this memtable belongs to.
    region_metadata: RegionMetadataRef,
    /// Accumulates bytes reported by writes via `on_allocation`; built from the
    /// optional write buffer manager passed to `new`.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far (`i64::MIN` before the first write).
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far (`i64::MAX` before the first write).
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Whether duplicate rows are dropped when converting to a batch; forced to
    /// `false` by `new` when `merge_mode` is `LastNonNull`.
    dedup: bool,
    /// Merge strategy applied when reading.
    merge_mode: MergeMode,
    /// Total number of rows written.
    num_rows: AtomicUsize,
    /// Written rows. Readers also take the write lock, because building an
    /// iterator compacts the series.
    series: RwLock<Series>,
}
54
55impl SimpleBulkMemtable {
56    pub(crate) fn new(
57        id: MemtableId,
58        region_metadata: RegionMetadataRef,
59        write_buffer_manager: Option<WriteBufferManagerRef>,
60        dedup: bool,
61        merge_mode: MergeMode,
62    ) -> Self {
63        let dedup = if merge_mode == MergeMode::LastNonNull {
64            false
65        } else {
66            dedup
67        };
68        let series = RwLock::new(Series::new(&region_metadata));
69
70        Self {
71            id,
72            region_metadata,
73            alloc_tracker: AllocTracker::new(write_buffer_manager),
74            max_timestamp: AtomicI64::new(i64::MIN),
75            min_timestamp: AtomicI64::new(i64::MAX),
76            max_sequence: AtomicU64::new(0),
77            dedup,
78            merge_mode,
79            num_rows: AtomicUsize::new(0),
80            series,
81        }
82    }
83
84    fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
85        if let Some(projection) = projection {
86            projection.iter().copied().collect()
87        } else {
88            self.region_metadata
89                .field_columns()
90                .map(|c| c.column_id)
91                .collect()
92        }
93    }
94
95    fn create_iter(
96        &self,
97        projection: Option<&[ColumnId]>,
98        sequence: Option<SequenceNumber>,
99    ) -> error::Result<BatchIterBuilder> {
100        let mut series = self.series.write().unwrap();
101
102        let values = if series.is_empty() {
103            None
104        } else {
105            Some(series.compact(&self.region_metadata)?.clone())
106        };
107
108        let projection = self.build_projection(projection);
109
110        Ok(BatchIterBuilder {
111            region_metadata: self.region_metadata.clone(),
112            values,
113            projection,
114            dedup: self.dedup,
115            sequence,
116            merge_mode: self.merge_mode,
117        })
118    }
119
120    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
121        let ts = kv.timestamp();
122        let sequence = kv.sequence();
123        let op_type = kv.op_type();
124        let mut series = self.series.write().unwrap();
125        let size = series.push(ts, sequence, op_type, kv.fields());
126        stats.value_bytes += size;
127        // safety: timestamp of kv must be both present and a valid timestamp value.
128        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
129        stats.min_ts = stats.min_ts.min(ts);
130        stats.max_ts = stats.max_ts.max(ts);
131    }
132
133    /// Updates memtable stats.
134    fn update_stats(&self, stats: WriteMetrics) {
135        self.alloc_tracker
136            .on_allocation(stats.key_bytes + stats.value_bytes);
137        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
138        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
139        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
140        self.max_sequence
141            .fetch_max(stats.max_sequence, Ordering::SeqCst);
142    }
143
144    #[cfg(test)]
145    fn schema(&self) -> &RegionMetadataRef {
146        &self.region_metadata
147    }
148}
149
150impl Debug for SimpleBulkMemtable {
151    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
152        f.debug_struct("SimpleBulkMemtable").finish()
153    }
154}
155
156impl Memtable for SimpleBulkMemtable {
157    fn id(&self) -> MemtableId {
158        self.id
159    }
160
161    fn write(&self, kvs: &KeyValues) -> error::Result<()> {
162        let mut stats = WriteMetrics::default();
163        let max_sequence = kvs.max_sequence();
164        for kv in kvs.iter() {
165            self.write_key_value(kv, &mut stats);
166        }
167        stats.max_sequence = max_sequence;
168        stats.num_rows = kvs.num_rows();
169        self.update_stats(stats);
170        Ok(())
171    }
172
173    fn write_one(&self, kv: KeyValue) -> error::Result<()> {
174        debug_assert_eq!(0, kv.num_primary_keys());
175        let mut stats = WriteMetrics::default();
176        self.write_key_value(kv, &mut stats);
177        stats.num_rows = 1;
178        stats.max_sequence = kv.sequence();
179        self.update_stats(stats);
180        Ok(())
181    }
182
183    fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
184        let rb = &part.batch;
185
186        let ts = Helper::try_into_vector(
187            rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
188                .with_context(|| error::InvalidRequestSnafu {
189                    region_id: self.region_metadata.region_id,
190                    reason: "Timestamp not found",
191                })?,
192        )
193        .context(error::ConvertVectorSnafu)?;
194
195        let sequence = part.sequence;
196
197        let fields: Vec<_> = self
198            .region_metadata
199            .field_columns()
200            .map(|f| {
201                let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
202                    error::InvalidRequestSnafu {
203                        region_id: self.region_metadata.region_id,
204                        reason: format!("Column {} not found", f.column_schema.name),
205                    }
206                    .build()
207                })?;
208                Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
209            })
210            .collect::<error::Result<Vec<_>>>()?;
211
212        let mut series = self.series.write().unwrap();
213        let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
214            .with_label_values(&["bulk_extend"])
215            .start_timer();
216        series.extend(ts, OpType::Put as u8, sequence, fields.into_iter())?;
217        extend_timer.observe_duration();
218
219        self.update_stats(WriteMetrics {
220            key_bytes: 0,
221            value_bytes: part.estimated_size(),
222            min_ts: part.min_ts,
223            max_ts: part.max_ts,
224            num_rows: part.num_rows,
225            max_sequence: sequence,
226        });
227        Ok(())
228    }
229
230    fn iter(
231        &self,
232        projection: Option<&[ColumnId]>,
233        _predicate: Option<Predicate>,
234        sequence: Option<SequenceNumber>,
235    ) -> error::Result<BoxedBatchIterator> {
236        let iter = self.create_iter(projection, sequence)?.build()?;
237
238        if self.merge_mode == MergeMode::LastNonNull {
239            let iter = LastNonNullIter::new(iter);
240            Ok(Box::new(iter))
241        } else {
242            Ok(Box::new(iter))
243        }
244    }
245
246    fn ranges(
247        &self,
248        projection: Option<&[ColumnId]>,
249        predicate: PredicateGroup,
250        sequence: Option<SequenceNumber>,
251    ) -> error::Result<MemtableRanges> {
252        let builder = Box::new(self.create_iter(projection, sequence).unwrap());
253
254        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
255        Ok(MemtableRanges {
256            ranges: [(0, MemtableRange::new(context))].into(),
257            stats: self.stats(),
258        })
259    }
260
261    fn is_empty(&self) -> bool {
262        self.series.read().unwrap().is_empty()
263    }
264
265    fn freeze(&self) -> error::Result<()> {
266        self.series.write().unwrap().freeze(&self.region_metadata);
267        Ok(())
268    }
269
270    fn stats(&self) -> MemtableStats {
271        let estimated_bytes = self.alloc_tracker.bytes_allocated();
272        let num_rows = self.num_rows.load(Ordering::Relaxed);
273        if num_rows == 0 {
274            // no rows ever written
275            return MemtableStats {
276                estimated_bytes,
277                time_range: None,
278                num_rows: 0,
279                num_ranges: 0,
280                max_sequence: 0,
281            };
282        }
283        let ts_type = self
284            .region_metadata
285            .time_index_column()
286            .column_schema
287            .data_type
288            .clone()
289            .as_timestamp()
290            .expect("Timestamp column must have timestamp type");
291        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
292        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
293        MemtableStats {
294            estimated_bytes,
295            time_range: Some((min_timestamp, max_timestamp)),
296            num_rows,
297            num_ranges: 1,
298            max_sequence: self.max_sequence.load(Ordering::Relaxed),
299        }
300    }
301
302    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
303        Arc::new(Self::new(
304            id,
305            metadata.clone(),
306            self.alloc_tracker.write_buffer_manager(),
307            self.dedup,
308            self.merge_mode,
309        ))
310    }
311}
312
313#[derive(Clone)]
314struct BatchIterBuilder {
315    region_metadata: RegionMetadataRef,
316    values: Option<Values>,
317    projection: HashSet<ColumnId>,
318    sequence: Option<SequenceNumber>,
319    dedup: bool,
320    merge_mode: MergeMode,
321}
322
323impl IterBuilder for BatchIterBuilder {
324    fn build(&self) -> error::Result<BoxedBatchIterator> {
325        let Some(values) = self.values.clone() else {
326            return Ok(Box::new(Iter { batch: None }));
327        };
328
329        let maybe_batch = values
330            .to_batch(&[], &self.region_metadata, &self.projection, self.dedup)
331            .and_then(|mut b| {
332                b.filter_by_sequence(self.sequence)?;
333                Ok(b)
334            })
335            .map(Some)
336            .transpose();
337
338        let iter = Iter { batch: maybe_batch };
339
340        if self.merge_mode == MergeMode::LastNonNull {
341            Ok(Box::new(LastNonNullIter::new(iter)))
342        } else {
343            Ok(Box::new(iter))
344        }
345    }
346}
347
/// An iterator over at most one pre-built batch (or the error produced while building it).
struct Iter {
    /// The pending item. The first call to `next` takes it and leaves `None` behind.
    batch: Option<error::Result<Batch>>,
}
351
352impl Iterator for Iter {
353    type Item = error::Result<Batch>;
354
355    fn next(&mut self) -> Option<Self::Item> {
356        self.batch.take()
357    }
358}
359
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::SequenceNumber;

    use super::*;
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Builds metadata with a millisecond time index `ts` (id 1) and two nullable
    /// fields: `f1` float64 (id 2) and `f2` string (id 3). There are no tag columns.
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Creates a memtable over [`new_test_metadata`] with no write buffer manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds a `Put` mutation with the given `sequence`, one row per `(ts, f1, f2)` tuple.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*f1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(f2.clone())),
                    },
                ],
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Only project column 2 (f1)
        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len()); // only f1
        assert_eq!(2, batch.fields()[0].column_id);
    }

    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows()); // deduped to 1 row
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); // last write wins
    }

    #[test]
    fn test_last_non_null_merge() {
        // `dedup` is forced off in `LastNonNull` mode, so `LastNonNullIter` merges the
        // two rows that share timestamp 1.
        let memtable = new_test_memtable(true, MergeMode::LastNonNull);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows()); // merged to 1 row
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); // latest non-null wins
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(&memtable.region_metadata, 0, &[(1, 1.0, "a".to_string())]);
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_ts: 1,
            max_ts: 2,
            num_rows: 2,
            timestamp_index: 0,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(&memtable.region_metadata, 2, &[(3, 3.0, "c".to_string())]);
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
            ))
            .unwrap();

        // Filter with sequence 0 should only return first write
        let mut iter = memtable.iter(None, None, Some(0)).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }
}