// mito2/memtable/time_series.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
19use std::sync::{Arc, RwLock};
20use std::time::{Duration, Instant};
21
22use api::v1::OpType;
23use common_recordbatch::filter::SimpleFilterEvaluator;
24use common_telemetry::{debug, error};
25use common_time::Timestamp;
26use datatypes::arrow;
27use datatypes::arrow::array::ArrayRef;
28use datatypes::data_type::{ConcreteDataType, DataType};
29use datatypes::prelude::{MutableVector, Vector, VectorRef};
30use datatypes::types::TimestampType;
31use datatypes::value::{Value, ValueRef};
32use datatypes::vectors::{
33    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
34    TimestampSecondVector, UInt64Vector, UInt8Vector,
35};
36use snafu::{ensure, ResultExt};
37use store_api::metadata::RegionMetadataRef;
38use store_api::storage::{ColumnId, SequenceNumber};
39use table::predicate::Predicate;
40
41use crate::error::{
42    ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result,
43    UnsupportedOperationSnafu,
44};
45use crate::flush::WriteBufferManagerRef;
46use crate::memtable::key_values::KeyValue;
47use crate::memtable::stats::WriteMetrics;
48use crate::memtable::{
49    AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
50    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
51    PredicateGroup,
52};
53use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
54use crate::read::dedup::LastNonNullIter;
55use crate::read::{Batch, BatchBuilder, BatchColumn};
56use crate::region::options::MergeMode;
57use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
58
/// Initial vector builder capacity. Kept small because a memtable may hold
/// many series that each contain only a few rows.
const INITIAL_BUILDER_CAPACITY: usize = 16;

/// Vector builder capacity; once a series' active builder approaches this
/// size it is frozen into an immutable part (see `Series::push`).
const BUILDER_CAPACITY: usize = 512;
64
/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    /// Optional manager passed to each memtable's `AllocTracker`.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Whether readers deduplicate rows sharing a timestamp.
    dedup: bool,
    /// Merge strategy applied when reading rows with the same timestamp.
    merge_mode: MergeMode,
}
72
73impl TimeSeriesMemtableBuilder {
74    /// Creates a new builder with specific `write_buffer_manager`.
75    pub fn new(
76        write_buffer_manager: Option<WriteBufferManagerRef>,
77        dedup: bool,
78        merge_mode: MergeMode,
79    ) -> Self {
80        Self {
81            write_buffer_manager,
82            dedup,
83            merge_mode,
84        }
85    }
86}
87
88impl MemtableBuilder for TimeSeriesMemtableBuilder {
89    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
90        Arc::new(TimeSeriesMemtable::new(
91            metadata.clone(),
92            id,
93            self.write_buffer_manager.clone(),
94            self.dedup,
95            self.merge_mode,
96        ))
97    }
98}
99
/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    /// Identifier of this memtable.
    id: MemtableId,
    /// Metadata of the region this memtable serves.
    region_metadata: RegionMetadataRef,
    /// Codec that encodes primary key columns into the byte key of `series_set`.
    row_codec: Arc<DensePrimaryKeyCodec>,
    /// Rows grouped per encoded primary key.
    series_set: SeriesSet,
    /// Tracks memory allocated for keys and values.
    alloc_tracker: AllocTracker,
    /// Largest timestamp value written so far; starts at `i64::MIN`.
    max_timestamp: AtomicI64,
    /// Smallest timestamp value written so far; starts at `i64::MAX`.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Whether readers deduplicate rows sharing a timestamp.
    dedup: bool,
    /// Merge strategy for rows sharing a timestamp.
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
115
impl TimeSeriesMemtable {
    /// Creates a new [TimeSeriesMemtable].
    ///
    /// Under [MergeMode::LastNonNull], `dedup` is forced to `false`: the
    /// last-non-null merge performed by the read-side [LastNonNullIter] needs
    /// to see all duplicated rows.
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        // See doc comment: dedup is incompatible with LastNonNull merging.
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            // Extremes start inverted so the first written row sets both bounds.
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    /// Updates memtable stats.
    ///
    /// Reports allocated bytes to the tracker and folds the batch's timestamp
    /// range into the memtable-wide min/max.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
    }

    /// Encodes the primary key of `kv` and pushes the row into its series.
    ///
    /// Accumulates key/value allocation sizes and the row's timestamp into
    /// `stats`. Returns an error when the number of primary key columns does
    /// not match the codec's expectation.
    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}
