mito2/memtable/
simple_bulk_memtable.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::{ColumnId, SequenceNumber};
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
38    MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
39};
40use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
41use crate::read::Batch;
42use crate::read::dedup::LastNonNullIter;
43use crate::read::scan_region::PredicateGroup;
44use crate::region::options::MergeMode;
45use crate::{error, metrics};
46
/// Memtable that stores all of its rows in a single [`Series`] behind a lock.
///
/// Rows are stored without a primary key: `write_one` debug-asserts that a row has
/// no primary key columns, and reads convert values with an empty key. Rows are
/// appended either one key-value at a time or as whole [`BulkPart`]s.
pub struct SimpleBulkMemtable {
    /// Identifier of this memtable.
    id: MemtableId,
    /// Metadata of the region this memtable belongs to.
    region_metadata: RegionMetadataRef,
    /// Accounts for written bytes (`on_allocation`); built from the optional write
    /// buffer manager passed to `new`.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far; starts at `i64::MIN`.
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far; starts at `i64::MAX`.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far; starts at 0.
    max_sequence: AtomicU64,
    /// Dedup flag passed to `to_batch` on reads; forced to `false` when
    /// `merge_mode` is `MergeMode::LastNonNull`.
    dedup: bool,
    /// Merge mode; `LastNonNull` wraps read iterators in `LastNonNullIter`.
    merge_mode: MergeMode,
    /// Total number of rows written.
    num_rows: AtomicUsize,
    /// Row storage shared by writers (write lock) and readers (read lock).
    series: RwLock<Series>,
}
59
impl Drop for SimpleBulkMemtable {
    /// Decrements `MEMTABLE_ACTIVE_SERIES_COUNT` once, for the single series this
    /// memtable owns.
    fn drop(&mut self) {
        // NOTE(review): nothing in this file increments this gauge; presumably
        // `Series::with_capacity` (called from `new`) does — confirm the inc/dec pair
        // stays balanced if series construction changes.
        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
    }
}
65
66impl SimpleBulkMemtable {
67    pub fn new(
68        id: MemtableId,
69        region_metadata: RegionMetadataRef,
70        write_buffer_manager: Option<WriteBufferManagerRef>,
71        dedup: bool,
72        merge_mode: MergeMode,
73    ) -> Self {
74        let dedup = if merge_mode == MergeMode::LastNonNull {
75            false
76        } else {
77            dedup
78        };
79        let series = RwLock::new(Series::with_capacity(&region_metadata, 1024, 8192));
80
81        Self {
82            id,
83            region_metadata,
84            alloc_tracker: AllocTracker::new(write_buffer_manager),
85            max_timestamp: AtomicI64::new(i64::MIN),
86            min_timestamp: AtomicI64::new(i64::MAX),
87            max_sequence: AtomicU64::new(0),
88            dedup,
89            merge_mode,
90            num_rows: AtomicUsize::new(0),
91            series,
92        }
93    }
94
95    fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
96        if let Some(projection) = projection {
97            projection.iter().copied().collect()
98        } else {
99            self.region_metadata
100                .field_columns()
101                .map(|c| c.column_id)
102                .collect()
103        }
104    }
105
106    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
107        let ts = kv.timestamp();
108        let sequence = kv.sequence();
109        let op_type = kv.op_type();
110        let mut series = self.series.write().unwrap();
111        let size = series.push(ts, sequence, op_type, kv.fields());
112        stats.value_bytes += size;
113        // safety: timestamp of kv must be both present and a valid timestamp value.
114        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
115        stats.min_ts = stats.min_ts.min(ts);
116        stats.max_ts = stats.max_ts.max(ts);
117    }
118
119    /// Updates memtable stats.
120    fn update_stats(&self, stats: WriteMetrics) {
121        self.alloc_tracker
122            .on_allocation(stats.key_bytes + stats.value_bytes);
123        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
124        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
125        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
126        self.max_sequence
127            .fetch_max(stats.max_sequence, Ordering::SeqCst);
128    }
129
130    #[cfg(test)]
131    fn schema(&self) -> &RegionMetadataRef {
132        &self.region_metadata
133    }
134}
135
136impl Debug for SimpleBulkMemtable {
137    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138        f.debug_struct("SimpleBulkMemtable").finish()
139    }
140}
141
142impl Memtable for SimpleBulkMemtable {
143    fn id(&self) -> MemtableId {
144        self.id
145    }
146
147    fn write(&self, kvs: &KeyValues) -> error::Result<()> {
148        let mut stats = WriteMetrics::default();
149        let max_sequence = kvs.max_sequence();
150        for kv in kvs.iter() {
151            self.write_key_value(kv, &mut stats);
152        }
153        stats.max_sequence = max_sequence;
154        stats.num_rows = kvs.num_rows();
155        self.update_stats(stats);
156        Ok(())
157    }
158
159    fn write_one(&self, kv: KeyValue) -> error::Result<()> {
160        debug_assert_eq!(0, kv.num_primary_keys());
161        let mut stats = WriteMetrics::default();
162        self.write_key_value(kv, &mut stats);
163        stats.num_rows = 1;
164        stats.max_sequence = kv.sequence();
165        self.update_stats(stats);
166        Ok(())
167    }
168
169    fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
170        let rb = &part.batch;
171
172        let ts = Helper::try_into_vector(
173            rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
174                .with_context(|| error::InvalidRequestSnafu {
175                    region_id: self.region_metadata.region_id,
176                    reason: "Timestamp not found",
177                })?,
178        )
179        .context(error::ConvertVectorSnafu)?;
180
181        let sequence = part.sequence;
182
183        let fields: Vec<_> = self
184            .region_metadata
185            .field_columns()
186            .map(|f| {
187                let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
188                    error::InvalidRequestSnafu {
189                        region_id: self.region_metadata.region_id,
190                        reason: format!("Column {} not found", f.column_schema.name),
191                    }
192                    .build()
193                })?;
194                Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
195            })
196            .collect::<error::Result<Vec<_>>>()?;
197
198        let mut series = self.series.write().unwrap();
199        let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
200            .with_label_values(&["bulk_extend"])
201            .start_timer();
202        series.extend(ts, OpType::Put as u8, sequence, fields)?;
203        extend_timer.observe_duration();
204
205        self.update_stats(WriteMetrics {
206            key_bytes: 0,
207            value_bytes: part.estimated_size(),
208            min_ts: part.min_timestamp,
209            max_ts: part.max_timestamp,
210            num_rows: part.num_rows(),
211            max_sequence: sequence,
212        });
213        Ok(())
214    }
215
216    #[cfg(any(test, feature = "test"))]
217    fn iter(
218        &self,
219        projection: Option<&[ColumnId]>,
220        _predicate: Option<table::predicate::Predicate>,
221        sequence: Option<SequenceNumber>,
222    ) -> error::Result<BoxedBatchIterator> {
223        let iter = self.create_iter(projection, sequence)?.build(None)?;
224
225        if self.merge_mode == MergeMode::LastNonNull {
226            let iter = LastNonNullIter::new(iter);
227            Ok(Box::new(iter))
228        } else {
229            Ok(Box::new(iter))
230        }
231    }
232
233    fn ranges(
234        &self,
235        projection: Option<&[ColumnId]>,
236        predicate: PredicateGroup,
237        sequence: Option<SequenceNumber>,
238        _for_flush: bool,
239    ) -> error::Result<MemtableRanges> {
240        let start_time = Instant::now();
241        let projection = Arc::new(self.build_projection(projection));
242        let values = self.series.read().unwrap().read_to_values();
243        let contexts = values
244            .into_par_iter()
245            .filter_map(|v| {
246                let filtered = match v
247                    .to_batch(&[], &self.region_metadata, &projection, self.dedup)
248                    .and_then(|mut b| {
249                        b.filter_by_sequence(sequence)?;
250                        Ok(b)
251                    }) {
252                    Ok(filtered) => filtered,
253                    Err(e) => {
254                        return Some(Err(e));
255                    }
256                };
257                if filtered.is_empty() {
258                    None
259                } else {
260                    Some(Ok(filtered))
261                }
262            })
263            .map(|result| {
264                result.map(|batch| {
265                    let num_rows = batch.num_rows();
266                    let builder = BatchRangeBuilder {
267                        batch,
268                        merge_mode: self.merge_mode,
269                        scan_cost: start_time.elapsed(),
270                    };
271                    (
272                        num_rows,
273                        Arc::new(MemtableRangeContext::new(
274                            self.id,
275                            Box::new(builder),
276                            predicate.clone(),
277                        )),
278                    )
279                })
280            })
281            .collect::<error::Result<Vec<_>>>()?;
282
283        let ranges = contexts
284            .into_iter()
285            .enumerate()
286            .map(|(idx, (num_rows, context))| (idx, MemtableRange::new(context, num_rows)))
287            .collect();
288
289        Ok(MemtableRanges {
290            ranges,
291            stats: self.stats(),
292        })
293    }
294
295    fn is_empty(&self) -> bool {
296        self.series.read().unwrap().is_empty()
297    }
298
299    fn freeze(&self) -> error::Result<()> {
300        self.series.write().unwrap().freeze(&self.region_metadata);
301        Ok(())
302    }
303
304    fn stats(&self) -> MemtableStats {
305        let estimated_bytes = self.alloc_tracker.bytes_allocated();
306        let num_rows = self.num_rows.load(Ordering::Relaxed);
307        if num_rows == 0 {
308            // no rows ever written
309            return MemtableStats {
310                estimated_bytes,
311                time_range: None,
312                num_rows: 0,
313                num_ranges: 0,
314                max_sequence: 0,
315                series_count: 0,
316            };
317        }
318        let ts_type = self
319            .region_metadata
320            .time_index_column()
321            .column_schema
322            .data_type
323            .clone()
324            .as_timestamp()
325            .expect("Timestamp column must have timestamp type");
326        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
327        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
328        MemtableStats {
329            estimated_bytes,
330            time_range: Some((min_timestamp, max_timestamp)),
331            num_rows,
332            num_ranges: 1,
333            max_sequence: self.max_sequence.load(Ordering::Relaxed),
334            series_count: 1,
335        }
336    }
337
338    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
339        Arc::new(Self::new(
340            id,
341            metadata.clone(),
342            self.alloc_tracker.write_buffer_manager(),
343            self.dedup,
344            self.merge_mode,
345        ))
346    }
347}
348
349#[derive(Clone)]
350pub struct BatchRangeBuilder {
351    pub batch: Batch,
352    pub merge_mode: MergeMode,
353    scan_cost: Duration,
354}
355
356impl IterBuilder for BatchRangeBuilder {
357    fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
358        let batch = self.batch.clone();
359        if let Some(metrics) = metrics {
360            let inner = crate::memtable::MemScanMetricsData {
361                total_series: 1,
362                num_rows: batch.num_rows(),
363                num_batches: 1,
364                scan_cost: self.scan_cost,
365            };
366            metrics.merge_inner(&inner);
367        }
368
369        let iter = Iter {
370            batch: Some(Ok(batch)),
371        };
372
373        if self.merge_mode == MergeMode::LastNonNull {
374            Ok(Box::new(LastNonNullIter::new(iter)))
375        } else {
376            Ok(Box::new(iter))
377        }
378    }
379}
380
381struct Iter {
382    batch: Option<error::Result<Batch>>,
383}
384
385impl Iterator for Iter {
386    type Item = error::Result<Batch>;
387
388    fn next(&mut self) -> Option<Self::Item> {
389        self.batch.take()
390    }
391}
392
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Region metadata with columns `ts` (ms timestamp, id 1), `f1` (float64 field,
    /// id 2) and `f2` (string field, id 3).
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Memtable with id 1 over [`new_test_metadata`] and no write buffer manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds a single mutation of `op_type` at `sequence` with one row per
    /// `(ts, f1, f2)` tuple, laid out in `metadata`'s column order.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| {
                row(vec![
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*f1),
                    ValueData::StringValue(f2.clone()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Only project column 2 (f1)
        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len()); // only f1
        assert_eq!(2, batch.fields()[0].column_id);
    }

    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows()); // deduped to 1 row
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); // last write wins
    }

    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None, false)
            .unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None, false)
            .unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None, false)
            .unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        // Filter with sequence 0 should only return first write
        let mut iter = memtable.iter(None, None, Some(0)).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// Builds a one-row record batch for the `[k0: string, ts: timestamp(ms)]` schema
    /// of `region_meta`: `k0` holds `string_len` copies of `"a"`, `ts` holds `ts`.
    ///
    /// # Panics
    /// If `string_len` is negative, or the arrays do not match `region_meta`'s schema.
    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        // Checked conversion: a negative length would otherwise wrap to a huge usize.
        let string_len = usize::try_from(string_len).expect("string_len must be non-negative");
        // Moved (not cloned) into the array: with `i32::MAX` bytes a clone would keep
        // a second ~2 GiB copy alive until the end of the statement.
        let value = "a".repeat(string_len);
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from_iter_values([value])) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values([ts])) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Writes one `i32::MAX`-byte string value, freezes, writes a small one, and
    /// checks both rows are read back through `ranges`.
    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } = memtable
            .ranges(None, PredicateGroup::default(), None, false)
            .unwrap();
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
}