mito2/memtable/
time_series.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::iter;
19use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::OpType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{debug, error};
26use common_time::Timestamp;
27use datatypes::arrow;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow_array::StringArray;
30use datatypes::data_type::{ConcreteDataType, DataType};
31use datatypes::prelude::{ScalarVector, Vector, VectorRef};
32use datatypes::types::TimestampType;
33use datatypes::value::{Value, ValueRef};
34use datatypes::vectors::{
35    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
36    TimestampSecondVector, UInt64Vector, UInt8Vector,
37};
38use snafu::{ensure, OptionExt, ResultExt};
39use store_api::metadata::RegionMetadataRef;
40use store_api::storage::{ColumnId, SequenceNumber};
41use table::predicate::Predicate;
42
43use crate::error;
44use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
45use crate::flush::WriteBufferManagerRef;
46use crate::memtable::builder::{FieldBuilder, StringBuilder};
47use crate::memtable::bulk::part::BulkPart;
48use crate::memtable::key_values::KeyValue;
49use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
50use crate::memtable::stats::WriteMetrics;
51use crate::memtable::{
52    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
53    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
54    PredicateGroup,
55};
56use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
57use crate::read::dedup::LastNonNullIter;
58use crate::read::{Batch, BatchBuilder, BatchColumn};
59use crate::region::options::MergeMode;
60use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
61
/// Initial capacity for a series' active [ValueBuilder] vectors.
const INITIAL_BUILDER_CAPACITY: usize = 1024 * 8;