177
impl Debug for TimeSeriesMemtable {
    /// Formats as the bare type name; no fields are printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}
183
impl Memtable for TimeSeriesMemtable {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes all rows in `kvs` into their series.
    ///
    /// Stats, max sequence and `num_rows` are only updated after every row was
    /// written successfully (see the TODO below about early returns).
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Charge the per-row timestamp and op_type bookkeeping bytes.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();

        // TODO(hl): this maybe inaccurate since for-iteration may return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to memtable will succeed.
        self.update_stats(local_stats);

        // update max_sequence
        let sequence = kvs.max_sequence();
        self.max_sequence.fetch_max(sequence, Ordering::Relaxed);

        self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
        Ok(())
    }

    /// Writes a single row into the memtable.
    ///
    /// NOTE(review): unlike `write`, stats and `num_rows` are still updated
    /// when the write fails the primary-key length check (only `max_sequence`
    /// is skipped) — confirm whether this asymmetry is intentional.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

        self.update_stats(metrics);

        // update max_sequence
        if res.is_ok() {
            self.max_sequence
                .fetch_max(key_value.sequence(), Ordering::Relaxed);
        }

        self.num_rows.fetch_add(1, Ordering::Relaxed);
        res
    }

    /// Bulk writes are not supported by this memtable implementation.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "TimeSeriesMemtable does not support write_bulk",
        }
        .fail()
    }

    /// Creates an iterator over the memtable.
    ///
    /// Without an explicit projection all field columns are read. Under
    /// `LastNonNull` merge mode the iterator is wrapped in a [LastNonNullIter].
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    /// Returns the scannable ranges of this memtable — always a single range
    /// with id 0 whose iterator is built lazily by [TimeSeriesIterBuilder].
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> MemtableRanges {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        }
    }

    /// Returns true when no series has been created yet.
    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().is_empty()
    }

    /// Marks the memtable immutable; stops tracking further allocations.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    /// Returns statistics of this memtable.
    ///
    /// A zero allocation estimate is treated as "nothing ever written" and
    /// yields empty stats with no time range.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        // Convert the raw i64 bounds back into timestamps of the index's unit.
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    /// Creates a new empty memtable for `metadata`, reusing this memtable's
    /// write buffer manager, dedup flag and merge mode.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
348
/// Lock-guarded map from encoded primary key to its series.
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;

/// All series of a memtable, keyed by encoded primary key.
/// Cloning only clones the shared `Arc`s, not the data.
#[derive(Clone)]
struct SeriesSet {
    region_metadata: RegionMetadataRef,
    /// Shared series map; see [SeriesRwLockMap].
    series: Arc<SeriesRwLockMap>,
    /// Codec used to decode primary keys when pruning series.
    codec: Arc<DensePrimaryKeyCodec>,
}
357
358impl SeriesSet {
359    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
360        Self {
361            region_metadata,
362            series: Default::default(),
363            codec,
364        }
365    }
366}
367
impl SeriesSet {
    /// Push [KeyValue] to SeriesSet with given primary key and return key/value allocated memory size.
    ///
    /// Fast path: when the series already exists, only the map's read lock is
    /// taken. Otherwise the write lock is acquired and the entry API inserts a
    /// new series; the `Occupied` arm handles the race where another writer
    /// inserted the same key between the two lock acquisitions.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        if let Some(series) = self.series.read().unwrap().get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            // Key already resident: no new key bytes allocated.
            return (0, value_allocated);
        };

        let mut indices = self.series.write().unwrap();
        match indices.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // safety: series must exist at given index.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    /// Returns the series for `primary_key`, if present (test helper).
    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().get(primary_key).cloned()
    }

    /// Iterates all series in [SeriesSet].
    ///
    /// Builds the primary-key schema/datatypes needed for pruning and hands
    /// everything to [Iter].
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
        )
    }
}
437
438/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys
439/// of given region schema
440pub(crate) fn primary_key_schema(
441    region_metadata: &RegionMetadataRef,
442) -> arrow::datatypes::SchemaRef {
443    let fields = region_metadata
444        .primary_key_columns()
445        .map(|pk| {
446            arrow::datatypes::Field::new(
447                pk.column_schema.name.clone(),
448                pk.column_schema.data_type.as_arrow_type(),
449                pk.column_schema.is_nullable(),
450            )
451        })
452        .collect::<Vec<_>>();
453    Arc::new(arrow::datatypes::Schema::new(fields))
454}
455
/// Metrics for reading the memtable. Reported to logs/Prometheus when the
/// owning [Iter] is dropped.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series visited in the memtable (including pruned ones).
    total_series: usize,
    /// Number of series pruned by primary-key predicates.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batch read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}
