mito2/memtable/
simple_bulk_memtable.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::ColumnId;
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
38    MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
39};
40use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
41use crate::read::Batch;
42use crate::read::dedup::LastNonNullIter;
43use crate::region::options::MergeMode;
44use crate::{error, metrics};
45
46pub struct SimpleBulkMemtable {
47    id: MemtableId,
48    region_metadata: RegionMetadataRef,
49    alloc_tracker: AllocTracker,
50    max_timestamp: AtomicI64,
51    min_timestamp: AtomicI64,
52    max_sequence: AtomicU64,
53    dedup: bool,
54    merge_mode: MergeMode,
55    num_rows: AtomicUsize,
56    series: RwLock<Series>,
57}
58
59impl Drop for SimpleBulkMemtable {
60    fn drop(&mut self) {
61        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
62    }
63}
64
65impl SimpleBulkMemtable {
66    pub fn new(
67        id: MemtableId,
68        region_metadata: RegionMetadataRef,
69        write_buffer_manager: Option<WriteBufferManagerRef>,
70        dedup: bool,
71        merge_mode: MergeMode,
72    ) -> Self {
73        let series = RwLock::new(Series::with_capacity(&region_metadata, 1024, 8192));
74
75        Self {
76            id,
77            region_metadata,
78            alloc_tracker: AllocTracker::new(write_buffer_manager),
79            max_timestamp: AtomicI64::new(i64::MIN),
80            min_timestamp: AtomicI64::new(i64::MAX),
81            max_sequence: AtomicU64::new(0),
82            dedup,
83            merge_mode,
84            num_rows: AtomicUsize::new(0),
85            series,
86        }
87    }
88
89    fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
90        if let Some(projection) = projection {
91            projection.iter().copied().collect()
92        } else {
93            self.region_metadata
94                .field_columns()
95                .map(|c| c.column_id)
96                .collect()
97        }
98    }
99
100    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
101        let ts = kv.timestamp();
102        let sequence = kv.sequence();
103        let op_type = kv.op_type();
104        let mut series = self.series.write().unwrap();
105        let size = series.push(ts, sequence, op_type, kv.fields());
106        stats.value_bytes += size;
107        // safety: timestamp of kv must be both present and a valid timestamp value.
108        let ts = kv
109            .timestamp()
110            .try_into_timestamp()
111            .unwrap()
112            .unwrap()
113            .value();
114        stats.min_ts = stats.min_ts.min(ts);
115        stats.max_ts = stats.max_ts.max(ts);
116    }
117
118    /// Updates memtable stats.
119    fn update_stats(&self, stats: WriteMetrics) {
120        self.alloc_tracker
121            .on_allocation(stats.key_bytes + stats.value_bytes);
122        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
123        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
124        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
125        self.max_sequence
126            .fetch_max(stats.max_sequence, Ordering::SeqCst);
127    }
128
129    #[cfg(test)]
130    fn schema(&self) -> &RegionMetadataRef {
131        &self.region_metadata
132    }
133}
134
135impl Debug for SimpleBulkMemtable {
136    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137        f.debug_struct("SimpleBulkMemtable").finish()
138    }
139}
140
141impl Memtable for SimpleBulkMemtable {
142    fn id(&self) -> MemtableId {
143        self.id
144    }
145
146    fn write(&self, kvs: &KeyValues) -> error::Result<()> {
147        let mut stats = WriteMetrics::default();
148        let max_sequence = kvs.max_sequence();
149        for kv in kvs.iter() {
150            self.write_key_value(kv, &mut stats);
151        }
152        stats.max_sequence = max_sequence;
153        stats.num_rows = kvs.num_rows();
154        self.update_stats(stats);
155        Ok(())
156    }
157
158    fn write_one(&self, kv: KeyValue) -> error::Result<()> {
159        debug_assert_eq!(0, kv.num_primary_keys());
160        let mut stats = WriteMetrics::default();
161        self.write_key_value(kv, &mut stats);
162        stats.num_rows = 1;
163        stats.max_sequence = kv.sequence();
164        self.update_stats(stats);
165        Ok(())
166    }
167
168    fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
169        let rb = &part.batch;
170
171        let ts = Helper::try_into_vector(
172            rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
173                .with_context(|| error::InvalidRequestSnafu {
174                    region_id: self.region_metadata.region_id,
175                    reason: "Timestamp not found",
176                })?,
177        )
178        .context(error::ConvertVectorSnafu)?;
179
180        let sequence = part.sequence;
181
182        let fields: Vec<_> = self
183            .region_metadata
184            .field_columns()
185            .map(|f| {
186                let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
187                    error::InvalidRequestSnafu {
188                        region_id: self.region_metadata.region_id,
189                        reason: format!("Column {} not found", f.column_schema.name),
190                    }
191                    .build()
192                })?;
193                Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
194            })
195            .collect::<error::Result<Vec<_>>>()?;
196
197        let mut series = self.series.write().unwrap();
198        let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
199            .with_label_values(&["bulk_extend"])
200            .start_timer();
201        series.extend(ts, OpType::Put as u8, sequence, fields)?;
202        extend_timer.observe_duration();
203
204        self.update_stats(WriteMetrics {
205            key_bytes: 0,
206            value_bytes: part.estimated_size(),
207            min_ts: part.min_timestamp,
208            max_ts: part.max_timestamp,
209            num_rows: part.num_rows(),
210            max_sequence: sequence,
211        });
212        Ok(())
213    }
214
215    #[cfg(any(test, feature = "test"))]
216    fn iter(
217        &self,
218        projection: Option<&[ColumnId]>,
219        _predicate: Option<table::predicate::Predicate>,
220        sequence: Option<store_api::storage::SequenceRange>,
221    ) -> error::Result<BoxedBatchIterator> {
222        let iter = self.create_iter(projection, sequence)?.build(None)?;
223        if self.merge_mode == MergeMode::LastNonNull {
224            let iter = LastNonNullIter::new(iter);
225            Ok(Box::new(iter))
226        } else {
227            Ok(Box::new(iter))
228        }
229    }
230
231    fn ranges(
232        &self,
233        projection: Option<&[ColumnId]>,
234        options: RangesOptions,
235    ) -> error::Result<MemtableRanges> {
236        let predicate = options.predicate;
237        let sequence = options.sequence;
238        let start_time = Instant::now();
239        let projection = Arc::new(self.build_projection(projection));
240
241        // Use the memtable's overall time range and max sequence for all ranges
242        let max_sequence = self.max_sequence.load(Ordering::Relaxed);
243        let time_range = {
244            let num_rows = self.num_rows.load(Ordering::Relaxed);
245            if num_rows > 0 {
246                let ts_type = self.region_metadata.time_index_type();
247                let max_timestamp =
248                    ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
249                let min_timestamp =
250                    ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
251                Some((min_timestamp, max_timestamp))
252            } else {
253                None
254            }
255        };
256
257        let values = self.series.read().unwrap().read_to_values();
258        let contexts = values
259            .into_par_iter()
260            .filter_map(|v| {
261                let filtered = match v.to_batch(
262                    &[],
263                    &self.region_metadata,
264                    &projection,
265                    sequence,
266                    self.dedup,
267                    self.merge_mode,
268                ) {
269                    Ok(filtered) => filtered,
270                    Err(e) => {
271                        return Some(Err(e));
272                    }
273                };
274                if filtered.is_empty() {
275                    None
276                } else {
277                    Some(Ok(filtered))
278                }
279            })
280            .map(|result| {
281                result.map(|batch| {
282                    let num_rows = batch.num_rows();
283                    let estimated_bytes = batch.memory_size();
284
285                    let range_stats = MemtableStats {
286                        estimated_bytes,
287                        time_range,
288                        num_rows,
289                        num_ranges: 1,
290                        max_sequence,
291                        series_count: 1,
292                    };
293
294                    let builder = BatchRangeBuilder {
295                        batch,
296                        merge_mode: self.merge_mode,
297                        scan_cost: start_time.elapsed(),
298                    };
299                    (
300                        range_stats,
301                        Arc::new(MemtableRangeContext::new(
302                            self.id,
303                            Box::new(builder),
304                            predicate.clone(),
305                        )),
306                    )
307                })
308            })
309            .collect::<error::Result<Vec<_>>>()?;
310
311        let ranges = contexts
312            .into_iter()
313            .enumerate()
314            .map(|(idx, (range_stats, context))| (idx, MemtableRange::new(context, range_stats)))
315            .collect();
316
317        Ok(MemtableRanges { ranges })
318    }
319
320    fn is_empty(&self) -> bool {
321        self.series.read().unwrap().is_empty()
322    }
323
324    fn freeze(&self) -> error::Result<()> {
325        self.series.write().unwrap().freeze(&self.region_metadata);
326        Ok(())
327    }
328
329    fn stats(&self) -> MemtableStats {
330        let estimated_bytes = self.alloc_tracker.bytes_allocated();
331        let num_rows = self.num_rows.load(Ordering::Relaxed);
332        if num_rows == 0 {
333            // no rows ever written
334            return MemtableStats {
335                estimated_bytes,
336                time_range: None,
337                num_rows: 0,
338                num_ranges: 0,
339                max_sequence: 0,
340                series_count: 0,
341            };
342        }
343        let ts_type = self.region_metadata.time_index_type();
344        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
345        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
346        MemtableStats {
347            estimated_bytes,
348            time_range: Some((min_timestamp, max_timestamp)),
349            num_rows,
350            num_ranges: 1,
351            max_sequence: self.max_sequence.load(Ordering::Relaxed),
352            series_count: 1,
353        }
354    }
355
356    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
357        Arc::new(Self::new(
358            id,
359            metadata.clone(),
360            self.alloc_tracker.write_buffer_manager(),
361            self.dedup,
362            self.merge_mode,
363        ))
364    }
365}
366
367#[derive(Clone)]
368pub struct BatchRangeBuilder {
369    pub batch: Batch,
370    pub merge_mode: MergeMode,
371    scan_cost: Duration,
372}
373
374impl IterBuilder for BatchRangeBuilder {
375    fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
376        let batch = self.batch.clone();
377        if let Some(metrics) = metrics {
378            let inner = crate::memtable::MemScanMetricsData {
379                total_series: 1,
380                num_rows: batch.num_rows(),
381                num_batches: 1,
382                scan_cost: self.scan_cost,
383            };
384            metrics.merge_inner(&inner);
385        }
386
387        let iter = Iter {
388            batch: Some(Ok(batch)),
389        };
390
391        if self.merge_mode == MergeMode::LastNonNull {
392            Ok(Box::new(LastNonNullIter::new(iter)))
393        } else {
394            Ok(Box::new(iter))
395        }
396    }
397}
398
399struct Iter {
400    batch: Option<error::Result<Batch>>,
401}
402
403impl Iterator for Iter {
404    type Item = error::Result<Batch>;
405
406    fn next(&mut self) -> Option<Self::Item> {
407        self.batch.take()
408    }
409}
410
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber, SequenceRange};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Builds region metadata with a millisecond time index `ts` and two nullable
    /// field columns: `f1` (float64, id 2) and `f2` (string, id 3).
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Creates a memtable with id 1 over the test metadata and no write buffer manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Wraps `(ts, f1, f2)` rows into a single-mutation [`KeyValues`] with the given
    /// sequence and op type.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| {
                row(vec![
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*f1),
                    ValueData::StringValue(f2.clone()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Two separate writes come back as one batch with both rows in timestamp order.
    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    /// Without a projection all fields are returned; with one only the selected field is.
    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Only project column 2 (f1)
        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len()); // only f1
        assert_eq!(2, batch.fields()[0].column_id);
    }

    /// With dedup enabled, two rows sharing a timestamp collapse into the later write.
    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows()); // deduped to 1 row
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); // last write wins
    }

    /// A row written through `write_one` is visible to the iterator.
    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    /// A put followed (after a freeze) by a delete of the same timestamp yields a
    /// single row once merged and deduplicated.
    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    /// A memtable holding only a delete still surfaces that delete row.
    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    /// Two puts to the same timestamp produce one range whose single row carries the
    /// later value.
    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    /// A bulk part updates rows and stats, and later row writes append after it.
    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    /// `is_empty` flips from true to false after the first write.
    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    /// Stats report no rows / no time range when empty, and both after a write.
    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    /// A forked memtable starts out empty even when its parent holds data.
    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    /// A `LtEq { max: 0 }` sequence range hides the row written with sequence 1.
    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        // Filter with sequence 0 should only return first write
        let mut iter = memtable
            .iter(None, None, Some(SequenceRange::LtEq { max: 0 }))
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// Builds a one-row record batch whose string column is `"a"` repeated
    /// `string_len` times and whose timestamp column holds `ts`.
    ///
    /// # Panics
    ///
    /// Panics if `string_len` is negative or the arrays do not match the schema.
    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        // Checked conversion: a negative length would otherwise wrap to a huge `usize`.
        let len = usize::try_from(string_len).expect("string_len must be non-negative");
        // The repeated string is moved straight into the array builder; cloning it
        // first would double an allocation that can be ~2 GiB.
        let large_string = "a".repeat(len);
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from_iter_values([large_string])) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values([ts])) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Writes an `i32::MAX`-byte string, freezes, writes a small one, and checks that
    /// both rows can be read back through the ranges.
    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } =
            memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
}