// mito2/memtable/time_series.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::iter;
19use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::OpType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{debug, error};
26use common_time::Timestamp;
27use datatypes::arrow;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow_array::StringArray;
30use datatypes::data_type::{ConcreteDataType, DataType};
31use datatypes::prelude::{ScalarVector, Vector, VectorRef};
32use datatypes::types::TimestampType;
33use datatypes::value::{Value, ValueRef};
34use datatypes::vectors::{
35    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
36    TimestampSecondVector, UInt64Vector, UInt8Vector,
37};
38use mito_codec::key_values::KeyValue;
39use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
40use snafu::{ensure, OptionExt, ResultExt};
41use store_api::metadata::RegionMetadataRef;
42use store_api::storage::{ColumnId, SequenceNumber};
43use table::predicate::Predicate;
44
45use crate::error::{
46    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
47};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::builder::{FieldBuilder, StringBuilder};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
52use crate::memtable::stats::WriteMetrics;
53use crate::memtable::{
54    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
55    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
56    PredicateGroup,
57};
58use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
59use crate::read::dedup::LastNonNullIter;
60use crate::read::{Batch, BatchBuilder, BatchColumn};
61use crate::region::options::MergeMode;
62
/// Initial capacity for the vector builders of a newly created series.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Upper bound on the active builder size; once a series' active builder
/// approaches this many rows it is frozen into an immutable part (see
/// `Series::push`).
const BUILDER_CAPACITY: usize = 512;
68
/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    /// Manager used by created memtables to account for write buffer usage.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Whether rows should be deduplicated when reading from the memtable.
    dedup: bool,
    /// Merge strategy passed to created memtables.
    merge_mode: MergeMode,
}
76
77impl TimeSeriesMemtableBuilder {
78    /// Creates a new builder with specific `write_buffer_manager`.
79    pub fn new(
80        write_buffer_manager: Option<WriteBufferManagerRef>,
81        dedup: bool,
82        merge_mode: MergeMode,
83    ) -> Self {
84        Self {
85            write_buffer_manager,
86            dedup,
87            merge_mode,
88        }
89    }
90}
91
impl MemtableBuilder for TimeSeriesMemtableBuilder {
    /// Builds a memtable for the region described by `metadata`.
    ///
    /// Regions without primary key columns have no key to group rows by, so a
    /// [SimpleBulkMemtable] is used instead of a [TimeSeriesMemtable].
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }
}
113
/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    /// Codec encoding primary key columns into comparable bytes.
    row_codec: Arc<DensePrimaryKeyCodec>,
    /// All series of this memtable, keyed by encoded primary key.
    series_set: SeriesSet,
    /// Tracks memory allocated by this memtable.
    alloc_tracker: AllocTracker,
    /// Max timestamp value written so far (time index unit).
    max_timestamp: AtomicI64,
    /// Min timestamp value written so far (time index unit).
    min_timestamp: AtomicI64,
    /// Max sequence number of written rows.
    max_sequence: AtomicU64,
    /// Whether to dedup rows on read; forced to `false` when `merge_mode`
    /// is [MergeMode::LastNonNull] (see `TimeSeriesMemtable::new`).
    dedup: bool,
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
129
130impl TimeSeriesMemtable {
131    pub fn new(
132        region_metadata: RegionMetadataRef,
133        id: MemtableId,
134        write_buffer_manager: Option<WriteBufferManagerRef>,
135        dedup: bool,
136        merge_mode: MergeMode,
137    ) -> Self {
138        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
139        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
140        let dedup = if merge_mode == MergeMode::LastNonNull {
141            false
142        } else {
143            dedup
144        };
145        Self {
146            id,
147            region_metadata,
148            series_set,
149            row_codec,
150            alloc_tracker: AllocTracker::new(write_buffer_manager),
151            max_timestamp: AtomicI64::new(i64::MIN),
152            min_timestamp: AtomicI64::new(i64::MAX),
153            max_sequence: AtomicU64::new(0),
154            dedup,
155            merge_mode,
156            num_rows: Default::default(),
157        }
158    }
159
160    /// Updates memtable stats.
161    fn update_stats(&self, stats: WriteMetrics) {
162        self.alloc_tracker
163            .on_allocation(stats.key_bytes + stats.value_bytes);
164        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
165        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
166        self.max_sequence
167            .fetch_max(stats.max_sequence, Ordering::SeqCst);
168        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
169    }
170
171    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
172        ensure!(
173            self.row_codec.num_fields() == kv.num_primary_keys(),
174            PrimaryKeyLengthMismatchSnafu {
175                expect: self.row_codec.num_fields(),
176                actual: kv.num_primary_keys(),
177            }
178        );
179
180        let primary_key_encoded = self
181            .row_codec
182            .encode(kv.primary_keys())
183            .context(EncodeSnafu)?;
184
185        let (key_allocated, value_allocated) =
186            self.series_set.push_to_series(primary_key_encoded, &kv);
187        stats.key_bytes += key_allocated;
188        stats.value_bytes += value_allocated;
189
190        // safety: timestamp of kv must be both present and a valid timestamp value.
191        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
192        stats.min_ts = stats.min_ts.min(ts);
193        stats.max_ts = stats.max_ts.max(ts);
194        Ok(())
195    }
196}
197
198impl Debug for TimeSeriesMemtable {
199    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
200        f.debug_struct("TimeSeriesMemtable").finish()
201    }
202}
203
impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes all rows in `kvs` into their series, then updates stats once.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Besides its fields, every row also stores a timestamp and an op type.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // TODO(hl): this maybe inaccurate since for-iteration may return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to memtable will succeed.
        self.update_stats(local_stats);
        Ok(())
    }

    /// Writes a single row; stats are recorded only when the write succeeds.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    /// Writes a [BulkPart] by converting it to a mutation and iterating rows.
    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Default implementation fallback to row iteration.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        // The part already carries its sequence and timestamp range.
        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_ts;
        metrics.min_ts = part.min_ts;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    /// Returns a batch iterator over the memtable's series.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        // Project all field columns when no explicit projection is given.
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence)?;

        // `LastNonNull` needs an extra merge pass that fills null fields
        // from older duplicated rows.
        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    /// Exposes the whole memtable as a single [MemtableRange] (id 0).
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        // Project all field columns when no explicit projection is given.
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().is_empty()
    }

    /// Stops allocation tracking; the memtable is becoming immutable.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    /// Returns statistics of the memtable. Zero allocated bytes means no row
    /// has ever been written, so empty stats are returned in that case.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        // Convert raw i64 extremes back to timestamps in the index's unit.
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    /// Creates a new empty memtable that inherits this memtable's settings.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
370
/// Map from encoded primary key to its [Series], guarded for concurrent access.
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
372
/// Collection of [Series], keyed by the encoded primary key bytes.
#[derive(Clone)]
pub(crate) struct SeriesSet {
    pub(crate) region_metadata: RegionMetadataRef,
    /// Shared map of series; cloned handles observe the same data.
    pub(crate) series: Arc<SeriesRwLockMap>,
    /// Codec used to encode/decode primary keys.
    pub(crate) codec: Arc<DensePrimaryKeyCodec>,
}
379
380impl SeriesSet {
381    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
382        Self {
383            region_metadata,
384            series: Default::default(),
385            codec,
386        }
387    }
388}
389
impl SeriesSet {
    /// Push [KeyValue] to SeriesSet with given primary key and return key/value allocated memory size.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, a read lock on the map suffices.
        if let Some(series) = self.series.read().unwrap().get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            // Key bytes are only charged when a new series is created.
            return (0, value_allocated);
        };

        // Slow path: take the map's write lock and insert the series. Another
        // writer may have created it between the two lock scopes, hence the
        // `Occupied` arm below.
        let mut indices = self.series.write().unwrap();
        match indices.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // The series was created concurrently; push into the existing one.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    /// Returns the series for `primary_key`, if present (test helper).
    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().get(primary_key).cloned()
    }

    /// Iterates all series in [SeriesSet].
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Iter> {
        // Build an arrow schema and datatype list for the primary key columns
        // so predicates can be evaluated against decoded key values.
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
        )
    }
}
459
460/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys
461/// of given region schema
462pub(crate) fn primary_key_schema(
463    region_metadata: &RegionMetadataRef,
464) -> arrow::datatypes::SchemaRef {
465    let fields = region_metadata
466        .primary_key_columns()
467        .map(|pk| {
468            arrow::datatypes::Field::new(
469                pk.column_schema.name.clone(),
470                pk.column_schema.data_type.as_arrow_type(),
471                pk.column_schema.is_nullable(),
472            )
473        })
474        .collect::<Vec<_>>();
475    Arc::new(arrow::datatypes::Schema::new(fields))
476}
477
/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total number of series visited during the scan.
    total_series: usize,
    /// Number of series pruned by primary key predicates.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}
