mito2/memtable/simple_bulk_memtable.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::ColumnId;
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
38    MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
39};
40use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
41use crate::read::Batch;
42use crate::read::dedup::LastNonNullIter;
43use crate::region::options::MergeMode;
44use crate::{error, metrics};
45
46pub struct SimpleBulkMemtable {
47    id: MemtableId,
48    region_metadata: RegionMetadataRef,
49    alloc_tracker: AllocTracker,
50    max_timestamp: AtomicI64,
51    min_timestamp: AtomicI64,
52    max_sequence: AtomicU64,
53    dedup: bool,
54    merge_mode: MergeMode,
55    num_rows: AtomicUsize,
56    series: RwLock<Series>,
57}
58
59impl Drop for SimpleBulkMemtable {
60    fn drop(&mut self) {
61        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
62    }
63}
64
65impl SimpleBulkMemtable {
66    pub fn new(
67        id: MemtableId,
68        region_metadata: RegionMetadataRef,
69        write_buffer_manager: Option<WriteBufferManagerRef>,
70        dedup: bool,
71        merge_mode: MergeMode,
72    ) -> Self {
73        let dedup = if merge_mode == MergeMode::LastNonNull {
74            false
75        } else {
76            dedup
77        };
78        let series = RwLock::new(Series::with_capacity(&region_metadata, 1024, 8192));
79
80        Self {
81            id,
82            region_metadata,
83            alloc_tracker: AllocTracker::new(write_buffer_manager),
84            max_timestamp: AtomicI64::new(i64::MIN),
85            min_timestamp: AtomicI64::new(i64::MAX),
86            max_sequence: AtomicU64::new(0),
87            dedup,
88            merge_mode,
89            num_rows: AtomicUsize::new(0),
90            series,
91        }
92    }
93
94    fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
95        if let Some(projection) = projection {
96            projection.iter().copied().collect()
97        } else {
98            self.region_metadata
99                .field_columns()
100                .map(|c| c.column_id)
101                .collect()
102        }
103    }
104
105    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
106        let ts = kv.timestamp();
107        let sequence = kv.sequence();
108        let op_type = kv.op_type();
109        let mut series = self.series.write().unwrap();
110        let size = series.push(ts, sequence, op_type, kv.fields());
111        stats.value_bytes += size;
112        // safety: timestamp of kv must be both present and a valid timestamp value.
113        let ts = kv
114            .timestamp()
115            .try_into_timestamp()
116            .unwrap()
117            .unwrap()
118            .value();
119        stats.min_ts = stats.min_ts.min(ts);
120        stats.max_ts = stats.max_ts.max(ts);
121    }
122
123    /// Updates memtable stats.
124    fn update_stats(&self, stats: WriteMetrics) {
125        self.alloc_tracker
126            .on_allocation(stats.key_bytes + stats.value_bytes);
127        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
128        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
129        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
130        self.max_sequence
131            .fetch_max(stats.max_sequence, Ordering::SeqCst);
132    }
133
134    #[cfg(test)]
135    fn schema(&self) -> &RegionMetadataRef {
136        &self.region_metadata
137    }
138}
139
140impl Debug for SimpleBulkMemtable {
141    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct("SimpleBulkMemtable").finish()
143    }
144}
145
146impl Memtable for SimpleBulkMemtable {
147    fn id(&self) -> MemtableId {
148        self.id
149    }
150
151    fn write(&self, kvs: &KeyValues) -> error::Result<()> {
152        let mut stats = WriteMetrics::default();
153        let max_sequence = kvs.max_sequence();
154        for kv in kvs.iter() {
155            self.write_key_value(kv, &mut stats);
156        }
157        stats.max_sequence = max_sequence;
158        stats.num_rows = kvs.num_rows();
159        self.update_stats(stats);
160        Ok(())
161    }
162
163    fn write_one(&self, kv: KeyValue) -> error::Result<()> {
164        debug_assert_eq!(0, kv.num_primary_keys());
165        let mut stats = WriteMetrics::default();
166        self.write_key_value(kv, &mut stats);
167        stats.num_rows = 1;
168        stats.max_sequence = kv.sequence();
169        self.update_stats(stats);
170        Ok(())
171    }
172
173    fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
174        let rb = &part.batch;
175
176        let ts = Helper::try_into_vector(
177            rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
178                .with_context(|| error::InvalidRequestSnafu {
179                    region_id: self.region_metadata.region_id,
180                    reason: "Timestamp not found",
181                })?,
182        )
183        .context(error::ConvertVectorSnafu)?;
184
185        let sequence = part.sequence;
186
187        let fields: Vec<_> = self
188            .region_metadata
189            .field_columns()
190            .map(|f| {
191                let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
192                    error::InvalidRequestSnafu {
193                        region_id: self.region_metadata.region_id,
194                        reason: format!("Column {} not found", f.column_schema.name),
195                    }
196                    .build()
197                })?;
198                Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
199            })
200            .collect::<error::Result<Vec<_>>>()?;
201
202        let mut series = self.series.write().unwrap();
203        let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
204            .with_label_values(&["bulk_extend"])
205            .start_timer();
206        series.extend(ts, OpType::Put as u8, sequence, fields)?;
207        extend_timer.observe_duration();
208
209        self.update_stats(WriteMetrics {
210            key_bytes: 0,
211            value_bytes: part.estimated_size(),
212            min_ts: part.min_timestamp,
213            max_ts: part.max_timestamp,
214            num_rows: part.num_rows(),
215            max_sequence: sequence,
216        });
217        Ok(())
218    }
219
220    #[cfg(any(test, feature = "test"))]
221    fn iter(
222        &self,
223        projection: Option<&[ColumnId]>,
224        _predicate: Option<table::predicate::Predicate>,
225        sequence: Option<store_api::storage::SequenceRange>,
226    ) -> error::Result<BoxedBatchIterator> {
227        let iter = self.create_iter(projection, sequence)?.build(None)?;
228
229        if self.merge_mode == MergeMode::LastNonNull {
230            let iter = LastNonNullIter::new(iter);
231            Ok(Box::new(iter))
232        } else {
233            Ok(Box::new(iter))
234        }
235    }
236
237    fn ranges(
238        &self,
239        projection: Option<&[ColumnId]>,
240        options: RangesOptions,
241    ) -> error::Result<MemtableRanges> {
242        let predicate = options.predicate;
243        let sequence = options.sequence;
244        let start_time = Instant::now();
245        let projection = Arc::new(self.build_projection(projection));
246
247        // Use the memtable's overall time range and max sequence for all ranges
248        let max_sequence = self.max_sequence.load(Ordering::Relaxed);
249        let time_range = {
250            let num_rows = self.num_rows.load(Ordering::Relaxed);
251            if num_rows > 0 {
252                let ts_type = self.region_metadata.time_index_type();
253                let max_timestamp =
254                    ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
255                let min_timestamp =
256                    ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
257                Some((min_timestamp, max_timestamp))
258            } else {
259                None
260            }
261        };
262
263        let values = self.series.read().unwrap().read_to_values();
264        let contexts = values
265            .into_par_iter()
266            .filter_map(|v| {
267                let filtered = match v
268                    .to_batch(&[], &self.region_metadata, &projection, self.dedup)
269                    .and_then(|mut b| {
270                        b.filter_by_sequence(sequence)?;
271                        Ok(b)
272                    }) {
273                    Ok(filtered) => filtered,
274                    Err(e) => {
275                        return Some(Err(e));
276                    }
277                };
278                if filtered.is_empty() {
279                    None
280                } else {
281                    Some(Ok(filtered))
282                }
283            })
284            .map(|result| {
285                result.map(|batch| {
286                    let num_rows = batch.num_rows();
287                    let estimated_bytes = batch.memory_size();
288
289                    let range_stats = MemtableStats {
290                        estimated_bytes,
291                        time_range,
292                        num_rows,
293                        num_ranges: 1,
294                        max_sequence,
295                        series_count: 1,
296                    };
297
298                    let builder = BatchRangeBuilder {
299                        batch,
300                        merge_mode: self.merge_mode,
301                        scan_cost: start_time.elapsed(),
302                    };
303                    (
304                        range_stats,
305                        Arc::new(MemtableRangeContext::new(
306                            self.id,
307                            Box::new(builder),
308                            predicate.clone(),
309                        )),
310                    )
311                })
312            })
313            .collect::<error::Result<Vec<_>>>()?;
314
315        let ranges = contexts
316            .into_iter()
317            .enumerate()
318            .map(|(idx, (range_stats, context))| (idx, MemtableRange::new(context, range_stats)))
319            .collect();
320
321        Ok(MemtableRanges { ranges })
322    }
323
324    fn is_empty(&self) -> bool {
325        self.series.read().unwrap().is_empty()
326    }
327
328    fn freeze(&self) -> error::Result<()> {
329        self.series.write().unwrap().freeze(&self.region_metadata);
330        Ok(())
331    }
332
333    fn stats(&self) -> MemtableStats {
334        let estimated_bytes = self.alloc_tracker.bytes_allocated();
335        let num_rows = self.num_rows.load(Ordering::Relaxed);
336        if num_rows == 0 {
337            // no rows ever written
338            return MemtableStats {
339                estimated_bytes,
340                time_range: None,
341                num_rows: 0,
342                num_ranges: 0,
343                max_sequence: 0,
344                series_count: 0,
345            };
346        }
347        let ts_type = self.region_metadata.time_index_type();
348        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
349        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
350        MemtableStats {
351            estimated_bytes,
352            time_range: Some((min_timestamp, max_timestamp)),
353            num_rows,
354            num_ranges: 1,
355            max_sequence: self.max_sequence.load(Ordering::Relaxed),
356            series_count: 1,
357        }
358    }
359
360    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
361        Arc::new(Self::new(
362            id,
363            metadata.clone(),
364            self.alloc_tracker.write_buffer_manager(),
365            self.dedup,
366            self.merge_mode,
367        ))
368    }
369}
370
371#[derive(Clone)]
372pub struct BatchRangeBuilder {
373    pub batch: Batch,
374    pub merge_mode: MergeMode,
375    scan_cost: Duration,
376}
377
378impl IterBuilder for BatchRangeBuilder {
379    fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
380        let batch = self.batch.clone();
381        if let Some(metrics) = metrics {
382            let inner = crate::memtable::MemScanMetricsData {
383                total_series: 1,
384                num_rows: batch.num_rows(),
385                num_batches: 1,
386                scan_cost: self.scan_cost,
387            };
388            metrics.merge_inner(&inner);
389        }
390
391        let iter = Iter {
392            batch: Some(Ok(batch)),
393        };
394
395        if self.merge_mode == MergeMode::LastNonNull {
396            Ok(Box::new(LastNonNullIter::new(iter)))
397        } else {
398            Ok(Box::new(iter))
399        }
400    }
401}
402
403struct Iter {
404    batch: Option<error::Result<Batch>>,
405}
406
407impl Iterator for Iter {
408    type Item = error::Result<Batch>;
409
410    fn next(&mut self) -> Option<Self::Item> {
411        self.batch.take()
412    }
413}
414
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber, SequenceRange};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Region metadata with columns `ts` (millisecond time index, id 1),
    /// `f1` (float64 field, id 2) and `f2` (string field, id 3).
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Creates memtable 1 over `new_test_metadata()`.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds a single mutation of `(ts, f1, f2)` rows with the given sequence and op type.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| {
                row(vec![
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*f1),
                    ValueData::StringValue(f2.clone()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Two writes with distinct timestamps come back as one batch in timestamp order.
    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    /// No projection reads all fields; an explicit projection reads only the listed ones.
    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Only project column 2 (f1)
        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len()); // only f1
        assert_eq!(2, batch.fields()[0].column_id);
    }

    /// With dedup enabled, two writes to the same timestamp collapse to the latest one.
    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows()); // deduped to 1 row
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); // last write wins
    }

    /// `write_one` stores a single row that is visible to reads.
    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    /// A put and a later delete of the same key, split by a freeze, merge and
    /// dedup down to one row.
    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    /// A memtable holding only a delete yields that single delete row.
    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    /// Two rows with the same timestamp written before a freeze produce a single
    /// deduplicated range containing the latest value.
    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    /// Bulk writes update stats and interleave correctly with row writes.
    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    /// A new memtable is empty and stops being empty after a write.
    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    /// Stats report no time range until a row has been written.
    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    /// A forked memtable starts empty.
    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    /// A sequence filter hides rows written with a later sequence.
    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        // Filter with sequence 0 should only return first write
        let mut iter = memtable
            .iter(None, None, Some(SequenceRange::LtEq { max: 0 }))
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// Builds a one-row record batch for the `k0`/`ts` region used by
    /// `test_write_read_large_string`; `k0` holds `string_len` copies of `'a'`.
    ///
    /// # Panics
    /// Panics if `string_len` is negative.
    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        let len = usize::try_from(string_len).expect("string_len must be non-negative");
        // Build the (possibly ~2 GiB) string once and move it into the array; an
        // extra clone would keep a second full copy alive for the whole expression.
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from_iter_values(std::iter::once(
                    "a".repeat(len),
                ))) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    [ts].into_iter(),
                )) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// A string value of `i32::MAX` bytes survives a bulk write, a freeze, a
    /// second bulk write and a read across all ranges.
    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } =
            memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
}