/// Row-count threshold at which a series freezes its active builder into an
/// immutable part (see [Series::push]).
const BUILDER_CAPACITY: usize = 512;
67
/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    /// Tracks global write buffer usage; `None` disables tracking.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Whether rows are deduplicated when reading from the memtable.
    dedup: bool,
    /// Strategy used to merge duplicated rows.
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}
90
91impl MemtableBuilder for TimeSeriesMemtableBuilder {
92    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
93        if metadata.primary_key.is_empty() {
94            Arc::new(SimpleBulkMemtable::new(
95                id,
96                metadata.clone(),
97                self.write_buffer_manager.clone(),
98                self.dedup,
99                self.merge_mode,
100            ))
101        } else {
102            Arc::new(TimeSeriesMemtable::new(
103                metadata.clone(),
104                id,
105                self.write_buffer_manager.clone(),
106                self.dedup,
107                self.merge_mode,
108            ))
109        }
110    }
111}
112
/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    /// Codec used to encode primary keys into their byte representation.
    row_codec: Arc<DensePrimaryKeyCodec>,
    /// All series, keyed by encoded primary key.
    series_set: SeriesSet,
    /// Tracks memory allocated by this memtable (reported to the write buffer manager).
    alloc_tracker: AllocTracker,
    /// Max timestamp written so far; starts at `i64::MIN`.
    max_timestamp: AtomicI64,
    /// Min timestamp written so far; starts at `i64::MAX`.
    min_timestamp: AtomicI64,
    /// Max sequence number written so far.
    max_sequence: AtomicU64,
    /// Whether to dedup rows when iterating; forced off under `MergeMode::LastNonNull`.
    dedup: bool,
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
128
129impl TimeSeriesMemtable {
130    pub fn new(
131        region_metadata: RegionMetadataRef,
132        id: MemtableId,
133        write_buffer_manager: Option<WriteBufferManagerRef>,
134        dedup: bool,
135        merge_mode: MergeMode,
136    ) -> Self {
137        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
138        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
139        let dedup = if merge_mode == MergeMode::LastNonNull {
140            false
141        } else {
142            dedup
143        };
144        Self {
145            id,
146            region_metadata,
147            series_set,
148            row_codec,
149            alloc_tracker: AllocTracker::new(write_buffer_manager),
150            max_timestamp: AtomicI64::new(i64::MIN),
151            min_timestamp: AtomicI64::new(i64::MAX),
152            max_sequence: AtomicU64::new(0),
153            dedup,
154            merge_mode,
155            num_rows: Default::default(),
156        }
157    }
158
159    /// Updates memtable stats.
160    fn update_stats(&self, stats: WriteMetrics) {
161        self.alloc_tracker
162            .on_allocation(stats.key_bytes + stats.value_bytes);
163        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
164        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
165        self.max_sequence
166            .fetch_max(stats.max_sequence, Ordering::SeqCst);
167        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
168    }
169
170    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
171        ensure!(
172            self.row_codec.num_fields() == kv.num_primary_keys(),
173            PrimaryKeyLengthMismatchSnafu {
174                expect: self.row_codec.num_fields(),
175                actual: kv.num_primary_keys(),
176            }
177        );
178
179        let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
180
181        let (key_allocated, value_allocated) =
182            self.series_set.push_to_series(primary_key_encoded, &kv);
183        stats.key_bytes += key_allocated;
184        stats.value_bytes += value_allocated;
185
186        // safety: timestamp of kv must be both present and a valid timestamp value.
187        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
188        stats.min_ts = stats.min_ts.min(ts);
189        stats.max_ts = stats.max_ts.max(ts);
190        Ok(())
191    }
192}
193
194impl Debug for TimeSeriesMemtable {
195    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
196        f.debug_struct("TimeSeriesMemtable").finish()
197    }
198}
199
impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes all rows in `kvs` and folds the accumulated write metrics into
    /// the memtable-level stats.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Account for the timestamp and op-type values carried by every row,
        // which `write_key_value` does not include in `value_bytes`.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // TODO(hl): this maybe inaccurate since for-iteration may return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to memtable will succeed.
        self.update_stats(local_stats);
        Ok(())
    }

    /// Writes a single row; stats are only merged when the write succeeds.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    /// Writes a bulk part by converting it to a mutation and writing row by row.
    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Default implementation fallback to row iteration.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        // Sequence, timestamp range and row count come from the part's own
        // metadata rather than the per-row accumulation above.
        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_ts;
        metrics.min_ts = part.min_ts;
        metrics.num_rows = part.num_rows;
        self.update_stats(metrics);
        Ok(())
    }

    /// Returns an iterator over the memtable. When `projection` is `None`,
    /// all field columns are projected.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence)?;

        // In `LastNonNull` mode the duplicate merging happens here, at read time.
        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    /// Returns the memtable content as a single range (range id 0).
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        // The builder re-creates an iterator lazily when the range is scanned.
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().is_empty()
    }

    /// Marks the memtable as immutable; allocation tracking stops here.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        // Convert raw i64 bounds back into timestamps of the region's time unit.
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    /// Creates a fresh, empty memtable with the same configuration but new metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
366
/// Map from encoded primary key to its [Series]. The map itself is guarded by
/// a `RwLock`; each series carries its own lock for row-level mutation.
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;

/// Set of all series in a memtable; cheap to clone (shares the inner map).
#[derive(Clone)]
pub(crate) struct SeriesSet {
    pub(crate) region_metadata: RegionMetadataRef,
    /// All series keyed by encoded primary key, shared across clones.
    pub(crate) series: Arc<SeriesRwLockMap>,
    /// Codec used to encode/decode primary keys.
    pub(crate) codec: Arc<DensePrimaryKeyCodec>,
}
375
376impl SeriesSet {
377    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
378        Self {
379            region_metadata,
380            series: Default::default(),
381            codec,
382        }
383    }
384}
385
impl SeriesSet {
    /// Push [KeyValue] to SeriesSet with given primary key and return key/value allocated memory size.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, so only a read lock on the map
        // is needed; the key contributes no new allocation.
        if let Some(series) = self.series.read().unwrap().get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the map's write lock. `entry` covers the race where
        // another writer inserted the same key between the read above and here.
        let mut indices = self.series.write().unwrap();
        match indices.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                // Only a newly inserted key counts toward key allocation.
                (key_len, value_allocated)
            }
            // safety: series must exist at given index.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().get(primary_key).cloned()
    }

    /// Iterates all series in [SeriesSet].
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Iter> {
        // Build the arrow schema and datatype list of the primary key columns,
        // used by the iterator to prune series via predicates.
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
        )
    }
}
455
456/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys
457/// of given region schema
458pub(crate) fn primary_key_schema(
459    region_metadata: &RegionMetadataRef,
460) -> arrow::datatypes::SchemaRef {
461    let fields = region_metadata
462        .primary_key_columns()
463        .map(|pk| {
464            arrow::datatypes::Field::new(
465                pk.column_schema.name.clone(),
466                pk.column_schema.data_type.as_arrow_type(),
467                pk.column_schema.is_nullable(),
468            )
469        })
470        .collect::<Vec<_>>();
471    Arc::new(arrow::datatypes::Schema::new(fields))
472}
473
/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series visited in the memtable.
    total_series: usize,
    /// Number of series pruned by primary key predicates.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}
