// mito2/memtable/time_series.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
    PredicateGroup,
};
use crate::metrics::{
    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
    READ_STAGE_ELAPSED,
};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;

/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Vector builder capacity; the active builder is frozen into an immutable part
/// once its length approaches this threshold.
const BUILDER_CAPACITY: usize = 512;

/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }
}
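
// A usage sketch (illustrative only; `metadata` is assumed to be a
// `RegionMetadataRef` built elsewhere, e.g. with `RegionMetadataBuilder` as in
// the tests at the bottom of this file):
//
//     let builder = TimeSeriesMemtableBuilder::new(None, true, MergeMode::LastRow);
//     // Regions with a non-empty primary key get a `TimeSeriesMemtable`;
//     // otherwise the builder falls back to a `SimpleBulkMemtable`.
//     let memtable = builder.build(0, &metadata);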

/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
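        // Note: in `LastNonNull` merge mode rows must not be deduplicated here,
        // because the `LastNonNullIter` wrapping the read iterator needs the
        // duplicate rows to merge their field values, so dedup is forced off.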
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    /// Updates memtable stats.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // TODO(hl): this may be inaccurate since the for-loop above can return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to the memtable will succeed.
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Default implementation falls back to row iteration.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_ts;
        metrics.min_ts = part.min_ts;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
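
// Illustrative write/read round-trip (a sketch mirroring the unit tests below;
// `schema` and `kvs` are assumed to be a `RegionMetadataRef` and a `KeyValues`):
//
//     let memtable = TimeSeriesMemtable::new(schema, 1, None, true, MergeMode::LastRow);
//     memtable.write(&kvs)?;
//     // No projection, no predicate, no sequence filter.
//     for batch in memtable.iter(None, None, None)? {
//         let batch = batch?;
//         // Each `Batch` holds the rows of one primary key, sorted by
//         // (timestamp, sequence desc) and optionally deduplicated.
//     }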

#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);

impl Drop for SeriesMap {
    fn drop(&mut self) {
        let num_series = self.0.len();
        let num_field_builders = self
            .0
            .values()
            .map(|v| v.read().unwrap().active.num_field_builders())
            .sum::<usize>();
        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
    }
}

#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
    /// Pushes a [KeyValue] into the [SeriesSet] under the given primary key and returns the
    /// allocated key/value memory sizes.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // The series may have been created by a concurrent writer between
            // releasing the read lock and acquiring the write lock above.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates all series in [SeriesSet].
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
        )
    }
}

/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that contains only the primary key
/// columns of the given region schema.
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}
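
// For example, with the test schema below (tags `k0: String` and `k1: Int64`),
// this function produces an arrow schema equivalent to this sketch:
//
//     arrow::datatypes::Schema::new(vec![
//         arrow::datatypes::Field::new("k0", arrow::datatypes::DataType::Utf8, false),
//         arrow::datatypes::Field::new("k1", arrow::datatypes::DataType::Int64, false),
//     ])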

/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series in the memtable.
    total_series: usize,
    /// Number of series pruned.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}

struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    projection: HashSet<ColumnId>,
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    metrics: Metrics,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<RwLock<SeriesMap>>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            sequence,
            metrics: Metrics::default(),
        })
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        // TODO(hl): maybe yield more than one time series to amortize range overhead.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // read next series
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            let batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        self.metrics.scan_cost += start.elapsed();

        None
    }
}

fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // no primary key, we simply return true.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // retrieve primary key values from cache or decode from bytes.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // evaluate predicates against primary key values
    let mut result = true;
    for predicate in predicates {
        // ignore predicates that are not referencing primary key columns
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: arrow schema and datatypes are constructed from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}
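
// Example: with the test schema below, a filter such as `k1 = 42` is evaluated
// against the decoded primary key values of each series, so a series whose `k1`
// is not 42 is skipped without reading any of its rows. Predicates on non-key
// columns (e.g. `v0 > 0`) are simply ignored here and the series is kept,
// leaving that filtering to later stages of the scan.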

/// A `Series` holds a list of field values of a given primary key.
pub(crate) struct Series {
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
}

impl Series {
    pub(crate) fn with_capacity(region_metadata: &RegionMetadataRef, builder_cap: usize) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, builder_cap),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
        }
    }

    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into the [Series]. Returns the memory size of the values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > BUILDER_CAPACITY {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and pushes it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> Result<()> {
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes the active part into the frozen part and compacts the frozen part to reduce
    /// memory fragmentation. Returns the frozen and compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each `Values` and avoid
            // cloning and sorting when values do not overlap with each other.

            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(frozen
                    .iter()
                    .zip(frozen.iter().skip(1))
                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }
}
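
// Sketch of the `Series` lifecycle (assuming a `region_metadata: RegionMetadataRef`
// and row values `ts`, `sequence`, and `fields` in scope):
//
//     let mut series = Series::new(&region_metadata);
//     // Rows accumulate in the active `ValueBuilder`; once its length
//     // approaches BUILDER_CAPACITY, `push` freezes it into an immutable
//     // `Values` part.
//     let _size = series.push(ts, sequence, OpType::Put, fields);
//     // Readers call `compact`, which freezes the active part and concatenates
//     // all frozen parts into a single `Values`.
//     let values = series.compact(&region_metadata)?;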

/// `ValueBuilder` holds all the vector builders for field columns.
struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Returns number of field builders.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they have already been encoded.
    /// Returns the size of the field values.
    ///
    /// This method does not check the data type of the value because the caller has already checked it.
    fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.push(field_value);
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: impl Iterator<Item = VectorRef>,
    ) -> error::Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in fields.zip(self.fields.iter_mut()).enumerate() {
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    /// Returns the length of [ValueBuilder]
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }
}
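
// Worked example of the lazy field-builder allocation in `push` above: if three
// rows arrive with `v1 = NULL, NULL, 10.2`, no builder is allocated for `v1`
// until the third row. At that point a builder is created, backfilled with two
// nulls via `push_nulls(num_rows - 1)`, and `10.2` is appended, keeping the
// column aligned with `timestamp`, `sequence` and `op_type`.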

/// [Values] holds immutable vectors of field columns, including `sequence` and `op_type`.
#[derive(Clone)]
pub(crate) struct Values {
    timestamp: VectorRef,
    sequence: Arc<UInt64Vector>,
    op_type: Arc<UInt8Vector>,
    fields: Vec<VectorRef>,
}

impl Values {
    /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and
    /// keeps only the latest row for the same timestamp.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        dedup: bool,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.sort(dedup)?;
        Ok(batch)
    }

    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Builds a new [Values] instance from columns.
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
    fn build(&self) -> Result<BoxedBatchIterator> {
        let iter = self.series_set.iter_series(
            self.projection.clone(),
            self.predicate.clone(),
            self.dedup,
            self.sequence,
        )?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
    use mito_codec::row_converter::SortField;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::column_metadata_to_column_schema;

    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn ts_value_ref(val: i64) -> ValueRef<'static> {
        ValueRef::Timestamp(Timestamp::new_millisecond(val))
    }

    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
    }

    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }

    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        // col1: NULL 1 2 2
        // col2: NULL NULL 10.2 NULL
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
        let ts_len = batch.timestamps().len();
        assert_eq!(batch.sequences().len(), ts_len);
        assert_eq!(batch.op_types().len(), ts_len);
        for f in batch.fields() {
            assert_eq!(f.data.len(), ts_len);
        }

        let mut rows = vec![];
        for idx in 0..ts_len {
            let mut row = Vec::with_capacity(batch.fields().len() + 3);
            row.push(batch.timestamps().get(idx));
            row.push(batch.sequences().get(idx));
            row.push(batch.op_types().get(idx));
            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
            rows.push(row);
        }

        assert_eq!(expect.len(), rows.len());
        for (idx, row) in rows.iter().enumerate() {
            assert_eq!(&expect[idx], row);
        }
    }

    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }

    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(i as f64)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![Row {
                                    values: vec![
                                        api::v1::Value {
                                            value_data: Some(ValueData::StringValue(format!(
                                                "{}",
                                                j
                                            ))),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::TimestampMillisecondValue(
                                                j as i64,
                                            )),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::F64Value(j as f64)),
                                        },
                                    ],
                                }],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
1496            timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1497            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1498            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1499            v0.extend(
1500                values
1501                    .fields
1502                    .first()
1503                    .unwrap()
1504                    .as_any()
1505                    .downcast_ref::<Int64Vector>()
1506                    .unwrap()
1507                    .iter_data()
1508                    .map(|v| v.unwrap()),
1509            );
1510        }
1511
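        // Across all series, the sequence numbers (and, by construction, the
        // timestamps and v0 values) must cover 0..concurrency * 100 exactly.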
        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        // `Iterator::all` returns a bool; assert it so the check actually
        // fails on a non-Put op type.
        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

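    /// Writes the same batch twice and verifies that each timestamp shows up
    /// once when deduplication is enabled and twice when it is disabled.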
    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

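        // The same batch was written twice, so each timestamp is expected
        // once with dedup enabled and twice without.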
        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
        {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

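        // Project column id 3 only, i.e. the i64 field `v0`; every batch the
        // iterator yields should then carry exactly one field column.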
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }

    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Number of writer threads
        let num_writers = 10;
        // Number of reader threads
        let num_readers = 5;
        // Number of series per writer
        let series_per_writer = 100;
        // Number of rows per series
        let rows_per_series = 10;
        // Total number of series
        let total_series = num_writers * series_per_writer;

        // Create a barrier to synchronize the start of all threads
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                // Wait for all threads to be ready
                barrier.wait();

                // Write `series_per_writer` distinct series, each with
                // `rows_per_series` rows.
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                // Wait for all threads to be ready
                barrier.wait();

                // Repeatedly scan the whole memtable while writers are active.
                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

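        // The main thread participates in the barrier as well; this releases
        // all writers and readers at the same time.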
        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

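        // With every thread joined, a final scan must observe all series and
        // all rows exactly once.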
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
}