492
/// Iterator over a [SeriesSet], yielding one [Batch] per series in
/// primary key order.
struct Iter {
    metadata: RegionMetadataRef,
    /// Shared map of series being iterated.
    series: Arc<SeriesRwLockMap>,
    /// Column ids to read.
    projection: HashSet<ColumnId>,
    /// Primary key of the last returned series; the scan resumes after it.
    last_key: Option<Vec<u8>>,
    /// Filters used to prune series by their primary key values.
    predicate: Vec<SimpleFilterEvaluator>,
    /// Arrow schema containing only primary key columns.
    pk_schema: arrow::datatypes::SchemaRef,
    /// Data types of primary key columns, aligned with `pk_schema`.
    pk_datatypes: Vec<ConcreteDataType>,
    /// Codec to decode primary key bytes for pruning.
    codec: Arc<DensePrimaryKeyCodec>,
    /// Whether to deduplicate rows within a batch.
    dedup: bool,
    /// Sequence visibility bound passed to `Batch::filter_by_sequence`.
    sequence: Option<SequenceNumber>,
    /// Scan metrics, reported on drop.
    metrics: Metrics,
}
506
507impl Iter {
508    #[allow(clippy::too_many_arguments)]
509    pub(crate) fn try_new(
510        metadata: RegionMetadataRef,
511        series: Arc<SeriesRwLockMap>,
512        projection: HashSet<ColumnId>,
513        predicate: Option<Predicate>,
514        pk_schema: arrow::datatypes::SchemaRef,
515        pk_datatypes: Vec<ConcreteDataType>,
516        codec: Arc<DensePrimaryKeyCodec>,
517        dedup: bool,
518        sequence: Option<SequenceNumber>,
519    ) -> Result<Self> {
520        let predicate = predicate
521            .map(|predicate| {
522                predicate
523                    .exprs()
524                    .iter()
525                    .filter_map(SimpleFilterEvaluator::try_new)
526                    .collect::<Vec<_>>()
527            })
528            .unwrap_or_default();
529        Ok(Self {
530            metadata,
531            series,
532            projection,
533            last_key: None,
534            predicate,
535            pk_schema,
536            pk_datatypes,
537            codec,
538            dedup,
539            sequence,
540            metrics: Metrics::default(),
541        })
542    }
543}
544
impl Drop for Iter {
    /// Reports accumulated scan metrics when the iterator goes out of scope.
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // Export counters to the global read metrics.
        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
560
561impl Iterator for Iter {
562    type Item = Result<Batch>;
563
564    fn next(&mut self) -> Option<Self::Item> {
565        let start = Instant::now();
566        let map = self.series.read().unwrap();
567        let range = match &self.last_key {
568            None => map.range::<Vec<u8>, _>(..),
569            Some(last_key) => {
570                map.range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded))
571            }
572        };
573
574        // TODO(hl): maybe yield more than one time series to amortize range overhead.
575        for (primary_key, series) in range {
576            self.metrics.total_series += 1;
577
578            let mut series = series.write().unwrap();
579            if !self.predicate.is_empty()
580                && !prune_primary_key(
581                    &self.codec,
582                    primary_key.as_slice(),
583                    &mut series,
584                    &self.pk_datatypes,
585                    self.pk_schema.clone(),
586                    &self.predicate,
587                )
588            {
589                // read next series
590                self.metrics.num_pruned_series += 1;
591                continue;
592            }
593            self.last_key = Some(primary_key.clone());
594
595            let values = series.compact(&self.metadata);
596            let batch = values.and_then(|v| {
597                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
598            });
599
600            // Update metrics.
601            self.metrics.num_batches += 1;
602            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
603            self.metrics.scan_cost += start.elapsed();
604
605            let mut batch = batch;
606            batch = batch.and_then(|mut batch| {
607                batch.filter_by_sequence(self.sequence)?;
608                Ok(batch)
609            });
610            return Some(batch);
611        }
612        self.metrics.scan_cost += start.elapsed();
613
614        None
615    }
616}
617
/// Evaluates `predicates` against the primary key of a series and returns
/// whether the series may contain matching rows (`false` means the series can
/// be pruned). Errors keep the series to avoid false pruning.
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // no primary key, we simply return true.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // retrieve primary key values from cache or decode from bytes.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            // Keep the series on decode failure rather than dropping data.
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // evaluate predicates against primary key values
    let mut result = true;
    for predicate in predicates {
        // ignore predicates that are not referencing primary key columns
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: arrow schema and datatypes are constructed from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        // Evaluation errors are treated as "may match" to avoid false pruning.
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}
660
/// A `Series` holds a list of field values of some given primary key.
pub(crate) struct Series {
    /// Decoded primary key values, cached to avoid re-decoding during pruning.
    pk_cache: Option<Vec<Value>>,
    /// Mutable builder that accumulates newly pushed rows.
    active: ValueBuilder,
    /// Immutable parts produced by freezing the active builder.
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
}
668
impl Series {
    /// Creates a series whose active builder reserves `builder_cap` rows.
    pub(crate) fn with_capacity(region_metadata: &RegionMetadataRef, builder_cap: usize) -> Self {
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, builder_cap),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
        }
    }

    /// Creates a series with the default initial builder capacity.
    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY)
    }

    /// Returns true when neither the active builder nor frozen parts hold rows.
    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into Series. Return the size of values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > BUILDER_CAPACITY {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    /// Caches decoded primary key values for predicate pruning.
    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and push it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            // Swap in a fresh builder and turn the old one into immutable values.
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    /// Appends whole vectors of rows that share one op type and sequence.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> Result<()> {
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
    /// Returns the frozen and compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each values and avoid
            // cloning and sorting when values do not overlap with each other.

            // Fields plus 3 builtin columns: timestamp, sequence and op type.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(frozen
                    .iter()
                    .zip(frozen.iter().skip(1))
                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
            }

            // Concatenate each column across all frozen parts into one part.
            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }
}
765
/// `ValueBuilder` holds all the vector builders for field columns.
struct ValueBuilder {
    /// Timestamp values in the time index's native unit.
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    /// Sequence number of each row.
    sequence: Vec<u64>,
    /// Op type of each row, stored as its `u8` representation.
    op_type: Vec<u8>,
    /// Lazily created builders for field columns; `None` until the column
    /// receives its first value (see `push`).
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}
775
776impl ValueBuilder {
777    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
778        let timestamp_type = region_metadata
779            .time_index_column()
780            .column_schema
781            .data_type
782            .clone();
783        let sequence = Vec::with_capacity(capacity);
784        let op_type = Vec::with_capacity(capacity);
785
786        let field_types = region_metadata
787            .field_columns()
788            .map(|c| c.column_schema.data_type.clone())
789            .collect::<Vec<_>>();
790        let fields = (0..field_types.len()).map(|_| None).collect();
791
792        Self {
793            timestamp: Vec::with_capacity(capacity),
794            timestamp_type,
795            sequence,
796            op_type,
797            fields,
798            field_types,
799        }
800    }
801
    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they've already be encoded.
    /// Returns the size of field values.
    ///
    /// In this method, we don't check the data type of the value, because it is already checked in the caller.
    fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // In debug builds, verify the row supplies one value per field column.
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Builders are lazy: a column only gets a builder once it sees a
            // non-null value; until then its rows are implicit nulls.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(256, 4096))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // Backfill nulls for all rows pushed before this column's
                    // first value, then append the current value.
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.push(field_value);
                    self.fields[idx] = Some(mutable_vector);
                }
            }
        }

        size
    }
