mito2/memtable/simple_bulk_memtable.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::ColumnId;
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
38    MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
39};
40use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
41use crate::read::Batch;
42use crate::read::dedup::LastNonNullIter;
43use crate::region::options::MergeMode;
44use crate::{error, metrics};
45
/// A memtable that stores every written row in a single [`Series`] guarded by an `RwLock`.
///
/// NOTE(review): presumably intended for regions without primary key columns — `write_one`
/// debug-asserts that a row has no primary keys and `ranges` builds batches with an empty
/// key; confirm before using it for other regions.
pub struct SimpleBulkMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Metadata of the region this memtable belongs to.
    region_metadata: RegionMetadataRef,
    /// Accounts written bytes (optionally against a write buffer manager).
    alloc_tracker: AllocTracker,
    /// Largest timestamp value written so far; starts at `i64::MIN`.
    max_timestamp: AtomicI64,
    /// Smallest timestamp value written so far; starts at `i64::MAX`.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Whether rows are deduplicated when batches are built for reads.
    /// Always `false` when `merge_mode` is `LastNonNull`.
    dedup: bool,
    /// How rows with the same key are merged on read.
    merge_mode: MergeMode,
    /// Total number of rows written so far.
    num_rows: AtomicUsize,
    /// All written data (active and frozen parts).
    series: RwLock<Series>,
}
58
59impl Drop for SimpleBulkMemtable {
60    fn drop(&mut self) {
61        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
62    }
63}
64
65impl SimpleBulkMemtable {
66    pub fn new(
67        id: MemtableId,
68        region_metadata: RegionMetadataRef,
69        write_buffer_manager: Option<WriteBufferManagerRef>,
70        dedup: bool,
71        merge_mode: MergeMode,
72    ) -> Self {
73        let dedup = if merge_mode == MergeMode::LastNonNull {
74            false
75        } else {
76            dedup
77        };
78        let series = RwLock::new(Series::with_capacity(&region_metadata, 1024, 8192));
79
80        Self {
81            id,
82            region_metadata,
83            alloc_tracker: AllocTracker::new(write_buffer_manager),
84            max_timestamp: AtomicI64::new(i64::MIN),
85            min_timestamp: AtomicI64::new(i64::MAX),
86            max_sequence: AtomicU64::new(0),
87            dedup,
88            merge_mode,
89            num_rows: AtomicUsize::new(0),
90            series,
91        }
92    }
93
94    fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
95        if let Some(projection) = projection {
96            projection.iter().copied().collect()
97        } else {
98            self.region_metadata
99                .field_columns()
100                .map(|c| c.column_id)
101                .collect()
102        }
103    }
104
105    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
106        let ts = kv.timestamp();
107        let sequence = kv.sequence();
108        let op_type = kv.op_type();
109        let mut series = self.series.write().unwrap();
110        let size = series.push(ts, sequence, op_type, kv.fields());
111        stats.value_bytes += size;
112        // safety: timestamp of kv must be both present and a valid timestamp value.
113        let ts = kv
114            .timestamp()
115            .try_into_timestamp()
116            .unwrap()
117            .unwrap()
118            .value();
119        stats.min_ts = stats.min_ts.min(ts);
120        stats.max_ts = stats.max_ts.max(ts);
121    }
122
123    /// Updates memtable stats.
124    fn update_stats(&self, stats: WriteMetrics) {
125        self.alloc_tracker
126            .on_allocation(stats.key_bytes + stats.value_bytes);
127        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
128        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
129        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
130        self.max_sequence
131            .fetch_max(stats.max_sequence, Ordering::SeqCst);
132    }
133
134    #[cfg(test)]
135    fn schema(&self) -> &RegionMetadataRef {
136        &self.region_metadata
137    }
138}
139
140impl Debug for SimpleBulkMemtable {
141    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct("SimpleBulkMemtable").finish()
143    }
144}
145
146impl Memtable for SimpleBulkMemtable {
147    fn id(&self) -> MemtableId {
148        self.id
149    }
150
151    fn write(&self, kvs: &KeyValues) -> error::Result<()> {
152        let mut stats = WriteMetrics::default();
153        let max_sequence = kvs.max_sequence();
154        for kv in kvs.iter() {
155            self.write_key_value(kv, &mut stats);
156        }
157        stats.max_sequence = max_sequence;
158        stats.num_rows = kvs.num_rows();
159        self.update_stats(stats);
160        Ok(())
161    }
162
163    fn write_one(&self, kv: KeyValue) -> error::Result<()> {
164        debug_assert_eq!(0, kv.num_primary_keys());
165        let mut stats = WriteMetrics::default();
166        self.write_key_value(kv, &mut stats);
167        stats.num_rows = 1;
168        stats.max_sequence = kv.sequence();
169        self.update_stats(stats);
170        Ok(())
171    }
172
173    fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
174        let rb = &part.batch;
175
176        let ts = Helper::try_into_vector(
177            rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
178                .with_context(|| error::InvalidRequestSnafu {
179                    region_id: self.region_metadata.region_id,
180                    reason: "Timestamp not found",
181                })?,
182        )
183        .context(error::ConvertVectorSnafu)?;
184
185        let sequence = part.sequence;
186
187        let fields: Vec<_> = self
188            .region_metadata
189            .field_columns()
190            .map(|f| {
191                let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
192                    error::InvalidRequestSnafu {
193                        region_id: self.region_metadata.region_id,
194                        reason: format!("Column {} not found", f.column_schema.name),
195                    }
196                    .build()
197                })?;
198                Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
199            })
200            .collect::<error::Result<Vec<_>>>()?;
201
202        let mut series = self.series.write().unwrap();
203        let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
204            .with_label_values(&["bulk_extend"])
205            .start_timer();
206        series.extend(ts, OpType::Put as u8, sequence, fields)?;
207        extend_timer.observe_duration();
208
209        self.update_stats(WriteMetrics {
210            key_bytes: 0,
211            value_bytes: part.estimated_size(),
212            min_ts: part.min_timestamp,
213            max_ts: part.max_timestamp,
214            num_rows: part.num_rows(),
215            max_sequence: sequence,
216        });
217        Ok(())
218    }
219
220    #[cfg(any(test, feature = "test"))]
221    fn iter(
222        &self,
223        projection: Option<&[ColumnId]>,
224        _predicate: Option<table::predicate::Predicate>,
225        sequence: Option<store_api::storage::SequenceRange>,
226    ) -> error::Result<BoxedBatchIterator> {
227        let iter = self.create_iter(projection, sequence)?.build(None)?;
228
229        if self.merge_mode == MergeMode::LastNonNull {
230            let iter = LastNonNullIter::new(iter);
231            Ok(Box::new(iter))
232        } else {
233            Ok(Box::new(iter))
234        }
235    }
236
237    fn ranges(
238        &self,
239        projection: Option<&[ColumnId]>,
240        options: RangesOptions,
241    ) -> error::Result<MemtableRanges> {
242        let predicate = options.predicate;
243        let sequence = options.sequence;
244        let start_time = Instant::now();
245        let projection = Arc::new(self.build_projection(projection));
246        let values = self.series.read().unwrap().read_to_values();
247        let contexts = values
248            .into_par_iter()
249            .filter_map(|v| {
250                let filtered = match v
251                    .to_batch(&[], &self.region_metadata, &projection, self.dedup)
252                    .and_then(|mut b| {
253                        b.filter_by_sequence(sequence)?;
254                        Ok(b)
255                    }) {
256                    Ok(filtered) => filtered,
257                    Err(e) => {
258                        return Some(Err(e));
259                    }
260                };
261                if filtered.is_empty() {
262                    None
263                } else {
264                    Some(Ok(filtered))
265                }
266            })
267            .map(|result| {
268                result.map(|batch| {
269                    let num_rows = batch.num_rows();
270                    let builder = BatchRangeBuilder {
271                        batch,
272                        merge_mode: self.merge_mode,
273                        scan_cost: start_time.elapsed(),
274                    };
275                    (
276                        num_rows,
277                        Arc::new(MemtableRangeContext::new(
278                            self.id,
279                            Box::new(builder),
280                            predicate.clone(),
281                        )),
282                    )
283                })
284            })
285            .collect::<error::Result<Vec<_>>>()?;
286
287        let ranges = contexts
288            .into_iter()
289            .enumerate()
290            .map(|(idx, (num_rows, context))| (idx, MemtableRange::new(context, num_rows)))
291            .collect();
292
293        Ok(MemtableRanges {
294            ranges,
295            stats: self.stats(),
296        })
297    }
298
299    fn is_empty(&self) -> bool {
300        self.series.read().unwrap().is_empty()
301    }
302
303    fn freeze(&self) -> error::Result<()> {
304        self.series.write().unwrap().freeze(&self.region_metadata);
305        Ok(())
306    }
307
308    fn stats(&self) -> MemtableStats {
309        let estimated_bytes = self.alloc_tracker.bytes_allocated();
310        let num_rows = self.num_rows.load(Ordering::Relaxed);
311        if num_rows == 0 {
312            // no rows ever written
313            return MemtableStats {
314                estimated_bytes,
315                time_range: None,
316                num_rows: 0,
317                num_ranges: 0,
318                max_sequence: 0,
319                series_count: 0,
320            };
321        }
322        let ts_type = self
323            .region_metadata
324            .time_index_column()
325            .column_schema
326            .data_type
327            .clone()
328            .as_timestamp()
329            .expect("Timestamp column must have timestamp type");
330        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
331        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
332        MemtableStats {
333            estimated_bytes,
334            time_range: Some((min_timestamp, max_timestamp)),
335            num_rows,
336            num_ranges: 1,
337            max_sequence: self.max_sequence.load(Ordering::Relaxed),
338            series_count: 1,
339        }
340    }
341
342    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
343        Arc::new(Self::new(
344            id,
345            metadata.clone(),
346            self.alloc_tracker.write_buffer_manager(),
347            self.dedup,
348            self.merge_mode,
349        ))
350    }
351}
352
/// Iterator builder for one memtable range backed by an already materialized [`Batch`].
#[derive(Clone)]
pub struct BatchRangeBuilder {
    /// The range's rows; every built iterator yields a clone of this batch.
    pub batch: Batch,
    /// Merge mode of the memtable; `LastNonNull` makes built iterators wrap in
    /// `LastNonNullIter`.
    pub merge_mode: MergeMode,
    /// Time elapsed while the range was prepared, reported as `scan_cost` in scan metrics.
    scan_cost: Duration,
}
359
360impl IterBuilder for BatchRangeBuilder {
361    fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
362        let batch = self.batch.clone();
363        if let Some(metrics) = metrics {
364            let inner = crate::memtable::MemScanMetricsData {
365                total_series: 1,
366                num_rows: batch.num_rows(),
367                num_batches: 1,
368                scan_cost: self.scan_cost,
369            };
370            metrics.merge_inner(&inner);
371        }
372
373        let iter = Iter {
374            batch: Some(Ok(batch)),
375        };
376
377        if self.merge_mode == MergeMode::LastNonNull {
378            Ok(Box::new(LastNonNullIter::new(iter)))
379        } else {
380            Ok(Box::new(iter))
381        }
382    }
383}
384
/// Iterator over at most one pre-computed result.
struct Iter {
    /// The pending result; `next` takes it, leaving `None`.
    batch: Option<error::Result<Batch>>,
}
388
389impl Iterator for Iter {
390    type Item = error::Result<Batch>;
391
392    fn next(&mut self) -> Option<Self::Item> {
393        self.batch.take()
394    }
395}
396
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber, SequenceRange};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Region metadata with a millisecond time index `ts` (id 1) and two nullable fields,
    /// `f1: f64` (id 2) and `f2: string` (id 3).
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Creates a memtable (id 1) over [`new_test_metadata`].
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds `KeyValues` for one mutation of `op_type` with the given `(ts, f1, f2)` rows.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| {
                row(vec![
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*f1),
                    ValueData::StringValue(f2.clone()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Two writes with distinct timestamps are both returned, ordered by timestamp.
    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    /// Without a projection all fields are read; with one, only the projected field is.
    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Only project column 2 (f1)
        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len()); // only f1
        assert_eq!(2, batch.fields()[0].column_id);
    }

    /// With dedup enabled, two writes to the same timestamp collapse to the later one.
    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows()); // deduped to 1 row
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); // last write wins
    }

    /// `write_one` stores a single row that can be read back.
    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    /// A put and a later delete of the same key, in different frozen parts, merge and dedup
    /// to a single row.
    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    /// A lone delete is kept by a `LastRow` dedup reader that does not filter deletes.
    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    /// Two writes frozen together form one range, deduped to the later row.
    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    /// Bulk writes update stats from the part and interleave correctly with row writes.
    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    /// A fresh memtable is empty; it is non-empty after one write.
    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    /// Stats report no time range until a row is written.
    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    /// A forked memtable starts empty regardless of the source's contents.
    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    /// A `LtEq { max: 0 }` sequence filter hides rows written with later sequences.
    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        // Filter with sequence 0 should only return first write
        let mut iter = memtable
            .iter(None, None, Some(SequenceRange::LtEq { max: 0 }))
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// Builds a one-row batch for the `k0`/`ts` schema used by
    /// `test_write_read_large_string`. Arrays are listed in schema order (`k0`, then `ts`),
    /// and the `k0` value is `string_len` repetitions of `"a"`.
    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        RecordBatch::try_new(
            schema,
            vec![
                // No `.clone()` of the repeated string: callers pass lengths up to `i32::MAX`
                // (~2 GiB), and a clone would add another full copy to peak memory.
                Arc::new(StringArray::from_iter_values(
                    ["a".repeat(string_len as usize)].into_iter(),
                )) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    [ts].into_iter(),
                )) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Writes an `i32::MAX`-byte string and a small string in separate frozen parts, then
    /// checks both rows come back through `ranges`.
    // NOTE(review): presumably guards against string offset overflow when parts are
    // combined; this test allocates several GiB.
    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } =
            memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
}