mito2/memtable/
simple_bulk_memtable.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::{ColumnId, SequenceNumber};
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
38    MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
39};
40use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
41use crate::read::dedup::LastNonNullIter;
42use crate::read::scan_region::PredicateGroup;
43use crate::read::Batch;
44use crate::region::options::MergeMode;
45use crate::{error, metrics};
46
/// A memtable that stores every row in a single [`Series`].
///
/// `write_one` debug-asserts that rows carry no primary key, and `ranges` builds batches with an
/// empty primary key. This memtable is therefore presumably intended for regions without primary
/// key columns — confirm against callers.
pub struct SimpleBulkMemtable {
    /// Id of this memtable, returned by `Memtable::id`.
    id: MemtableId,
    /// Region metadata; used to locate the time index column and the field columns.
    region_metadata: RegionMetadataRef,
    /// Accounts for bytes written (fed by `update_stats`).
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far, as a raw value in the time index column's unit.
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far, as a raw value in the time index column's unit.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Whether `ranges` deduplicates rows when building batches; forced to `false` by `new`
    /// in `LastNonNull` merge mode.
    dedup: bool,
    /// Merge mode; in `LastNonNull` mode read iterators are wrapped in `LastNonNullIter`.
    merge_mode: MergeMode,
    /// Number of rows written so far (duplicates included).
    num_rows: AtomicUsize,
    /// All rows of this memtable, shared between writers and readers behind a lock.
    series: RwLock<Series>,
}
59
60impl Drop for SimpleBulkMemtable {
61    fn drop(&mut self) {
62        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
63    }
64}
65
66impl SimpleBulkMemtable {
67    pub fn new(
68        id: MemtableId,
69        region_metadata: RegionMetadataRef,
70        write_buffer_manager: Option<WriteBufferManagerRef>,
71        dedup: bool,
72        merge_mode: MergeMode,
73    ) -> Self {
74        let dedup = if merge_mode == MergeMode::LastNonNull {
75            false
76        } else {
77            dedup
78        };
79        let series = RwLock::new(Series::with_capacity(&region_metadata, 1024, 8192));
80
81        Self {
82            id,
83            region_metadata,
84            alloc_tracker: AllocTracker::new(write_buffer_manager),
85            max_timestamp: AtomicI64::new(i64::MIN),
86            min_timestamp: AtomicI64::new(i64::MAX),
87            max_sequence: AtomicU64::new(0),
88            dedup,
89            merge_mode,
90            num_rows: AtomicUsize::new(0),
91            series,
92        }
93    }
94
95    fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
96        if let Some(projection) = projection {
97            projection.iter().copied().collect()
98        } else {
99            self.region_metadata
100                .field_columns()
101                .map(|c| c.column_id)
102                .collect()
103        }
104    }
105
106    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
107        let ts = kv.timestamp();
108        let sequence = kv.sequence();
109        let op_type = kv.op_type();
110        let mut series = self.series.write().unwrap();
111        let size = series.push(ts, sequence, op_type, kv.fields());
112        stats.value_bytes += size;
113        // safety: timestamp of kv must be both present and a valid timestamp value.
114        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
115        stats.min_ts = stats.min_ts.min(ts);
116        stats.max_ts = stats.max_ts.max(ts);
117    }
118
119    /// Updates memtable stats.
120    fn update_stats(&self, stats: WriteMetrics) {
121        self.alloc_tracker
122            .on_allocation(stats.key_bytes + stats.value_bytes);
123        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
124        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
125        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
126        self.max_sequence
127            .fetch_max(stats.max_sequence, Ordering::SeqCst);
128    }
129
130    #[cfg(test)]
131    fn schema(&self) -> &RegionMetadataRef {
132        &self.region_metadata
133    }
134}
135
136impl Debug for SimpleBulkMemtable {
137    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138        f.debug_struct("SimpleBulkMemtable").finish()
139    }
140}
141
142impl Memtable for SimpleBulkMemtable {
143    fn id(&self) -> MemtableId {
144        self.id
145    }
146
147    fn write(&self, kvs: &KeyValues) -> error::Result<()> {
148        let mut stats = WriteMetrics::default();
149        let max_sequence = kvs.max_sequence();
150        for kv in kvs.iter() {
151            self.write_key_value(kv, &mut stats);
152        }
153        stats.max_sequence = max_sequence;
154        stats.num_rows = kvs.num_rows();
155        self.update_stats(stats);
156        Ok(())
157    }
158
159    fn write_one(&self, kv: KeyValue) -> error::Result<()> {
160        debug_assert_eq!(0, kv.num_primary_keys());
161        let mut stats = WriteMetrics::default();
162        self.write_key_value(kv, &mut stats);
163        stats.num_rows = 1;
164        stats.max_sequence = kv.sequence();
165        self.update_stats(stats);
166        Ok(())
167    }
168
169    fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
170        let rb = &part.batch;
171
172        let ts = Helper::try_into_vector(
173            rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
174                .with_context(|| error::InvalidRequestSnafu {
175                    region_id: self.region_metadata.region_id,
176                    reason: "Timestamp not found",
177                })?,
178        )
179        .context(error::ConvertVectorSnafu)?;
180
181        let sequence = part.sequence;
182
183        let fields: Vec<_> = self
184            .region_metadata
185            .field_columns()
186            .map(|f| {
187                let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
188                    error::InvalidRequestSnafu {
189                        region_id: self.region_metadata.region_id,
190                        reason: format!("Column {} not found", f.column_schema.name),
191                    }
192                    .build()
193                })?;
194                Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
195            })
196            .collect::<error::Result<Vec<_>>>()?;
197
198        let mut series = self.series.write().unwrap();
199        let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
200            .with_label_values(&["bulk_extend"])
201            .start_timer();
202        series.extend(ts, OpType::Put as u8, sequence, fields)?;
203        extend_timer.observe_duration();
204
205        self.update_stats(WriteMetrics {
206            key_bytes: 0,
207            value_bytes: part.estimated_size(),
208            min_ts: part.min_ts,
209            max_ts: part.max_ts,
210            num_rows: part.num_rows(),
211            max_sequence: sequence,
212        });
213        Ok(())
214    }
215
216    #[cfg(any(test, feature = "test"))]
217    fn iter(
218        &self,
219        projection: Option<&[ColumnId]>,
220        _predicate: Option<table::predicate::Predicate>,
221        sequence: Option<SequenceNumber>,
222    ) -> error::Result<BoxedBatchIterator> {
223        let iter = self.create_iter(projection, sequence)?.build(None)?;
224
225        if self.merge_mode == MergeMode::LastNonNull {
226            let iter = LastNonNullIter::new(iter);
227            Ok(Box::new(iter))
228        } else {
229            Ok(Box::new(iter))
230        }
231    }
232
233    fn ranges(
234        &self,
235        projection: Option<&[ColumnId]>,
236        predicate: PredicateGroup,
237        sequence: Option<SequenceNumber>,
238    ) -> error::Result<MemtableRanges> {
239        let start_time = Instant::now();
240        let projection = Arc::new(self.build_projection(projection));
241        let values = self.series.read().unwrap().read_to_values();
242        let contexts = values
243            .into_par_iter()
244            .filter_map(|v| {
245                let filtered = match v
246                    .to_batch(&[], &self.region_metadata, &projection, self.dedup)
247                    .and_then(|mut b| {
248                        b.filter_by_sequence(sequence)?;
249                        Ok(b)
250                    }) {
251                    Ok(filtered) => filtered,
252                    Err(e) => {
253                        return Some(Err(e));
254                    }
255                };
256                if filtered.is_empty() {
257                    None
258                } else {
259                    Some(Ok(filtered))
260                }
261            })
262            .map(|result| {
263                result.map(|batch| {
264                    let num_rows = batch.num_rows();
265                    let builder = BatchRangeBuilder {
266                        batch,
267                        merge_mode: self.merge_mode,
268                        scan_cost: start_time.elapsed(),
269                    };
270                    (
271                        num_rows,
272                        Arc::new(MemtableRangeContext::new(
273                            self.id,
274                            Box::new(builder),
275                            predicate.clone(),
276                        )),
277                    )
278                })
279            })
280            .collect::<error::Result<Vec<_>>>()?;
281
282        let ranges = contexts
283            .into_iter()
284            .enumerate()
285            .map(|(idx, (num_rows, context))| (idx, MemtableRange::new(context, num_rows)))
286            .collect();
287
288        Ok(MemtableRanges {
289            ranges,
290            stats: self.stats(),
291        })
292    }
293
294    fn is_empty(&self) -> bool {
295        self.series.read().unwrap().is_empty()
296    }
297
298    fn freeze(&self) -> error::Result<()> {
299        self.series.write().unwrap().freeze(&self.region_metadata);
300        Ok(())
301    }
302
303    fn stats(&self) -> MemtableStats {
304        let estimated_bytes = self.alloc_tracker.bytes_allocated();
305        let num_rows = self.num_rows.load(Ordering::Relaxed);
306        if num_rows == 0 {
307            // no rows ever written
308            return MemtableStats {
309                estimated_bytes,
310                time_range: None,
311                num_rows: 0,
312                num_ranges: 0,
313                max_sequence: 0,
314                series_count: 0,
315            };
316        }
317        let ts_type = self
318            .region_metadata
319            .time_index_column()
320            .column_schema
321            .data_type
322            .clone()
323            .as_timestamp()
324            .expect("Timestamp column must have timestamp type");
325        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
326        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
327        MemtableStats {
328            estimated_bytes,
329            time_range: Some((min_timestamp, max_timestamp)),
330            num_rows,
331            num_ranges: 1,
332            max_sequence: self.max_sequence.load(Ordering::Relaxed),
333            series_count: 1,
334        }
335    }
336
337    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
338        Arc::new(Self::new(
339            id,
340            metadata.clone(),
341            self.alloc_tracker.write_buffer_manager(),
342            self.dedup,
343            self.merge_mode,
344        ))
345    }
346}
347
/// Builds iterators over a single batch of a [`SimpleBulkMemtable`] scan range.
///
/// In `SimpleBulkMemtable::ranges` the batch has already been projected, deduplicated (when
/// enabled) and sequence-filtered before this builder is created.
#[derive(Clone)]
pub struct BatchRangeBuilder {
    /// Batch yielded by every iterator this builder creates (cloned on each `build` call).
    pub batch: Batch,
    /// Merge mode; in `LastNonNull` mode built iterators are wrapped in `LastNonNullIter`.
    pub merge_mode: MergeMode,
    /// Time elapsed from the start of `ranges` until this builder was created; merged into
    /// scan metrics by `build`.
    scan_cost: Duration,
}
354
355impl IterBuilder for BatchRangeBuilder {
356    fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
357        let batch = self.batch.clone();
358        if let Some(metrics) = metrics {
359            let inner = crate::memtable::MemScanMetricsData {
360                total_series: 1,
361                num_rows: batch.num_rows(),
362                num_batches: 1,
363                scan_cost: self.scan_cost,
364            };
365            metrics.merge_inner(&inner);
366        }
367
368        let iter = Iter {
369            batch: Some(Ok(batch)),
370        };
371
372        if self.merge_mode == MergeMode::LastNonNull {
373            Ok(Box::new(LastNonNullIter::new(iter)))
374        } else {
375            Ok(Box::new(iter))
376        }
377    }
378}
379
/// Iterator that yields at most one item — the stored batch result — and then `None`.
struct Iter {
    /// Item still to be yielded; `None` once consumed or when there is nothing to yield.
    batch: Option<error::Result<Batch>>,
}
383
384impl Iterator for Iter {
385    type Item = error::Result<Batch>;
386
387    fn next(&mut self) -> Option<Self::Item> {
388        self.batch.take()
389    }
390}
391
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Region metadata with a millisecond time index `ts` (id 1) and two nullable fields:
    /// `f1: f64` (id 2) and `f2: string` (id 3). No tag columns are added.
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Creates memtable id 1 over [`new_test_metadata`] without a write buffer manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds `KeyValues` for one mutation of `op_type` at `sequence`, with one row per
    /// `(ts, f1, f2)` tuple. The row schema lists every column of `metadata` in order.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*f1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(f2.clone())),
                    },
                ],
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Two puts at distinct timestamps come back as one batch, ordered by timestamp, with both
    /// field columns.
    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    /// Without a projection every field column is returned; with `[2]` only `f1` is returned.
    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Only project column 2 (f1)
        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len()); // only f1
        assert_eq!(2, batch.fields()[0].column_id);
    }

    /// With dedup enabled, two puts at the same timestamp collapse to the later one.
    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows()); // deduped to 1 row
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); // last write wins
    }

    /// A row written through `write_one` is readable.
    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    /// A put, a freeze, then a delete of the same timestamp. Merging all ranges and
    /// deduplicating with `LastRow::new(false)` leaves exactly one row.
    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    /// A lone delete is still returned, as a `Delete` row, by `LastRow::new(false)` dedup.
    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    /// Two puts at the same timestamp written before a single freeze yield one range, whose
    /// batch is deduplicated to the later value `"b"`.
    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    /// Rows from a bulk part are readable and reflected in stats. A following row-based write
    /// is returned after them in timestamp order.
    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_ts: 1,
            max_ts: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    /// A new memtable is empty; it stops being empty after one write.
    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    /// Stats report no rows and no time range before any write, and a time range afterwards.
    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    /// A forked memtable starts empty even if the source memtable has data.
    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    /// Reading with a sequence bound hides rows written at later sequences.
    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        // Filter with sequence 0 should only return first write
        let mut iter = memtable.iter(None, None, Some(0)).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// Builds a one-row record batch for the `(k0, ts)` schema of `region_meta`, where `k0` is a
    /// string of `string_len` `'a'` bytes and `ts` is `ts`.
    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        let len = usize::try_from(string_len).expect("string_len must be non-negative");
        RecordBatch::try_new(
            schema,
            vec![
                // `repeat` already returns an owned `String`. Cloning it again (as `.to_string()`
                // did) cost an extra `string_len` bytes of peak memory, ~2 GiB for `i32::MAX`.
                Arc::new(StringArray::from_iter_values(["a".repeat(len)].into_iter()))
                    as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    [ts].into_iter(),
                )) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Writes a string value of `i32::MAX` bytes through `write_bulk`, freezes, writes a small
    /// one, and checks that both rows read back.
    ///
    /// This presumably exercises value sizes at the i32 string-offset limit. The test allocates
    /// several GiB.
    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_ts: 0,
                min_ts: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_ts: 1,
                min_ts: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
}