470
/// Iterator over a series map that yields one [Batch] per series.
struct Iter {
    /// Region metadata used to build batches.
    metadata: RegionMetadataRef,
    /// Shared series map being scanned.
    series: Arc<SeriesRwLockMap>,
    /// Field columns to project into batches.
    projection: HashSet<ColumnId>,
    /// Primary key of the last yielded series; `next` resumes after this key.
    last_key: Option<Vec<u8>>,
    /// Simple predicates used to prune whole series by primary key.
    predicate: Vec<SimpleFilterEvaluator>,
    /// Arrow schema containing only the primary key columns.
    pk_schema: arrow::datatypes::SchemaRef,
    /// Data types of the primary key columns, parallel to `pk_schema`.
    pk_datatypes: Vec<ConcreteDataType>,
    /// Codec to decode primary keys for pruning.
    codec: Arc<DensePrimaryKeyCodec>,
    /// Whether batches are deduplicated by timestamp.
    dedup: bool,
    /// Sequence bound passed to `Batch::filter_by_sequence`.
    sequence: Option<SequenceNumber>,
    /// Read metrics, flushed on drop.
    metrics: Metrics,
}
484
485impl Iter {
486    #[allow(clippy::too_many_arguments)]
487    pub(crate) fn try_new(
488        metadata: RegionMetadataRef,
489        series: Arc<SeriesRwLockMap>,
490        projection: HashSet<ColumnId>,
491        predicate: Option<Predicate>,
492        pk_schema: arrow::datatypes::SchemaRef,
493        pk_datatypes: Vec<ConcreteDataType>,
494        codec: Arc<DensePrimaryKeyCodec>,
495        dedup: bool,
496        sequence: Option<SequenceNumber>,
497    ) -> Result<Self> {
498        let predicate = predicate
499            .map(|predicate| {
500                predicate
501                    .exprs()
502                    .iter()
503                    .filter_map(SimpleFilterEvaluator::try_new)
504                    .collect::<Vec<_>>()
505            })
506            .unwrap_or_default();
507        Ok(Self {
508            metadata,
509            series,
510            projection,
511            last_key: None,
512            predicate,
513            pk_schema,
514            pk_datatypes,
515            codec,
516            dedup,
517            sequence,
518            metrics: Metrics::default(),
519        })
520    }
521}
522
impl Drop for Iter {
    /// Logs the per-scan metrics and exports them to the Prometheus counters
    /// when the iterator goes out of scope.
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
538
impl Iterator for Iter {
    type Item = Result<Batch>;