488
/// Iterator over the series of a [SeriesSet], yielding one [Batch] per series.
struct Iter {
    metadata: RegionMetadataRef,
    /// Shared series map being scanned.
    series: Arc<SeriesRwLockMap>,
    /// Field column ids to project into output batches.
    projection: HashSet<ColumnId>,
    /// Primary key of the series yielded last; scanning resumes after it.
    last_key: Option<Vec<u8>>,
    /// Simple filters used to prune series by primary key values.
    predicate: Vec<SimpleFilterEvaluator>,
    /// Arrow schema of the primary key columns.
    pk_schema: arrow::datatypes::SchemaRef,
    /// Datatypes of the primary key columns, parallel to `pk_schema`.
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    /// Whether duplicated rows are removed when building batches.
    dedup: bool,
    /// Sequence bound passed to `Batch::filter_by_sequence` for each batch.
    sequence: Option<SequenceNumber>,
    metrics: Metrics,
}
502
503impl Iter {
504    #[allow(clippy::too_many_arguments)]
505    pub(crate) fn try_new(
506        metadata: RegionMetadataRef,
507        series: Arc<SeriesRwLockMap>,
508        projection: HashSet<ColumnId>,
509        predicate: Option<Predicate>,
510        pk_schema: arrow::datatypes::SchemaRef,
511        pk_datatypes: Vec<ConcreteDataType>,
512        codec: Arc<DensePrimaryKeyCodec>,
513        dedup: bool,
514        sequence: Option<SequenceNumber>,
515    ) -> Result<Self> {
516        let predicate = predicate
517            .map(|predicate| {
518                predicate
519                    .exprs()
520                    .iter()
521                    .filter_map(SimpleFilterEvaluator::try_new)
522                    .collect::<Vec<_>>()
523            })
524            .unwrap_or_default();
525        Ok(Self {
526            metadata,
527            series,
528            projection,
529            last_key: None,
530            predicate,
531            pk_schema,
532            pk_datatypes,
533            codec,
534            dedup,
535            sequence,
536            metrics: Metrics::default(),
537        })
538    }
539}
540
impl Drop for Iter {
    /// Reports this iterator's metrics to the log and to the Prometheus
    /// counters when the scan finishes (iterator dropped).
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
556
557impl Iterator for Iter {
558    type Item = Result<Batch>;
559
560    fn next(&mut self) -> Option<Self::Item> {
561        let start = Instant::now();
562        let map = self.series.read().unwrap();
563        let range = match &self.last_key {
564            None => map.range::<Vec<u8>, _>(..),
565            Some(last_key) => {
566                map.range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded))
567            }
568        };
569
570        // TODO(hl): maybe yield more than one time series to amortize range overhead.
571        for (primary_key, series) in range {
572            self.metrics.total_series += 1;
573
574            let mut series = series.write().unwrap();
575            if !self.predicate.is_empty()
576                && !prune_primary_key(
577                    &self.codec,
578                    primary_key.as_slice(),
579                    &mut series,
580                    &self.pk_datatypes,
581                    self.pk_schema.clone(),
582                    &self.predicate,
583                )
584            {
585                // read next series
586                self.metrics.num_pruned_series += 1;
587                continue;
588            }
589            self.last_key = Some(primary_key.clone());
590
591            let values = series.compact(&self.metadata);
592            let batch = values.and_then(|v| {
593                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
594            });
595
596            // Update metrics.
597            self.metrics.num_batches += 1;
598            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
599            self.metrics.scan_cost += start.elapsed();
600
601            let mut batch = batch;
602            batch = batch.and_then(|mut batch| {
603                batch.filter_by_sequence(self.sequence)?;
604                Ok(batch)
605            });
606            return Some(batch);
607        }
608        self.metrics.scan_cost += start.elapsed();
609
610        None
611    }
612}
613
614fn prune_primary_key(
615    codec: &Arc<DensePrimaryKeyCodec>,
616    pk: &[u8],
617    series: &mut Series,
618    datatypes: &[ConcreteDataType],
619    pk_schema: arrow::datatypes::SchemaRef,
620    predicates: &[SimpleFilterEvaluator],
621) -> bool {
622    // no primary key, we simply return true.
623    if pk_schema.fields().is_empty() {
624        return true;
625    }
626
627    // retrieve primary key values from cache or decode from bytes.
628    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
629        pk_values
630    } else {
631        let pk_values = codec.decode_dense_without_column_id(pk);
632        if let Err(e) = pk_values {
633            error!(e; "Failed to decode primary key");
634            return true;
635        }
636        series.update_pk_cache(pk_values.unwrap());
637        series.pk_cache.as_ref().unwrap()
638    };
639
640    // evaluate predicates against primary key values
641    let mut result = true;
642    for predicate in predicates {
643        // ignore predicates that are not referencing primary key columns
644        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
645            continue;
646        };
647        // Safety: arrow schema and datatypes are constructed from the same source.
648        let scalar_value = pk_values[index]
649            .try_to_scalar_value(&datatypes[index])
650            .unwrap();
651        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
652    }
653
654    result
655}
656
/// A `Series` holds a list of field values of some given primary key.
pub(crate) struct Series {
    /// Decoded primary key values, cached after the first predicate evaluation.
    pk_cache: Option<Vec<Value>>,
    /// Mutable builder receiving newly pushed rows.
    active: ValueBuilder,
    /// Immutable parts produced by freezing the active builder.
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
}
664
impl Series {
    /// Creates an empty series for the given region metadata.
    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
        }
    }

    /// Returns true when neither the active builder nor the frozen parts hold rows.
    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into Series. Return the size of values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Freeze the active builder once it approaches BUILDER_CAPACITY rows.
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > BUILDER_CAPACITY {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    /// Caches decoded primary key values for later predicate evaluation.
    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and push it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            // Swap in a fresh builder so the old one can be consumed into Values.
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    /// Appends whole vectors of rows sharing a single op type and sequence.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> Result<()> {
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
    /// Returns the frozen and compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each values and avoid
            // cloning and sorting when values do not overlap with each other.

            // Field columns plus the timestamp, sequence and op type columns.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(frozen
                    .iter()
                    .zip(frozen.iter().skip(1))
                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
            }

            // Concatenate corresponding columns across all frozen parts into one.
            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }
}
757
/// `ValueBuilder` holds all the vector builders for field columns.
struct ValueBuilder {
    /// Raw i64 values of the time index column.
    timestamp: Vec<i64>,
    /// Concrete type of the time index column.
    timestamp_type: ConcreteDataType,
    /// Sequence number of each row.
    sequence: Vec<u64>,
    /// Op type (as u8) of each row.
    op_type: Vec<u8>,
    /// Per-field builders; `None` until a field receives its first non-null value.
    fields: Vec<Option<FieldBuilder>>,
    /// Data types of the field columns, parallel to `fields`.
    field_types: Vec<ConcreteDataType>,
}
767
768impl ValueBuilder {
769    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
770        let timestamp_type = region_metadata
771            .time_index_column()
772            .column_schema
773            .data_type
774            .clone();
775        let sequence = Vec::with_capacity(capacity);
776        let op_type = Vec::with_capacity(capacity);
777
778        let field_types = region_metadata
779            .field_columns()
780            .map(|c| c.column_schema.data_type.clone())
781            .collect::<Vec<_>>();
782        let fields = (0..field_types.len()).map(|_| None).collect();
783
784        Self {
785            timestamp: Vec::with_capacity(capacity),
786            timestamp_type,
787            sequence,
788            op_type,
789            fields,
790            field_types,
791        }
792    }
793
    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they've already be encoded.
    /// Returns the size of field values.
    ///
    /// In this method, we don't check the data type of the value, because it is already checked in the caller.
    fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // In debug builds, materialize the iterator to check that the number
        // of field values matches the number of field columns.
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Builders are created lazily: a field's nulls are skipped until it
            // receives its first non-null value.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(256, 4096))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // Backfill nulls for the rows pushed before this field's
                    // first non-null value, then append the current value.
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.push(field_value);
                    self.fields[idx] = Some(mutable_vector);
                }
            }
        }

        size
    }