851
    /// Appends a run of rows sharing a single `op_type` and `sequence` to this builder.
    ///
    /// `ts_v` provides the timestamps of the new rows and must match
    /// `self.timestamp_type`; timestamps are stored as raw `i64` values.
    /// `fields` yields one source vector per field column, in field order.
    ///
    /// # Panics
    /// Panics if `ts_v` has a different timestamp type than the builder, or if
    /// it contains null timestamps (the `unwrap()` calls below assume non-null).
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> error::Result<()> {
        // Rows already buffered; lazily-created field builders are back-filled
        // with this many nulls so all columns stay row-aligned.
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        // Downcast the timestamp vector to its concrete type and copy the raw
        // i64 values into the builder.
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            // The time index column can only be one of the four timestamp types.
            _ => unreachable!(),
        };

        // op_type and sequence are constant for the whole run of rows.
        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in fields.zip(self.fields.iter_mut()).enumerate() {
            // Create the field builder on first use, padding the earlier rows
            // with nulls so the column lines up with the timestamp column.
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    // String fields go through an arrow StringArray append.
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    // Non-string fields are copied as a whole slice.
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }
941
942    /// Returns the length of [ValueBuilder]
943    fn len(&self) -> usize {
944        let sequence_len = self.sequence.len();
945        debug_assert_eq!(sequence_len, self.op_type.len());
946        debug_assert_eq!(sequence_len, self.timestamp.len());
947        sequence_len
948    }
949}
950
/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_type`.
#[derive(Clone)]
pub(crate) struct Values {
    // Time index column; concrete vector type depends on the region's timestamp precision.
    timestamp: VectorRef,
    // Sequence number of each row.
    sequence: Arc<UInt64Vector>,
    // Operation type of each row (e.g. `OpType::Put`), stored as `u8`.
    op_type: Arc<UInt8Vector>,
    // Field columns, positionally matching the region metadata's field columns.
    fields: Vec<VectorRef>,
}
959
960impl Values {
961    /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and
962    /// keeps only the latest row for the same timestamp.
963    pub fn to_batch(
964        &self,
965        primary_key: &[u8],
966        metadata: &RegionMetadataRef,
967        projection: &HashSet<ColumnId>,
968        dedup: bool,
969    ) -> Result<Batch> {
970        let builder = BatchBuilder::with_required_columns(
971            primary_key.to_vec(),
972            self.timestamp.clone(),
973            self.sequence.clone(),
974            self.op_type.clone(),
975        );
976
977        let fields = metadata
978            .field_columns()
979            .zip(self.fields.iter())
980            .filter_map(|(c, f)| {
981                projection.get(&c.column_id).map(|c| BatchColumn {
982                    column_id: *c,
983                    data: f.clone(),
984                })
985            })
986            .collect();
987
988        let mut batch = builder.with_fields(fields).build()?;
989        batch.sort(dedup)?;
990        Ok(batch)
991    }
992
993    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
994    fn columns(&self) -> Vec<ArrayRef> {
995        let mut res = Vec::with_capacity(3 + self.fields.len());
996        res.push(self.timestamp.to_arrow_array());
997        res.push(self.sequence.to_arrow_array());
998        res.push(self.op_type.to_arrow_array());
999        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
1000        res
1001    }
1002
1003    /// Builds a new [Values] instance from columns.
1004    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
1005        debug_assert!(cols.len() >= 3);
1006        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
1007        let sequence =
1008            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
1009        let op_type =
1010            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
1011        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;
1012
1013        Ok(Self {
1014            timestamp,
1015            sequence,
1016            op_type,
1017            fields,
1018        })
1019    }
1020}
1021
1022impl From<ValueBuilder> for Values {
1023    fn from(mut value: ValueBuilder) -> Self {
1024        let num_rows = value.len();
1025        let fields = value
1026            .fields
1027            .iter_mut()
1028            .enumerate()
1029            .map(|(i, v)| {
1030                if let Some(v) = v {
1031                    v.finish()
1032                } else {
1033                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
1034                    single_null.push_nulls(num_rows);
1035                    single_null.to_vector()
1036                }
1037            })
1038            .collect::<Vec<_>>();
1039
1040        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
1041        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
1042        let timestamp: VectorRef = match value.timestamp_type {
1043            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
1044                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
1045            }
1046            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
1047                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
1048            }
1049            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
1050                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
1051            }
1052            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
1053                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
1054            }
1055            _ => unreachable!(),
1056        };
1057
1058        if cfg!(debug_assertions) {
1059            debug_assert_eq!(timestamp.len(), sequence.len());
1060            debug_assert_eq!(timestamp.len(), op_type.len());
1061            for field in &fields {
1062                debug_assert_eq!(timestamp.len(), field.len());
1063            }
1064        }
1065
1066        Self {
1067            timestamp,
1068            sequence,
1069            op_type,
1070            fields,
1071        }
1072    }
1073}
1074
/// Builder that creates batch iterators over a [SeriesSet].
struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    // Column ids to read.
    projection: HashSet<ColumnId>,
    // Optional predicate to filter the scan.
    predicate: Option<Predicate>,
    // Whether rows with duplicate timestamps should be deduplicated.
    dedup: bool,
    // Optional sequence number bound for the scan — assumed to limit visible
    // rows by sequence; confirm semantics against `iter_series`.
    sequence: Option<SequenceNumber>,
    merge_mode: MergeMode,
}
1083
1084impl IterBuilder for TimeSeriesIterBuilder {
1085    fn build(&self) -> Result<BoxedBatchIterator> {
1086        let iter = self.series_set.iter_series(
1087            self.projection.clone(),
1088            self.predicate.clone(),
1089            self.dedup,
1090            self.sequence,
1091        )?;
1092
1093        if self.merge_mode == MergeMode::LastNonNull {
1094            let iter = LastNonNullIter::new(iter);
1095            Ok(Box::new(iter))
1096        } else {
1097            Ok(Box::new(iter))
1098        }
1099    }
1100}
1101
1102#[cfg(test)]
1103mod tests {
1104    use std::collections::{HashMap, HashSet};
1105
1106    use api::helper::ColumnDataTypeWrapper;
1107    use api::v1::value::ValueData;
1108    use api::v1::{Mutation, Row, Rows, SemanticType};
1109    use common_time::Timestamp;
1110    use datatypes::prelude::{ConcreteDataType, ScalarVector};
1111    use datatypes::schema::ColumnSchema;
1112    use datatypes::value::{OrderedFloat, Value};
1113    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1114    use mito_codec::row_converter::SortField;
1115    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1116    use store_api::storage::RegionId;
1117
1118    use super::*;
1119    use crate::test_util::column_metadata_to_column_schema;
1120
    /// Builds the region schema shared by these tests:
    /// tags `k0: String` (id 0) and `k1: Int64` (id 1), time index
    /// `ts: TimestampMillisecond` (id 2), fields `v0: Int64` (id 3) and
    /// `v1: Float64` (id 4), primary key `[k0, k1]`.
    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
1157
1158    fn ts_value_ref(val: i64) -> ValueRef<'static> {
1159        ValueRef::Timestamp(Timestamp::new_millisecond(val))
1160    }
1161
1162    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1163        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1164    }
1165
    /// Asserts that `values` contains exactly the rows
    /// `(ts, sequence, op_type, v0, v1)` given in `expect`.
    ///
    /// Assumes the schema from [schema_for_test] and that no column contains
    /// nulls (all cells are `unwrap`ped).
    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        // Zip all five columns row-wise into comparable tuples.
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }
1199
    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        // Two pushed rows stay in the active (mutable) part until compaction.
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        // Compaction drains the active part into one frozen `Values`.
        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
1214
    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        // Four rows with the same timestamp, partially-null fields:
        // col1: NULL 1 2 3
        // col2: NULL NULL 10.2 NULL
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        // Compaction keeps all four rows and preserves the null cells.
        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
1249
1250    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1251        let ts_len = batch.timestamps().len();
1252        assert_eq!(batch.sequences().len(), ts_len);
1253        assert_eq!(batch.op_types().len(), ts_len);
1254        for f in batch.fields() {
1255            assert_eq!(f.data.len(), ts_len);
1256        }
1257
1258        let mut rows = vec![];
1259        for idx in 0..ts_len {
1260            let mut row = Vec::with_capacity(batch.fields().len() + 3);
1261            row.push(batch.timestamps().get(idx));
1262            row.push(batch.sequences().get(idx));
1263            row.push(batch.op_types().get(idx));
1264            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1265            rows.push(row);
1266        }
1267
1268        assert_eq!(expect.len(), rows.len());
1269        for (idx, row) in rows.iter().enumerate() {
1270            assert_eq!(&expect[idx], row);
1271        }
1272    }
1273
    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        // ts=3 appears twice (sequences 1 and 2); with dedup enabled only the
        // row with the higher sequence must survive.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        // Project all columns and deduplicate.
        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        // Expect rows sorted by timestamp with the duplicate ts=3 collapsed to
        // the sequence=2 row.
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }
1334
    /// Builds a [KeyValues] mutation with `len` rows for the primary key
    /// `(k0, k1)` under the [schema_for_test] schema.
    ///
    /// Row `i` has `ts = i`, `v0 = i` and `v1 = i as f64`; the whole mutation
    /// uses op_type 1 (Put) and sequence 0.
    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        // Convert the region's column metadata into the gRPC column schema.
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(i as f64)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }
1381
1382    #[test]
1383    fn test_series_set_concurrency() {
1384        let schema = schema_for_test();
1385        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1386            schema
1387                .primary_key_columns()
1388                .map(|c| {
1389                    (
1390                        c.column_id,
1391                        SortField::new(c.column_schema.data_type.clone()),
1392                    )
1393                })
1394                .collect(),
1395        ));
1396        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1397
1398        let concurrency = 32;
1399        let pk_num = concurrency * 2;
1400        let mut handles = Vec::with_capacity(concurrency);
1401        for i in 0..concurrency {
1402            let set = set.clone();
1403            let schema = schema.clone();
1404            let column_schemas = schema
1405                .column_metadatas
1406                .iter()
1407                .map(column_metadata_to_column_schema)
1408                .collect::<Vec<_>>();
1409            let handle = std::thread::spawn(move || {
1410                for j in i * 100..(i + 1) * 100 {
1411                    let pk = j % pk_num;
1412                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1413
1414                    let kvs = KeyValues::new(
1415                        &schema,
1416                        Mutation {
1417                            op_type: OpType::Put as i32,
1418                            sequence: j as u64,
1419                            rows: Some(Rows {
1420                                schema: column_schemas.clone(),
1421                                rows: vec![Row {
1422                                    values: vec![
1423                                        api::v1::Value {
1424                                            value_data: Some(ValueData::StringValue(format!(
1425                                                "{}",
1426                                                j
1427                                            ))),
1428                                        },
1429                                        api::v1::Value {
1430                                            value_data: Some(ValueData::I64Value(j as i64)),
1431                                        },
1432                                        api::v1::Value {
1433                                            value_data: Some(ValueData::TimestampMillisecondValue(
1434                                                j as i64,
1435                                            )),
1436                                        },
1437                                        api::v1::Value {
1438                                            value_data: Some(ValueData::I64Value(j as i64)),
1439                                        },
1440                                        api::v1::Value {
1441                                            value_data: Some(ValueData::F64Value(j as f64)),
1442                                        },
1443                                    ],
1444                                }],
1445                            }),
1446                            write_hint: None,
1447                        },
1448                    )
1449                    .unwrap();
1450                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1451                }
1452            });
1453            handles.push(handle);
1454        }
1455        for h in handles {
1456            h.join().unwrap();
1457        }
1458
1459        let mut timestamps = Vec::with_capacity(concurrency * 100);
1460        let mut sequences = Vec::with_capacity(concurrency * 100);
1461        let mut op_types = Vec::with_capacity(concurrency * 100);
1462        let mut v0 = Vec::with_capacity(concurrency * 100);
1463
1464        for i in 0..pk_num {
1465            let pk = format!("pk-{}", i).as_bytes().to_vec();
1466            let series = set.get_series(&pk).unwrap();
1467            let mut guard = series.write().unwrap();
1468            let values = guard.compact(&schema).unwrap();
1469            timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1470            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1471            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1472            v0.extend(
1473                values
1474                    .fields
1475                    .first()
1476                    .unwrap()
1477                    .as_any()
1478                    .downcast_ref::<Int64Vector>()
1479                    .unwrap()
1480                    .iter_data()
1481                    .map(|v| v.unwrap()),
1482            );
1483        }
1484
1485        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1486        assert_eq!(
1487            expected_sequence,
1488            sequences.iter().copied().collect::<HashSet<_>>()
1489        );
1490
1491        op_types.iter().all(|op| *op == OpType::Put as u8);
1492        assert_eq!(
1493            expected_sequence,
1494            timestamps.iter().copied().collect::<HashSet<_>>()
1495        );
1496
1497        assert_eq!(timestamps, sequences);
1498        assert_eq!(v0, timestamps);
1499    }
1500
1501    #[test]
1502    fn test_memtable() {
1503        common_telemetry::init_default_ut_logging();
1504        check_memtable_dedup(true);
1505        check_memtable_dedup(false);
1506    }
1507
    /// Writes the same 100-row mutation twice and verifies that reading back
    /// yields each timestamp once when `dedup` is on, twice when it is off.
    /// Also checks the memtable's stats (allocated bytes and time range).
    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        // Identical writes: duplicates should collapse iff `dedup`.
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        // Expected occurrence count per timestamp: 1 with dedup, 2 without.
        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
        {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        // Count how often each timestamp actually comes back from the scan.
        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        // Rows have ts 0..=99 milliseconds.
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }
1554
    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        // Project only column id 3 (`v0`): batches must carry a single field.
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        // build_key_values wrote v0 = 0..100 in order.
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }
1582
    /// Stress test: concurrent writers and readers on one memtable, then a
    /// final scan verifies the total series and row counts.
    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Number of writer threads
        let num_writers = 10;
        // Number of reader threads
        let num_readers = 5;
        // Number of series per writer
        let series_per_writer = 100;
        // Number of rows per series
        let rows_per_series = 10;
        // Total number of series
        let total_series = num_writers * series_per_writer;

        // Create a barrier to synchronize the start of all threads
        // (+1 for the main thread releasing them together).
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                // Wait for all threads to be ready
                barrier.wait();

                // Create and write series; the series key is unique per
                // (writer, series) pair, so writers never collide on a key.
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads; they only need to iterate without errors while
        // writes are in flight.
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // Final scan: one batch per series is assumed here — TODO confirm the
        // iterator yields exactly one batch per series.
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
1673}