    /// Yields one batch per series, in encoded-primary-key order.
    ///
    /// `last_key` records where the previous call stopped, so each call
    /// re-acquires the map's read lock and resumes after that key.
    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.range::<Vec<u8>, _>(..),
            Some(last_key) => {
                map.range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded))
            }
        };

        // TODO(hl): maybe yield more than one time series to amortize range overhead.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            // Skip the whole series when primary-key predicates rule it out.
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // read next series
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            // Merge the series' frozen parts and convert them into one sorted
            // (and optionally deduped) batch.
            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            // Apply the sequence visibility filter (see `Batch::filter_by_sequence`).
            let mut batch = batch;
            batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        self.metrics.scan_cost += start.elapsed();

        None
    }
}
595
/// Returns true if the series keyed by `pk` may satisfy all `predicates`,
/// false when it can safely be pruned.
///
/// Pruning is best-effort: decode failures and predicates that cannot be
/// evaluated keep the series (return true) rather than dropping data.
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // no primary key, we simply return true.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // retrieve primary key values from cache or decode from bytes.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        // Cache the decoded values on the series so later scans skip decoding.
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // evaluate predicates against primary key values
    let mut result = true;
    for predicate in predicates {
        // ignore predicates that are not referencing primary key columns
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: arrow schema and datatypes are constructed from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        // Unevaluable predicates default to true (keep the series).
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}
638
/// A `Series` holds a list of field values of some given primary key.
struct Series {
    /// Decoded primary key values, cached on first pruning (see `prune_primary_key`).
    pk_cache: Option<Vec<Value>>,
    /// Mutable builder receiving new rows.
    active: ValueBuilder,
    /// Immutable parts produced by freezing `active`.
    frozen: Vec<Values>,
    /// Region metadata used to create fresh builders.
    region_metadata: RegionMetadataRef,
}
646
impl Series {
    /// Creates an empty series with a small active builder.
    fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
        }
    }

    /// Pushes a row of values into Series. Return the size of values.
    ///
    /// Freezes the active builder into an immutable part once it approaches
    /// [BUILDER_CAPACITY].
    fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > BUILDER_CAPACITY {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    /// Caches decoded primary key values for predicate pruning.
    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and push it to `frozen`.
    fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            // Swap in a fresh builder so the old one can be consumed by Values::from.
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    /// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
    /// Returns the frozen and compacted values.
    fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each values and avoid
            // cloning and sorting when values do not overlap with each other.

            // 3 extra columns: timestamp, sequence and op_type.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(frozen
                    .iter()
                    .zip(frozen.iter().skip(1))
                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
            }

            // Concatenate the i-th column of every frozen part into one array.
            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }
}
725
/// `ValueBuilder` holds all the vector builders for field columns.
struct ValueBuilder {
    /// Raw timestamp values, in the unit of `timestamp_type`.
    timestamp: Vec<i64>,
    /// Concrete timestamp type of the region's time index column.
    timestamp_type: ConcreteDataType,
    /// Sequence number of each row.
    sequence: Vec<u64>,
    /// Op type of each row, stored as `u8`.
    op_type: Vec<u8>,
    /// Lazily created builders, one slot per field column; `None` until the
    /// column receives its first non-null value (see `push`).
    fields: Vec<Option<Box<dyn MutableVector>>>,
    /// Data types of the field columns, parallel to `fields`.
    field_types: Vec<ConcreteDataType>,
}
735
impl ValueBuilder {
    /// Creates a builder sized for `capacity` rows.
    ///
    /// Field builders start as `None` and are materialized lazily in `push`.
    fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();

        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they've already be encoded.
    /// Returns the size of field values.
    ///
    /// In this method, we don't check the data type of the value, because it is already checked in the caller.
    fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // In debug builds, verify the row carries exactly one value per field column.
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Builders are lazy: a column that has only seen nulls allocates nothing.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.try_push_value_ref(field_value);
                } else {
                    // First non-null value for this column: create the builder
                    // and backfill nulls for all previously pushed rows.
                    let mut mutable_vector = self.field_types[idx]
                        .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY));
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.try_push_value_ref(field_value);
                    self.fields[idx] = Some(mutable_vector);
                }
            }
        }

        size
    }

    /// Returns the length of [ValueBuilder]
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }
}
813
/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_type`.
#[derive(Clone)]
struct Values {
    /// Time index values.
    timestamp: VectorRef,
    /// Per-row sequence numbers.
    sequence: Arc<UInt64Vector>,
    /// Per-row op types.
    op_type: Arc<UInt8Vector>,
    /// Field column vectors, in the region's field column order.
    fields: Vec<VectorRef>,
}
822
impl Values {
    /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and
    /// keeps only the latest row for the same timestamp.
    ///
    /// Only field columns listed in `projection` are included. When `dedup` is
    /// false, duplicate timestamps are presumably retained — see `Batch::sort`.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        dedup: bool,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        // `self.fields` is parallel to the region's field columns, so zip and
        // keep only the projected ones.
        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.sort(dedup)?;
        Ok(batch)
    }

    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
    ///
    /// Column order is: timestamp, sequence, op_type, then field columns;
    /// `from_columns` expects the same order.
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Builds a new [Values] instance from columns.
    ///
    /// `cols` must follow the order produced by `columns`: timestamp,
    /// sequence, op_type, then fields.
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}
884
impl From<ValueBuilder> for Values {
    /// Finishes all builders into immutable vectors.
    ///
    /// Field columns that never received a non-null value (and thus have no
    /// builder) are materialized as all-null vectors of the full length.
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    v.to_vector()
                } else {
                    // No builder was created: the column is entirely null.
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();
        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        // Wrap the raw i64 timestamps in a vector of the time index's unit.
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            // `timestamp_type` is taken from the time index column, which must
            // be a timestamp type.
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
936
/// [IterBuilder] that creates batch iterators over a [SeriesSet].
struct TimeSeriesIterBuilder {
    /// The series set to read from.
    series_set: SeriesSet,
    /// Column ids to project; forwarded to [SeriesSet::iter_series].
    projection: HashSet<ColumnId>,
    /// Optional predicate forwarded to [SeriesSet::iter_series].
    predicate: Option<Predicate>,
    /// Dedup flag forwarded to [SeriesSet::iter_series].
    dedup: bool,
    /// Optional sequence number filter forwarded to [SeriesSet::iter_series].
    sequence: Option<SequenceNumber>,
    /// Merge strategy; [MergeMode::LastNonNull] wraps the iterator in a
    /// [LastNonNullIter].
    merge_mode: MergeMode,
}
945
946impl IterBuilder for TimeSeriesIterBuilder {
947    fn build(&self) -> Result<BoxedBatchIterator> {
948        let iter = self.series_set.iter_series(
949            self.projection.clone(),
950            self.predicate.clone(),
951            self.dedup,
952            self.sequence,
953        )?;
954
955        if self.merge_mode == MergeMode::LastNonNull {
956            let iter = LastNonNullIter::new(iter);
957            Ok(Box::new(iter))
958        } else {
959            Ok(Box::new(iter))
960        }
961    }
962}
963
964#[cfg(test)]
965mod tests {
966    use std::collections::{HashMap, HashSet};
967
968    use api::helper::ColumnDataTypeWrapper;
969    use api::v1::value::ValueData;
970    use api::v1::{Mutation, Row, Rows, SemanticType};
971    use common_time::Timestamp;
972    use datatypes::prelude::{ConcreteDataType, ScalarVector};
973    use datatypes::schema::ColumnSchema;
974    use datatypes::value::{OrderedFloat, Value};
975    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
976    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
977    use store_api::storage::RegionId;
978
979    use super::*;
980    use crate::row_converter::SortField;
981    use crate::test_util::column_metadata_to_column_schema;
982
983    fn schema_for_test() -> RegionMetadataRef {
984        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
985        builder
986            .push_column_metadata(ColumnMetadata {
987                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
988                semantic_type: SemanticType::Tag,
989                column_id: 0,
990            })
991            .push_column_metadata(ColumnMetadata {
992                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
993                semantic_type: SemanticType::Tag,
994                column_id: 1,
995            })
996            .push_column_metadata(ColumnMetadata {
997                column_schema: ColumnSchema::new(
998                    "ts",
999                    ConcreteDataType::timestamp_millisecond_datatype(),
1000                    false,
1001                ),
1002                semantic_type: SemanticType::Timestamp,
1003                column_id: 2,
1004            })
1005            .push_column_metadata(ColumnMetadata {
1006                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1007                semantic_type: SemanticType::Field,
1008                column_id: 3,
1009            })
1010            .push_column_metadata(ColumnMetadata {
1011                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1012                semantic_type: SemanticType::Field,
1013                column_id: 4,
1014            })
1015            .primary_key(vec![0, 1]);
1016        let region_metadata = builder.build().unwrap();
1017        Arc::new(region_metadata)
1018    }
1019
1020    fn ts_value_ref(val: i64) -> ValueRef<'static> {
1021        ValueRef::Timestamp(Timestamp::new_millisecond(val))
1022    }
1023
1024    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1025        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1026    }
1027
    /// Asserts that `values` contains exactly the rows in `expect`, where each
    /// tuple is `(timestamp_ms, sequence, op_type, v0, v1)`.
    ///
    /// Panics if a column has an unexpected vector type or contains nulls.
    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        // Zip all five columns row-wise and materialize each row as a tuple
        // matching the shape of `expect`.
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }
1061
    /// Pushed rows land in the active (mutable) buffer; `compact` drains them
    /// into a single frozen batch.
    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        // Both rows are still in the active part before compaction.
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        // Rows as (ts, sequence, op_type, v0, v1); op type 1 is Put.
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        // Compaction moved the active rows into one frozen batch.
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
1076
    /// Null field values are preserved through push/compact; the null counts
    /// below imply all four rows survive compaction.
    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        // col1: NULL 1 2 3
        // col2: NULL NULL 10.2 NULL
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        // v0 has one NULL (first row); v1 has three (all but the third row).
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
1111
1112    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1113        let ts_len = batch.timestamps().len();
1114        assert_eq!(batch.sequences().len(), ts_len);
1115        assert_eq!(batch.op_types().len(), ts_len);
1116        for f in batch.fields() {
1117            assert_eq!(f.data.len(), ts_len);
1118        }
1119
1120        let mut rows = vec![];
1121        for idx in 0..ts_len {
1122            let mut row = Vec::with_capacity(batch.fields().len() + 3);
1123            row.push(batch.timestamps().get(idx));
1124            row.push(batch.sequences().get(idx));
1125            row.push(batch.op_types().get(idx));
1126            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1127            rows.push(row);
1128        }
1129
1130        assert_eq!(expect.len(), rows.len());
1131        for (idx, row) in rows.iter().enumerate() {
1132            assert_eq!(&expect[idx], row);
1133        }
1134    }
1135
    /// Verifies [Values::to_batch] sorts rows by timestamp and, with
    /// `dedup == true`, keeps only one row for a duplicated timestamp (the
    /// expected output shows the higher-sequence row surviving).
    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        // Unsorted timestamps with `3` duplicated; the duplicate carries a
        // higher sequence (2) and op type 0.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        // Project all five columns and enable dedup.
        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        // Four rows survive: the (ts == 3, seq == 1) row is removed.
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }
1196
    /// Builds a [KeyValues] mutation carrying `len` rows for the single
    /// primary key `(k0, k1)`, where row `i` has `ts == v0 == i` and
    /// `v1 == i as f64`. All rows share sequence 0 and op type 1 (Put).
    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        // Convert the region's column metadata into the gRPC column schema.
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        // Values per row follow the schema order: k0, k1, ts, v0, v1.
        let rows = (0..len)
            .map(|i| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(i as f64)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }
1243
1244    #[test]
1245    fn test_series_set_concurrency() {
1246        let schema = schema_for_test();
1247        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1248            schema
1249                .primary_key_columns()
1250                .map(|c| {
1251                    (
1252                        c.column_id,
1253                        SortField::new(c.column_schema.data_type.clone()),
1254                    )
1255                })
1256                .collect(),
1257        ));
1258        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1259
1260        let concurrency = 32;
1261        let pk_num = concurrency * 2;
1262        let mut handles = Vec::with_capacity(concurrency);
1263        for i in 0..concurrency {
1264            let set = set.clone();
1265            let schema = schema.clone();
1266            let column_schemas = schema
1267                .column_metadatas
1268                .iter()
1269                .map(column_metadata_to_column_schema)
1270                .collect::<Vec<_>>();
1271            let handle = std::thread::spawn(move || {
1272                for j in i * 100..(i + 1) * 100 {
1273                    let pk = j % pk_num;
1274                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1275
1276                    let kvs = KeyValues::new(
1277                        &schema,
1278                        Mutation {
1279                            op_type: OpType::Put as i32,
1280                            sequence: j as u64,
1281                            rows: Some(Rows {
1282                                schema: column_schemas.clone(),
1283                                rows: vec![Row {
1284                                    values: vec![
1285                                        api::v1::Value {
1286                                            value_data: Some(ValueData::StringValue(format!(
1287                                                "{}",
1288                                                j
1289                                            ))),
1290                                        },
1291                                        api::v1::Value {
1292                                            value_data: Some(ValueData::I64Value(j as i64)),
1293                                        },
1294                                        api::v1::Value {
1295                                            value_data: Some(ValueData::TimestampMillisecondValue(
1296                                                j as i64,
1297                                            )),
1298                                        },
1299                                        api::v1::Value {
1300                                            value_data: Some(ValueData::I64Value(j as i64)),
1301                                        },
1302                                        api::v1::Value {
1303                                            value_data: Some(ValueData::F64Value(j as f64)),
1304                                        },
1305                                    ],
1306                                }],
1307                            }),
1308                            write_hint: None,
1309                        },
1310                    )
1311                    .unwrap();
1312                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1313                }
1314            });
1315            handles.push(handle);
1316        }
1317        for h in handles {
1318            h.join().unwrap();
1319        }
1320
1321        let mut timestamps = Vec::with_capacity(concurrency * 100);
1322        let mut sequences = Vec::with_capacity(concurrency * 100);
1323        let mut op_types = Vec::with_capacity(concurrency * 100);
1324        let mut v0 = Vec::with_capacity(concurrency * 100);
1325
1326        for i in 0..pk_num {
1327            let pk = format!("pk-{}", i).as_bytes().to_vec();
1328            let series = set.get_series(&pk).unwrap();
1329            let mut guard = series.write().unwrap();
1330            let values = guard.compact(&schema).unwrap();
1331            timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1332            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1333            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1334            v0.extend(
1335                values
1336                    .fields
1337                    .first()
1338                    .unwrap()
1339                    .as_any()
1340                    .downcast_ref::<Int64Vector>()
1341                    .unwrap()
1342                    .iter_data()
1343                    .map(|v| v.unwrap()),
1344            );
1345        }
1346
1347        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1348        assert_eq!(
1349            expected_sequence,
1350            sequences.iter().copied().collect::<HashSet<_>>()
1351        );
1352
1353        op_types.iter().all(|op| *op == OpType::Put as u8);
1354        assert_eq!(
1355            expected_sequence,
1356            timestamps.iter().copied().collect::<HashSet<_>>()
1357        );
1358
1359        assert_eq!(timestamps, sequences);
1360        assert_eq!(v0, timestamps);
1361    }
1362
1363    #[test]
1364    fn test_memtable() {
1365        common_telemetry::init_default_ut_logging();
1366        check_memtable_dedup(true);
1367        check_memtable_dedup(false);
1368    }
1369
    /// Writes the same 100 rows twice into a [TimeSeriesMemtable] and checks
    /// the read-back timestamp counts: with `dedup == true` each timestamp
    /// appears once, otherwise twice. Also checks the memtable stats.
    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        // Write the same rows twice to create duplicates.
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        // Expected occurrences per timestamp: 1 when deduplicating, else 2.
        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
        {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        // Count how often each timestamp actually comes back from the reader.
        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        // Rows were written with timestamps 0..100 ms.
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }
1416
1417    #[test]
1418    fn test_memtable_projection() {
1419        common_telemetry::init_default_ut_logging();
1420        let schema = schema_for_test();
1421        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1422        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
1423        memtable.write(&kvs).unwrap();
1424
1425        let iter = memtable.iter(Some(&[3]), None, None).unwrap();
1426
1427        let mut v0_all = vec![];
1428
1429        for res in iter {
1430            let batch = res.unwrap();
1431            assert_eq!(1, batch.fields().len());
1432            let v0 = batch
1433                .fields()
1434                .first()
1435                .unwrap()
1436                .data
1437                .as_any()
1438                .downcast_ref::<Int64Vector>()
1439                .unwrap();
1440            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
1441        }
1442        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
1443    }
1444
    /// Smoke test for concurrent access: writer threads insert distinct
    /// series while reader threads iterate the memtable, then the final
    /// contents are checked against what the writers produced.
    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Number of writer threads
        let num_writers = 10;
        // Number of reader threads
        let num_readers = 5;
        // Number of series per writer
        let series_per_writer = 100;
        // Number of rows per series
        let rows_per_series = 10;
        // Total number of series
        let total_series = num_writers * series_per_writer;

        // Create a barrier to synchronize the start of all threads
        // (+1 for the main thread, which also waits).
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                // Wait for all threads to be ready
                barrier.wait();

                // Create and write series; the key embeds the writer id so
                // writers never collide on a series.
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads; they only check that iteration never fails
        // while writes are in flight.
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // After all threads finish, every series and row must be readable.
        // Each batch is expected to hold exactly one series here.
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
1535}