843
    /// Appends a run of rows that all share one `op_type` and one `sequence`.
    ///
    /// `ts_v` supplies one timestamp per row and `fields` yields one vector per
    /// field column, in field order. Field builders are created lazily and
    /// back-filled with nulls for rows written before their first value.
    ///
    /// # Errors
    /// Returns an error when a string field vector is not a `StringArray`, or
    /// when extending a non-string field builder fails.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> error::Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        // Downcast `ts_v` to the concrete vector type for this region's
        // timestamp precision and copy out the raw i64 values.
        // NOTE(review): assumes `ts_v` matches `self.timestamp_type` and
        // contains no nulls — the `unwrap`s panic otherwise.
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            // `timestamp_type` must be a timestamp type; anything else is a logic error.
            _ => unreachable!(),
        };

        // `op_type` and `sequence` are constant across the whole run of rows.
        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in fields.zip(self.fields.iter_mut()).enumerate() {
            // Create the field builder on first use and pad it with nulls so it
            // lines up with rows written before this call.
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }
933
934    /// Returns the length of [ValueBuilder]
935    fn len(&self) -> usize {
936        let sequence_len = self.sequence.len();
937        debug_assert_eq!(sequence_len, self.op_type.len());
938        debug_assert_eq!(sequence_len, self.timestamp.len());
939        sequence_len
940    }
941}
942
/// [Values] holds an immutable snapshot of one series: the timestamp column,
/// the `sequence` and `op_type` bookkeeping columns, and all field columns.
#[derive(Clone)]
pub(crate) struct Values {
    // Timestamp column; concrete vector type depends on the region's timestamp precision.
    timestamp: VectorRef,
    // Sequence number of the mutation that wrote each row.
    sequence: Arc<UInt64Vector>,
    // Operation type of each row (see `OpType`).
    op_type: Arc<UInt8Vector>,
    // Field columns, in region metadata field order.
    fields: Vec<VectorRef>,
}
951
impl Values {
    /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and
    /// keeps only the latest row for the same timestamp (when `dedup` is true).
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        dedup: bool,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        // Keep only the field columns selected by `projection`, pairing each
        // region field column with its stored vector by position.
        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.sort(dedup)?;
        Ok(batch)
    }

    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
    ///
    /// Column order: timestamp, sequence, op_type, then the field columns.
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Builds a new [Values] instance from columns.
    ///
    /// Expects the same column order that [Values::columns] produces:
    /// timestamp, sequence, op_type, fields...
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}
1013
impl From<ValueBuilder> for Values {
    /// Finishes a [ValueBuilder], freezing its mutable buffers into immutable vectors.
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        // Finish every field builder; a field that was never written yields an
        // all-null vector of `num_rows` rows.
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        // Wrap the raw i64 timestamps in the vector type matching the region's precision.
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            // `timestamp_type` must be a timestamp type; anything else is a logic error.
            _ => unreachable!(),
        };

        // Sanity-check that every column is row-aligned (debug builds only).
        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1066
/// Builder that creates batch iterators over a [SeriesSet] on demand.
struct TimeSeriesIterBuilder {
    // The set of series to iterate.
    series_set: SeriesSet,
    // Column ids to include in output batches.
    projection: HashSet<ColumnId>,
    // Optional predicate passed to the series iterator for filtering.
    predicate: Option<Predicate>,
    // Whether rows with duplicate timestamps should be deduplicated.
    dedup: bool,
    // Sequence number bound passed to the series iterator, if any.
    sequence: Option<SequenceNumber>,
    // Controls how duplicate rows are merged (last-row vs. last-non-null).
    merge_mode: MergeMode,
}
1075
1076impl IterBuilder for TimeSeriesIterBuilder {
1077    fn build(&self) -> Result<BoxedBatchIterator> {
1078        let iter = self.series_set.iter_series(
1079            self.projection.clone(),
1080            self.predicate.clone(),
1081            self.dedup,
1082            self.sequence,
1083        )?;
1084
1085        if self.merge_mode == MergeMode::LastNonNull {
1086            let iter = LastNonNullIter::new(iter);
1087            Ok(Box::new(iter))
1088        } else {
1089            Ok(Box::new(iter))
1090        }
1091    }
1092}
1093
1094#[cfg(test)]
1095mod tests {
1096    use std::collections::{HashMap, HashSet};
1097
1098    use api::helper::ColumnDataTypeWrapper;
1099    use api::v1::value::ValueData;
1100    use api::v1::{Mutation, Row, Rows, SemanticType};
1101    use common_time::Timestamp;
1102    use datatypes::prelude::{ConcreteDataType, ScalarVector};
1103    use datatypes::schema::ColumnSchema;
1104    use datatypes::value::{OrderedFloat, Value};
1105    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1106    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1107    use store_api::storage::RegionId;
1108
1109    use super::*;
1110    use crate::row_converter::SortField;
1111    use crate::test_util::column_metadata_to_column_schema;
1112
1113    fn schema_for_test() -> RegionMetadataRef {
1114        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1115        builder
1116            .push_column_metadata(ColumnMetadata {
1117                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1118                semantic_type: SemanticType::Tag,
1119                column_id: 0,
1120            })
1121            .push_column_metadata(ColumnMetadata {
1122                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
1123                semantic_type: SemanticType::Tag,
1124                column_id: 1,
1125            })
1126            .push_column_metadata(ColumnMetadata {
1127                column_schema: ColumnSchema::new(
1128                    "ts",
1129                    ConcreteDataType::timestamp_millisecond_datatype(),
1130                    false,
1131                ),
1132                semantic_type: SemanticType::Timestamp,
1133                column_id: 2,
1134            })
1135            .push_column_metadata(ColumnMetadata {
1136                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1137                semantic_type: SemanticType::Field,
1138                column_id: 3,
1139            })
1140            .push_column_metadata(ColumnMetadata {
1141                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1142                semantic_type: SemanticType::Field,
1143                column_id: 4,
1144            })
1145            .primary_key(vec![0, 1]);
1146        let region_metadata = builder.build().unwrap();
1147        Arc::new(region_metadata)
1148    }
1149
1150    fn ts_value_ref(val: i64) -> ValueRef<'static> {
1151        ValueRef::Timestamp(Timestamp::new_millisecond(val))
1152    }
1153
1154    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1155        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1156    }
1157
    /// Asserts that `values` contains exactly the rows in `expect`, where each
    /// expected row is `(timestamp, sequence, op_type, v0, v1)`.
    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        // Zip every column row-wise and materialize tuples for comparison.
        // All values are expected non-null here, hence the unwraps.
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }
1191
    /// Pushed rows accumulate in the active buffer; `compact` moves them into
    /// an immutable frozen batch and empties the active buffer.
    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        // Expected op_type value 1 corresponds to OpType::Put.
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
1206
    /// Null field values survive `compact`: the null counts of the compacted
    /// vectors match exactly the nulls that were pushed per column.
    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        // col1: NULL 1 2 3
        // col2: NULL NULL 10.2 NULL
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        // One null in v0 (first row), three nulls in v1 (all but the 10.2 row).
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
1241
1242    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1243        let ts_len = batch.timestamps().len();
1244        assert_eq!(batch.sequences().len(), ts_len);
1245        assert_eq!(batch.op_types().len(), ts_len);
1246        for f in batch.fields() {
1247            assert_eq!(f.data.len(), ts_len);
1248        }
1249
1250        let mut rows = vec![];
1251        for idx in 0..ts_len {
1252            let mut row = Vec::with_capacity(batch.fields().len() + 3);
1253            row.push(batch.timestamps().get(idx));
1254            row.push(batch.sequences().get(idx));
1255            row.push(batch.op_types().get(idx));
1256            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1257            rows.push(row);
1258        }
1259
1260        assert_eq!(expect.len(), rows.len());
1261        for (idx, row) in rows.iter().enumerate() {
1262            assert_eq!(&expect[idx], row);
1263        }
1264    }
1265
    /// `to_batch` with `dedup = true` sorts rows by timestamp and keeps only
    /// the highest-sequence row for a duplicate timestamp: ts=3 appears twice
    /// and only the seq=2 row survives.
    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        // Project every column (ids 0..=4) and deduplicate.
        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }
1326
    /// Builds [KeyValues] containing `len` rows for the primary key `(k0, k1)`.
    /// Row `i` gets timestamp and `v0` equal to `i`, and `v1` equal to `i as f64`,
    /// all under a single Put mutation with sequence 0.
    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        // Mirror the region's columns as gRPC column schemas.
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        // Values are ordered as k0, k1, ts, v0, v1 to match the schema above.
        let rows = (0..len)
            .map(|i| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(i as f64)),
                    },
                ],
            })
            .collect();
        // op_type 1 corresponds to OpType::Put.
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }
1373
1374    #[test]
1375    fn test_series_set_concurrency() {
1376        let schema = schema_for_test();
1377        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1378            schema
1379                .primary_key_columns()
1380                .map(|c| {
1381                    (
1382                        c.column_id,
1383                        SortField::new(c.column_schema.data_type.clone()),
1384                    )
1385                })
1386                .collect(),
1387        ));
1388        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1389
1390        let concurrency = 32;
1391        let pk_num = concurrency * 2;
1392        let mut handles = Vec::with_capacity(concurrency);
1393        for i in 0..concurrency {
1394            let set = set.clone();
1395            let schema = schema.clone();
1396            let column_schemas = schema
1397                .column_metadatas
1398                .iter()
1399                .map(column_metadata_to_column_schema)
1400                .collect::<Vec<_>>();
1401            let handle = std::thread::spawn(move || {
1402                for j in i * 100..(i + 1) * 100 {
1403                    let pk = j % pk_num;
1404                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1405
1406                    let kvs = KeyValues::new(
1407                        &schema,
1408                        Mutation {
1409                            op_type: OpType::Put as i32,
1410                            sequence: j as u64,
1411                            rows: Some(Rows {
1412                                schema: column_schemas.clone(),
1413                                rows: vec![Row {
1414                                    values: vec![
1415                                        api::v1::Value {
1416                                            value_data: Some(ValueData::StringValue(format!(
1417                                                "{}",
1418                                                j
1419                                            ))),
1420                                        },
1421                                        api::v1::Value {
1422                                            value_data: Some(ValueData::I64Value(j as i64)),
1423                                        },
1424                                        api::v1::Value {
1425                                            value_data: Some(ValueData::TimestampMillisecondValue(
1426                                                j as i64,
1427                                            )),
1428                                        },
1429                                        api::v1::Value {
1430                                            value_data: Some(ValueData::I64Value(j as i64)),
1431                                        },
1432                                        api::v1::Value {
1433                                            value_data: Some(ValueData::F64Value(j as f64)),
1434                                        },
1435                                    ],
1436                                }],
1437                            }),
1438                            write_hint: None,
1439                        },
1440                    )
1441                    .unwrap();
1442                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1443                }
1444            });
1445            handles.push(handle);
1446        }
1447        for h in handles {
1448            h.join().unwrap();
1449        }
1450
1451        let mut timestamps = Vec::with_capacity(concurrency * 100);
1452        let mut sequences = Vec::with_capacity(concurrency * 100);
1453        let mut op_types = Vec::with_capacity(concurrency * 100);
1454        let mut v0 = Vec::with_capacity(concurrency * 100);
1455
1456        for i in 0..pk_num {
1457            let pk = format!("pk-{}", i).as_bytes().to_vec();
1458            let series = set.get_series(&pk).unwrap();
1459            let mut guard = series.write().unwrap();
1460            let values = guard.compact(&schema).unwrap();
1461            timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1462            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1463            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1464            v0.extend(
1465                values
1466                    .fields
1467                    .first()
1468                    .unwrap()
1469                    .as_any()
1470                    .downcast_ref::<Int64Vector>()
1471                    .unwrap()
1472                    .iter_data()
1473                    .map(|v| v.unwrap()),
1474            );
1475        }
1476
1477        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1478        assert_eq!(
1479            expected_sequence,
1480            sequences.iter().copied().collect::<HashSet<_>>()
1481        );
1482
1483        op_types.iter().all(|op| *op == OpType::Put as u8);
1484        assert_eq!(
1485            expected_sequence,
1486            timestamps.iter().copied().collect::<HashSet<_>>()
1487        );
1488
1489        assert_eq!(timestamps, sequences);
1490        assert_eq!(v0, timestamps);
1491    }
1492
1493    #[test]
1494    fn test_memtable() {
1495        common_telemetry::init_default_ut_logging();
1496        check_memtable_dedup(true);
1497        check_memtable_dedup(false);
1498    }
1499
    /// Writes the same 100-row batch twice and verifies that reading back
    /// yields each timestamp once when `dedup` is enabled and twice otherwise.
    /// Also sanity-checks the memtable's reported stats.
    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        // Write the identical rows twice to create duplicates.
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        // Expected occurrence count per timestamp depends on dedup mode.
        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
        {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        // Count how many times each timestamp is actually read back.
        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        // Timestamps written were 0..100 milliseconds.
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }
1546
    /// Iterating with a projection of column id 3 (`v0`) returns batches with
    /// exactly one field column containing the expected values.
    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        // Project only column id 3 (`v0`).
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            // Only the projected field column should be present.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        // build_key_values wrote v0 = 0..100.
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }
1574
    /// Stress test: concurrent writers and readers on one memtable must not
    /// interfere; after all threads join, the memtable contains every series
    /// and every row that was written.
    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Number of writer threads
        let num_writers = 10;
        // Number of reader threads
        let num_readers = 5;
        // Number of series per writer
        let series_per_writer = 100;
        // Number of rows per series
        let rows_per_series = 10;
        // Total number of series
        let total_series = num_writers * series_per_writer;

        // Create a barrier to synchronize the start of all threads
        // (+1 for the main thread, which also waits).
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                // Wait for all threads to be ready
                barrier.wait();

                // Create and write series; keys are unique per (writer, series)
                // so no two writers touch the same series.
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads that repeatedly scan the memtable while writes
        // are in flight; any batch error fails the test.
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        // Release all writers and readers at once.
        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // Final full scan: every series and every row must be present.
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
1665}