// mito2/memtable/time_series.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::iter;
19use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::OpType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{debug, error};
26use common_time::Timestamp;
27use datatypes::arrow;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow_array::StringArray;
30use datatypes::data_type::{ConcreteDataType, DataType};
31use datatypes::prelude::{ScalarVector, Vector, VectorRef};
32use datatypes::types::TimestampType;
33use datatypes::value::{Value, ValueRef};
34use datatypes::vectors::{
35    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
36    TimestampSecondVector, UInt64Vector, UInt8Vector,
37};
38use mito_codec::key_values::KeyValue;
39use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
40use snafu::{ensure, OptionExt, ResultExt};
41use store_api::metadata::RegionMetadataRef;
42use store_api::storage::{ColumnId, SequenceNumber};
43use table::predicate::Predicate;
44
45use crate::error::{
46    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
47};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::builder::{FieldBuilder, StringBuilder};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
52use crate::memtable::stats::WriteMetrics;
53use crate::memtable::{
54    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
55    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
56    MemtableStats, PredicateGroup,
57};
58use crate::metrics::{
59    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
60    READ_STAGE_ELAPSED,
61};
62use crate::read::dedup::LastNonNullIter;
63use crate::read::{Batch, BatchBuilder, BatchColumn};
64use crate::region::options::MergeMode;
65
/// Initial capacity of vector builders for a newly created series.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Capacity threshold of the active vector builder; `Series::push` freezes the
/// active part once its length approaches this value.
const BUILDER_CAPACITY: usize = 512;
71
/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    /// Optional manager handed to each memtable's [AllocTracker].
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Whether reads deduplicate rows (forced off under `MergeMode::LastNonNull`).
    dedup: bool,
    /// Merge mode of the region.
    merge_mode: MergeMode,
}
79
80impl TimeSeriesMemtableBuilder {
81    /// Creates a new builder with specific `write_buffer_manager`.
82    pub fn new(
83        write_buffer_manager: Option<WriteBufferManagerRef>,
84        dedup: bool,
85        merge_mode: MergeMode,
86    ) -> Self {
87        Self {
88            write_buffer_manager,
89            dedup,
90            merge_mode,
91        }
92    }
93}
94
95impl MemtableBuilder for TimeSeriesMemtableBuilder {
96    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
97        if metadata.primary_key.is_empty() {
98            Arc::new(SimpleBulkMemtable::new(
99                id,
100                metadata.clone(),
101                self.write_buffer_manager.clone(),
102                self.dedup,
103                self.merge_mode,
104            ))
105        } else {
106            Arc::new(TimeSeriesMemtable::new(
107                metadata.clone(),
108                id,
109                self.write_buffer_manager.clone(),
110                self.dedup,
111                self.merge_mode,
112            ))
113        }
114    }
115
116    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
117        // Now if we can use simple bulk memtable, the input request is already
118        // a bulk write request and won't call this method.
119        false
120    }
121}
122
/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Metadata of the region this memtable belongs to.
    region_metadata: RegionMetadataRef,
    /// Codec that encodes primary keys into the sorted map key.
    row_codec: Arc<DensePrimaryKeyCodec>,
    /// Per-series storage, keyed by encoded primary key.
    series_set: SeriesSet,
    /// Tracks bytes allocated by this memtable.
    alloc_tracker: AllocTracker,
    /// Max time index value ever written; `i64::MIN` when empty.
    max_timestamp: AtomicI64,
    /// Min time index value ever written; `i64::MAX` when empty.
    min_timestamp: AtomicI64,
    /// Max sequence number of written rows.
    max_sequence: AtomicU64,
    /// Whether reads deduplicate rows (forced off under `MergeMode::LastNonNull`).
    dedup: bool,
    /// Merge mode of the region.
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
138
139impl TimeSeriesMemtable {
140    pub fn new(
141        region_metadata: RegionMetadataRef,
142        id: MemtableId,
143        write_buffer_manager: Option<WriteBufferManagerRef>,
144        dedup: bool,
145        merge_mode: MergeMode,
146    ) -> Self {
147        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
148        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
149        let dedup = if merge_mode == MergeMode::LastNonNull {
150            false
151        } else {
152            dedup
153        };
154        Self {
155            id,
156            region_metadata,
157            series_set,
158            row_codec,
159            alloc_tracker: AllocTracker::new(write_buffer_manager),
160            max_timestamp: AtomicI64::new(i64::MIN),
161            min_timestamp: AtomicI64::new(i64::MAX),
162            max_sequence: AtomicU64::new(0),
163            dedup,
164            merge_mode,
165            num_rows: Default::default(),
166        }
167    }
168
169    /// Updates memtable stats.
170    fn update_stats(&self, stats: WriteMetrics) {
171        self.alloc_tracker
172            .on_allocation(stats.key_bytes + stats.value_bytes);
173        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
174        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
175        self.max_sequence
176            .fetch_max(stats.max_sequence, Ordering::SeqCst);
177        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
178    }
179
180    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
181        ensure!(
182            self.row_codec.num_fields() == kv.num_primary_keys(),
183            PrimaryKeyLengthMismatchSnafu {
184                expect: self.row_codec.num_fields(),
185                actual: kv.num_primary_keys(),
186            }
187        );
188
189        let primary_key_encoded = self
190            .row_codec
191            .encode(kv.primary_keys())
192            .context(EncodeSnafu)?;
193
194        let (key_allocated, value_allocated) =
195            self.series_set.push_to_series(primary_key_encoded, &kv);
196        stats.key_bytes += key_allocated;
197        stats.value_bytes += value_allocated;
198
199        // safety: timestamp of kv must be both present and a valid timestamp value.
200        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
201        stats.min_ts = stats.min_ts.min(ts);
202        stats.max_ts = stats.max_ts.max(ts);
203        Ok(())
204    }
205}
206
207impl Debug for TimeSeriesMemtable {
208    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
209        f.debug_struct("TimeSeriesMemtable").finish()
210    }
211}
212
impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes all rows of `kvs` into their series, then merges the collected
    /// statistics into the memtable-level stats.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Besides field bytes counted in `write_key_value`, each row also
        // stores a timestamp and an op type.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // TODO(hl): this maybe inaccurate since for-iteration may return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to memtable will succeed.
        self.update_stats(local_stats);
        Ok(())
    }

    /// Writes a single row. Stats are updated only when the write succeeds.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        // One row stores one timestamp and one op type besides field bytes.
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    /// Writes a bulk part by converting it into a mutation and iterating rows.
    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Default implementation fallback to row iteration.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        // Use the part's precomputed range/counters; this overwrites the
        // per-row min/max timestamps gathered by `write_key_value` above.
        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_ts;
        metrics.min_ts = part.min_ts;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    /// Test-only scan over the memtable content.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        // Default to all field columns when no projection is given.
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence, None)?;

        // Wrap with last-non-null merging when the region runs in that mode.
        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    /// Exposes the whole memtable as a single range (range id 0).
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        // Default to all field columns when no projection is given.
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    /// Marks the allocation tracker as done; no data is mutated.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    /// Returns statistics of this memtable.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        // Raw i64 min/max are converted back to timestamps of the index type.
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Creates a new empty memtable with the same options for `metadata`.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
384
/// Sorted map from encoded primary key to its [Series].
#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);
387
388impl Drop for SeriesMap {
389    fn drop(&mut self) {
390        let num_series = self.0.len();
391        let num_field_builders = self
392            .0
393            .values()
394            .map(|v| v.read().unwrap().active.num_field_builders())
395            .sum::<usize>();
396        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
397        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
398    }
399}
400
/// Shared, thread-safe collection of all series in a memtable.
/// Clones share the same underlying map.
#[derive(Clone)]
pub(crate) struct SeriesSet {
    /// Metadata of the region.
    region_metadata: RegionMetadataRef,
    /// Series keyed by encoded primary key.
    series: Arc<RwLock<SeriesMap>>,
    /// Codec used to encode/decode primary keys.
    codec: Arc<DensePrimaryKeyCodec>,
}
407
408impl SeriesSet {
409    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
410        Self {
411            region_metadata,
412            series: Default::default(),
413            codec,
414        }
415    }
416}
417
impl SeriesSet {
    /// Push [KeyValue] to SeriesSet with given primary key and return key/value allocated memory size.
    ///
    /// The returned key size is non-zero only when a new series is created,
    /// since the key bytes are stored once per series.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, so only a read lock on the map
        // is needed (plus a write lock on the single series).
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the map write lock and insert (or find) the series.
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // Another writer inserted this series between the read-locked
            // probe above and taking the write lock.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    /// Returns the series for `primary_key`, if any (test helper).
    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates all series in [SeriesSet].
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        // Schema and data types of the primary key columns are used by the
        // iterator to prune series by predicates.
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
            mem_scan_metrics,
        )
    }
}
489
490/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys
491/// of given region schema
492pub(crate) fn primary_key_schema(
493    region_metadata: &RegionMetadataRef,
494) -> arrow::datatypes::SchemaRef {
495    let fields = region_metadata
496        .primary_key_columns()
497        .map(|pk| {
498            arrow::datatypes::Field::new(
499                pk.column_schema.name.clone(),
500                pk.column_schema.data_type.as_arrow_type(),
501                pk.column_schema.is_nullable(),
502            )
503        })
504        .collect::<Vec<_>>();
505    Arc::new(arrow::datatypes::Schema::new(fields))
506}
507
/// Metrics for reading the memtable.
/// Reported via debug log and Prometheus counters when the [Iter] drops.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series in the memtable.
    total_series: usize,
    /// Number of series pruned.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batch read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}
522
/// Iterator over the series of a memtable, yielding one [Batch] per series.
struct Iter {
    /// Metadata of the region being scanned.
    metadata: RegionMetadataRef,
    /// Shared series map to scan.
    series: Arc<RwLock<SeriesMap>>,
    /// Column ids to project in output batches.
    projection: HashSet<ColumnId>,
    /// Primary key of the last returned series; the scan resumes after it.
    last_key: Option<Vec<u8>>,
    /// Filters evaluated against primary key columns to prune series.
    predicate: Vec<SimpleFilterEvaluator>,
    /// Arrow schema containing only primary key columns.
    pk_schema: arrow::datatypes::SchemaRef,
    /// Data types of primary key columns, parallel to `pk_schema`.
    pk_datatypes: Vec<ConcreteDataType>,
    /// Codec to decode primary keys for pruning.
    codec: Arc<DensePrimaryKeyCodec>,
    /// Whether to deduplicate rows in each batch.
    dedup: bool,
    /// Optional sequence passed to `Batch::filter_by_sequence`
    /// (presumably a visibility bound — confirm against `Batch` docs).
    sequence: Option<SequenceNumber>,
    /// Local scan metrics, reported when the iterator finishes or drops.
    metrics: Metrics,
    /// Optional external scan metrics sink, reported at most once.
    mem_scan_metrics: Option<MemScanMetrics>,
}
537
538impl Iter {
539    #[allow(clippy::too_many_arguments)]
540    pub(crate) fn try_new(
541        metadata: RegionMetadataRef,
542        series: Arc<RwLock<SeriesMap>>,
543        projection: HashSet<ColumnId>,
544        predicate: Option<Predicate>,
545        pk_schema: arrow::datatypes::SchemaRef,
546        pk_datatypes: Vec<ConcreteDataType>,
547        codec: Arc<DensePrimaryKeyCodec>,
548        dedup: bool,
549        sequence: Option<SequenceNumber>,
550        mem_scan_metrics: Option<MemScanMetrics>,
551    ) -> Result<Self> {
552        let predicate = predicate
553            .map(|predicate| {
554                predicate
555                    .exprs()
556                    .iter()
557                    .filter_map(SimpleFilterEvaluator::try_new)
558                    .collect::<Vec<_>>()
559            })
560            .unwrap_or_default();
561        Ok(Self {
562            metadata,
563            series,
564            projection,
565            last_key: None,
566            predicate,
567            pk_schema,
568            pk_datatypes,
569            codec,
570            dedup,
571            sequence,
572            metrics: Metrics::default(),
573            mem_scan_metrics,
574        })
575    }
576
577    fn report_mem_scan_metrics(&mut self) {
578        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
579            let inner = crate::memtable::MemScanMetricsData {
580                total_series: self.metrics.total_series,
581                num_rows: self.metrics.num_rows,
582                num_batches: self.metrics.num_batches,
583                scan_cost: self.metrics.scan_cost,
584            };
585            mem_scan_metrics.merge_inner(&inner);
586        }
587    }
588}
589
impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // Report MemScanMetrics if not already reported (no-op if the
        // iterator was fully consumed and reported in `next`).
        self.report_mem_scan_metrics();

        // Export rows read and scan duration to the global metrics.
        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
608
impl Iterator for Iter {
    type Item = Result<Batch>;

    /// Returns the batch of the next non-pruned series, or `None` when all
    /// series have been visited.
    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        // Resume strictly after the key returned last time; this way the map
        // read lock never has to be held across calls.
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        // TODO(hl): maybe yield more than one time series to amortize range overhead.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            // Skip series whose primary key does not satisfy the predicates.
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // read next series
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            // Compact the series and convert it into a projected batch.
            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            // Apply the sequence filter (if any) before yielding the batch.
            let mut batch = batch;
            batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        drop(map); // Explicitly drop the read lock
        self.metrics.scan_cost += start.elapsed();

        // Report MemScanMetrics before returning None
        self.report_mem_scan_metrics();

        None
    }
}
669
/// Evaluates `predicates` against the primary key of one series.
///
/// Returns `true` when the series may satisfy all predicates (keep it) and
/// `false` when it can be pruned. Decode or evaluation failures are treated
/// as "keep" so that pruning never drops data because of an error.
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // no primary key, we simply return true.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // retrieve primary key values from cache or decode from bytes.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        // Cache the decoded values on the series for subsequent scans.
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // evaluate predicates against primary key values
    let mut result = true;
    for predicate in predicates {
        // ignore predicates that are not referencing primary key columns
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: arrow schema and datatypes are constructed from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        // Evaluation errors count as a match so the series is not pruned.
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}
712
/// A `Series` holds a list of field values of some given primary key.
pub struct Series {
    /// Decoded primary key values, cached on first predicate evaluation.
    pk_cache: Option<Vec<Value>>,
    /// Active (mutable) builder receiving new rows.
    active: ValueBuilder,
    /// Already-frozen parts.
    frozen: Vec<Values>,
    /// Metadata of the region.
    region_metadata: RegionMetadataRef,
    /// Row-count threshold at which `active` is frozen.
    capacity: usize,
}
721
impl Series {
    /// Creates a [Series] whose active builder starts with `init_capacity`
    /// and is frozen once it approaches `capacity` rows.
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        // The gauge is decreased again when the owning `SeriesMap` drops.
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    /// Creates a [Series] with the default capacities.
    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    /// Returns true when the series contains no rows at all.
    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into Series. Return the size of values.
    ///
    /// Freezes the active builder first when it is close to `capacity`.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    /// Caches the decoded primary key values for predicate pruning.
    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and push it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            // Swap in a fresh builder and move the old one into `frozen`.
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    /// Extends the active builder with whole vectors of values, freezing it
    /// first when it cannot accommodate `fields`.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
    /// Returns the frozen and compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each values and avoid
            // cloning and sorting when values do not overlap with each other.

            // Field columns plus timestamp, sequence and op_type columns.
            let column_size = frozen[0].fields.len() + 3;

            // All frozen parts must have the same number of field columns.
            if cfg!(debug_assertions) {
                debug_assert!(frozen
                    .iter()
                    .zip(frozen.iter().skip(1))
                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
            }

            // Concatenate each column across all frozen parts.
            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        // Indexing relies on the invariant asserted above: the series holds at
        // least one row, so `freeze` left `frozen` non-empty.
        Ok(&self.frozen[0])
    }

    /// Returns clones of all values: the frozen parts plus the active builder.
    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}
835
/// `ValueBuilder` holds all the vector builders for field columns.
pub(crate) struct ValueBuilder {
    /// Raw values of the time index column, one per row.
    timestamp: Vec<i64>,
    /// Concrete type of the time index column.
    timestamp_type: ConcreteDataType,
    /// Sequence number per row.
    sequence: Vec<u64>,
    /// Op type per row, stored as `u8`.
    op_type: Vec<u8>,
    /// Lazily created builders, one slot per field column
    /// (`None` until the first non-null value arrives).
    fields: Vec<Option<FieldBuilder>>,
    /// Data types of field columns, parallel to `fields`.
    field_types: Vec<ConcreteDataType>,
}
845
846impl ValueBuilder {
847    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
848        let timestamp_type = region_metadata
849            .time_index_column()
850            .column_schema
851            .data_type
852            .clone();
853        let sequence = Vec::with_capacity(capacity);
854        let op_type = Vec::with_capacity(capacity);
855
856        let field_types = region_metadata
857            .field_columns()
858            .map(|c| c.column_schema.data_type.clone())
859            .collect::<Vec<_>>();
860        let fields = (0..field_types.len()).map(|_| None).collect();
861        Self {
862            timestamp: Vec::with_capacity(capacity),
863            timestamp_type,
864            sequence,
865            op_type,
866            fields,
867            field_types,
868        }
869    }
870
871    /// Returns number of field builders.
872    pub fn num_field_builders(&self) -> usize {
873        self.fields.iter().flatten().count()
874    }
875
    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they've already be encoded.
    /// Returns the size of field values.
    ///
    /// In this method, we don't check the data type of the value, because it is already checked in the caller.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // In debug builds, verify that the caller passes exactly one value per
        // field column.
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        // The timestamp is assumed present and valid (checked by the caller).
        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Nulls are skipped while no builder exists: an absent builder
            // implicitly represents an all-null column.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    // First non-null value for this column: create the builder
                    // and backfill nulls for all earlier rows.
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.push(field_value);
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }
926
927    /// Checks if current value builder have sufficient space to accommodate `fields`.
928    /// Returns false if there is no space to accommodate fields due to offset overflow.
929    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
930        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
931            let Some(builder) = field_dest else {
932                continue;
933            };
934            let FieldBuilder::String(builder) = builder else {
935                continue;
936            };
937            let array = field_src.to_arrow_array();
938            let string_array = array
939                .as_any()
940                .downcast_ref::<StringArray>()
941                .with_context(|| error::InvalidBatchSnafu {
942                    reason: format!(
943                        "Field type mismatch, expecting String, given: {}",
944                        field_src.data_type()
945                    ),
946                })?;
947            let space_needed = string_array.value_data().len() as i32;
948            // offset may overflow
949            if builder.next_offset().checked_add(space_needed).is_none() {
950                return Ok(false);
951            }
952        }
953        Ok(true)
954    }
955
956    pub(crate) fn extend(
957        &mut self,
958        ts_v: VectorRef,
959        op_type: u8,
960        sequence: u64,
961        fields: Vec<VectorRef>,
962    ) -> Result<()> {
963        let num_rows_before = self.timestamp.len();
964        let num_rows_to_write = ts_v.len();
965        self.timestamp.reserve(num_rows_to_write);
966        match self.timestamp_type {
967            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
968                self.timestamp.extend(
969                    ts_v.as_any()
970                        .downcast_ref::<TimestampSecondVector>()
971                        .unwrap()
972                        .iter_data()
973                        .map(|v| v.unwrap().0.value()),
974                );
975            }
976            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
977                self.timestamp.extend(
978                    ts_v.as_any()
979                        .downcast_ref::<TimestampMillisecondVector>()
980                        .unwrap()
981                        .iter_data()
982                        .map(|v| v.unwrap().0.value()),
983                );
984            }
985            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
986                self.timestamp.extend(
987                    ts_v.as_any()
988                        .downcast_ref::<TimestampMicrosecondVector>()
989                        .unwrap()
990                        .iter_data()
991                        .map(|v| v.unwrap().0.value()),
992                );
993            }
994            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
995                self.timestamp.extend(
996                    ts_v.as_any()
997                        .downcast_ref::<TimestampNanosecondVector>()
998                        .unwrap()
999                        .iter_data()
1000                        .map(|v| v.unwrap().0.value()),
1001                );
1002            }
1003            _ => unreachable!(),
1004        };
1005
1006        self.op_type.reserve(num_rows_to_write);
1007        self.op_type
1008            .extend(iter::repeat_n(op_type, num_rows_to_write));
1009        self.sequence.reserve(num_rows_to_write);
1010        self.sequence
1011            .extend(iter::repeat_n(sequence, num_rows_to_write));
1012
1013        for (field_idx, (field_src, field_dest)) in
1014            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
1015        {
1016            let builder = field_dest.get_or_insert_with(|| {
1017                let mut field_builder =
1018                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
1019                field_builder.push_nulls(num_rows_before);
1020                field_builder
1021            });
1022            match builder {
1023                FieldBuilder::String(builder) => {
1024                    let array = field_src.to_arrow_array();
1025                    let string_array =
1026                        array
1027                            .as_any()
1028                            .downcast_ref::<StringArray>()
1029                            .with_context(|| error::InvalidBatchSnafu {
1030                                reason: format!(
1031                                    "Field type mismatch, expecting String, given: {}",
1032                                    field_src.data_type()
1033                                ),
1034                            })?;
1035                    builder.append_array(string_array);
1036                }
1037                FieldBuilder::Other(builder) => {
1038                    let len = field_src.len();
1039                    builder
1040                        .extend_slice_of(&*field_src, 0, len)
1041                        .context(error::ComputeVectorSnafu)?;
1042                }
1043            }
1044        }
1045        Ok(())
1046    }
1047
1048    /// Returns the length of [ValueBuilder]
1049    fn len(&self) -> usize {
1050        let sequence_len = self.sequence.len();
1051        debug_assert_eq!(sequence_len, self.op_type.len());
1052        debug_assert_eq!(sequence_len, self.timestamp.len());
1053        sequence_len
1054    }
1055
1056    fn finish_cloned(&self) -> Values {
1057        let num_rows = self.sequence.len();
1058        let fields = self
1059            .fields
1060            .iter()
1061            .enumerate()
1062            .map(|(i, v)| {
1063                if let Some(v) = v {
1064                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
1065                    v.finish_cloned()
1066                } else {
1067                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
1068                    single_null.push_nulls(num_rows);
1069                    single_null.to_vector()
1070                }
1071            })
1072            .collect::<Vec<_>>();
1073
1074        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
1075        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
1076        let timestamp: VectorRef = match self.timestamp_type {
1077            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
1078                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
1079            }
1080            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
1081                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
1082            }
1083            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
1084                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
1085            }
1086            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
1087                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
1088            }
1089            _ => unreachable!(),
1090        };
1091
1092        if cfg!(debug_assertions) {
1093            debug_assert_eq!(timestamp.len(), sequence.len());
1094            debug_assert_eq!(timestamp.len(), op_type.len());
1095            for field in &fields {
1096                debug_assert_eq!(timestamp.len(), field.len());
1097            }
1098        }
1099
1100        Values {
1101            timestamp,
1102            sequence,
1103            op_type,
1104            fields,
1105        }
1106    }
1107}
1108
/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_type`.
#[derive(Clone)]
pub struct Values {
    /// Time index column values.
    pub(crate) timestamp: VectorRef,
    /// Sequence number of each row.
    pub(crate) sequence: Arc<UInt64Vector>,
    /// Op type of each row.
    pub(crate) op_type: Arc<UInt8Vector>,
    /// Field column vectors, in field-column order.
    pub(crate) fields: Vec<VectorRef>,
}
1117
1118impl Values {
1119    /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and
1120    /// keeps only the latest row for the same timestamp.
1121    pub fn to_batch(
1122        &self,
1123        primary_key: &[u8],
1124        metadata: &RegionMetadataRef,
1125        projection: &HashSet<ColumnId>,
1126        dedup: bool,
1127    ) -> Result<Batch> {
1128        let builder = BatchBuilder::with_required_columns(
1129            primary_key.to_vec(),
1130            self.timestamp.clone(),
1131            self.sequence.clone(),
1132            self.op_type.clone(),
1133        );
1134
1135        let fields = metadata
1136            .field_columns()
1137            .zip(self.fields.iter())
1138            .filter_map(|(c, f)| {
1139                projection.get(&c.column_id).map(|c| BatchColumn {
1140                    column_id: *c,
1141                    data: f.clone(),
1142                })
1143            })
1144            .collect();
1145
1146        let mut batch = builder.with_fields(fields).build()?;
1147        batch.sort(dedup)?;
1148        Ok(batch)
1149    }
1150
1151    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
1152    fn columns(&self) -> Vec<ArrayRef> {
1153        let mut res = Vec::with_capacity(3 + self.fields.len());
1154        res.push(self.timestamp.to_arrow_array());
1155        res.push(self.sequence.to_arrow_array());
1156        res.push(self.op_type.to_arrow_array());
1157        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
1158        res
1159    }
1160
1161    /// Builds a new [Values] instance from columns.
1162    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
1163        debug_assert!(cols.len() >= 3);
1164        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
1165        let sequence =
1166            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
1167        let op_type =
1168            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
1169        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;
1170
1171        Ok(Self {
1172            timestamp,
1173            sequence,
1174            op_type,
1175            fields,
1176        })
1177    }
1178}
1179
impl From<ValueBuilder> for Values {
    /// Freezes the builder into immutable [Values], consuming its buffers.
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    // The builder is consumed here, so the active-builder
                    // gauge goes down by one per created field builder.
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    // Columns that never saw a value become all-null vectors.
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        // Rebuild the timestamp vector with the region's concrete unit.
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            // The time index column always has a timestamp type.
            _ => unreachable!(),
        };

        // All columns must agree on the row count.
        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1233
/// Builder that creates batch iterators over a [SeriesSet].
struct TimeSeriesIterBuilder {
    /// Series to scan.
    series_set: SeriesSet,
    /// Column ids to read; passed to `SeriesSet::iter_series`.
    projection: HashSet<ColumnId>,
    /// Optional filter pushed down to the scan.
    predicate: Option<Predicate>,
    /// Whether duplicated rows are deduplicated during iteration.
    dedup: bool,
    /// Sequence number filter forwarded to the scan, if any.
    sequence: Option<SequenceNumber>,
    /// Merge strategy; `LastNonNull` wraps the iterator in [LastNonNullIter].
    merge_mode: MergeMode,
}
1242
1243impl IterBuilder for TimeSeriesIterBuilder {
1244    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
1245        let iter = self.series_set.iter_series(
1246            self.projection.clone(),
1247            self.predicate.clone(),
1248            self.dedup,
1249            self.sequence,
1250            metrics,
1251        )?;
1252
1253        if self.merge_mode == MergeMode::LastNonNull {
1254            let iter = LastNonNullIter::new(iter);
1255            Ok(Box::new(iter))
1256        } else {
1257            Ok(Box::new(iter))
1258        }
1259    }
1260}
1261
1262#[cfg(test)]
1263mod tests {
1264    use std::collections::{HashMap, HashSet};
1265
1266    use api::helper::ColumnDataTypeWrapper;
1267    use api::v1::value::ValueData;
1268    use api::v1::{Mutation, Row, Rows, SemanticType};
1269    use common_time::Timestamp;
1270    use datatypes::prelude::{ConcreteDataType, ScalarVector};
1271    use datatypes::schema::ColumnSchema;
1272    use datatypes::value::{OrderedFloat, Value};
1273    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1274    use mito_codec::row_converter::SortField;
1275    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1276    use store_api::storage::RegionId;
1277
1278    use super::*;
1279    use crate::test_util::column_metadata_to_column_schema;
1280
1281    fn schema_for_test() -> RegionMetadataRef {
1282        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1283        builder
1284            .push_column_metadata(ColumnMetadata {
1285                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1286                semantic_type: SemanticType::Tag,
1287                column_id: 0,
1288            })
1289            .push_column_metadata(ColumnMetadata {
1290                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
1291                semantic_type: SemanticType::Tag,
1292                column_id: 1,
1293            })
1294            .push_column_metadata(ColumnMetadata {
1295                column_schema: ColumnSchema::new(
1296                    "ts",
1297                    ConcreteDataType::timestamp_millisecond_datatype(),
1298                    false,
1299                ),
1300                semantic_type: SemanticType::Timestamp,
1301                column_id: 2,
1302            })
1303            .push_column_metadata(ColumnMetadata {
1304                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1305                semantic_type: SemanticType::Field,
1306                column_id: 3,
1307            })
1308            .push_column_metadata(ColumnMetadata {
1309                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1310                semantic_type: SemanticType::Field,
1311                column_id: 4,
1312            })
1313            .primary_key(vec![0, 1]);
1314        let region_metadata = builder.build().unwrap();
1315        Arc::new(region_metadata)
1316    }
1317
1318    fn ts_value_ref(val: i64) -> ValueRef<'static> {
1319        ValueRef::Timestamp(Timestamp::new_millisecond(val))
1320    }
1321
1322    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1323        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1324    }
1325
1326    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
1327        let ts = values
1328            .timestamp
1329            .as_any()
1330            .downcast_ref::<TimestampMillisecondVector>()
1331            .unwrap();
1332
1333        let v0 = values.fields[0]
1334            .as_any()
1335            .downcast_ref::<Int64Vector>()
1336            .unwrap();
1337        let v1 = values.fields[1]
1338            .as_any()
1339            .downcast_ref::<Float64Vector>()
1340            .unwrap();
1341        let read = ts
1342            .iter_data()
1343            .zip(values.sequence.iter_data())
1344            .zip(values.op_type.iter_data())
1345            .zip(v0.iter_data())
1346            .zip(v1.iter_data())
1347            .map(|((((ts, sequence), op_type), v0), v1)| {
1348                (
1349                    ts.unwrap().0.value(),
1350                    sequence.unwrap(),
1351                    op_type.unwrap(),
1352                    v0.unwrap(),
1353                    v1.unwrap(),
1354                )
1355            })
1356            .collect::<Vec<_>>();
1357        assert_eq!(expect, &read);
1358    }
1359
1360    #[test]
1361    fn test_series() {
1362        let region_metadata = schema_for_test();
1363        let mut series = Series::new(&region_metadata);
1364        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
1365        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
1366        assert_eq!(2, series.active.timestamp.len());
1367        assert_eq!(0, series.frozen.len());
1368
1369        let values = series.compact(&region_metadata).unwrap();
1370        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
1371        assert_eq!(0, series.active.timestamp.len());
1372        assert_eq!(1, series.frozen.len());
1373    }
1374
1375    #[test]
1376    fn test_series_with_nulls() {
1377        let region_metadata = schema_for_test();
1378        let mut series = Series::new(&region_metadata);
1379        // col1: NULL 1 2 3
1380        // col2: NULL NULL 10.2 NULL
1381        series.push(
1382            ts_value_ref(1),
1383            0,
1384            OpType::Put,
1385            vec![ValueRef::Null, ValueRef::Null].into_iter(),
1386        );
1387        series.push(
1388            ts_value_ref(1),
1389            0,
1390            OpType::Put,
1391            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
1392        );
1393        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
1394        series.push(
1395            ts_value_ref(1),
1396            3,
1397            OpType::Put,
1398            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
1399        );
1400        assert_eq!(4, series.active.timestamp.len());
1401        assert_eq!(0, series.frozen.len());
1402
1403        let values = series.compact(&region_metadata).unwrap();
1404        assert_eq!(values.fields[0].null_count(), 1);
1405        assert_eq!(values.fields[1].null_count(), 3);
1406        assert_eq!(0, series.active.timestamp.len());
1407        assert_eq!(1, series.frozen.len());
1408    }
1409
1410    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1411        let ts_len = batch.timestamps().len();
1412        assert_eq!(batch.sequences().len(), ts_len);
1413        assert_eq!(batch.op_types().len(), ts_len);
1414        for f in batch.fields() {
1415            assert_eq!(f.data.len(), ts_len);
1416        }
1417
1418        let mut rows = vec![];
1419        for idx in 0..ts_len {
1420            let mut row = Vec::with_capacity(batch.fields().len() + 3);
1421            row.push(batch.timestamps().get(idx));
1422            row.push(batch.sequences().get(idx));
1423            row.push(batch.op_types().get(idx));
1424            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1425            rows.push(row);
1426        }
1427
1428        assert_eq!(expect.len(), rows.len());
1429        for (idx, row) in rows.iter().enumerate() {
1430            assert_eq!(&expect[idx], row);
1431        }
1432    }
1433
1434    #[test]
1435    fn test_values_sort() {
1436        let schema = schema_for_test();
1437        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
1438        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
1439        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));
1440
1441        let fields = vec![
1442            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
1443            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
1444        ];
1445        let values = Values {
1446            timestamp: timestamp as Arc<_>,
1447            sequence,
1448            op_type,
1449            fields,
1450        };
1451
1452        let batch = values
1453            .to_batch(
1454                b"test",
1455                &schema,
1456                &[0, 1, 2, 3, 4].into_iter().collect(),
1457                true,
1458            )
1459            .unwrap();
1460        check_value(
1461            &batch,
1462            vec![
1463                vec![
1464                    Value::Timestamp(Timestamp::new_millisecond(1)),
1465                    Value::UInt64(1),
1466                    Value::UInt8(1),
1467                    Value::Int64(4),
1468                    Value::Float64(OrderedFloat(1.1)),
1469                ],
1470                vec![
1471                    Value::Timestamp(Timestamp::new_millisecond(2)),
1472                    Value::UInt64(1),
1473                    Value::UInt8(1),
1474                    Value::Int64(3),
1475                    Value::Float64(OrderedFloat(2.1)),
1476                ],
1477                vec![
1478                    Value::Timestamp(Timestamp::new_millisecond(3)),
1479                    Value::UInt64(2),
1480                    Value::UInt8(0),
1481                    Value::Int64(2),
1482                    Value::Float64(OrderedFloat(4.2)),
1483                ],
1484                vec![
1485                    Value::Timestamp(Timestamp::new_millisecond(4)),
1486                    Value::UInt64(1),
1487                    Value::UInt8(1),
1488                    Value::Int64(1),
1489                    Value::Float64(OrderedFloat(3.3)),
1490                ],
1491            ],
1492        )
1493    }
1494
    /// Builds a [KeyValues] mutation with `len` rows for the primary key
    /// `(k0, k1)`. Row `i` carries timestamp `i` (ms), `v0 = i` and
    /// `v1 = i as f64`; the whole mutation uses op type 1 (put) and
    /// sequence 0.
    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        // Convert the region's column metadata into the gRPC column schema.
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        // Row values are ordered as (k0, k1, ts, v0, v1), matching the schema.
        let rows = (0..len)
            .map(|i| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(i as f64)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }
1541
1542    #[test]
1543    fn test_series_set_concurrency() {
1544        let schema = schema_for_test();
1545        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1546            schema
1547                .primary_key_columns()
1548                .map(|c| {
1549                    (
1550                        c.column_id,
1551                        SortField::new(c.column_schema.data_type.clone()),
1552                    )
1553                })
1554                .collect(),
1555        ));
1556        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1557
1558        let concurrency = 32;
1559        let pk_num = concurrency * 2;
1560        let mut handles = Vec::with_capacity(concurrency);
1561        for i in 0..concurrency {
1562            let set = set.clone();
1563            let schema = schema.clone();
1564            let column_schemas = schema
1565                .column_metadatas
1566                .iter()
1567                .map(column_metadata_to_column_schema)
1568                .collect::<Vec<_>>();
1569            let handle = std::thread::spawn(move || {
1570                for j in i * 100..(i + 1) * 100 {
1571                    let pk = j % pk_num;
1572                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1573
1574                    let kvs = KeyValues::new(
1575                        &schema,
1576                        Mutation {
1577                            op_type: OpType::Put as i32,
1578                            sequence: j as u64,
1579                            rows: Some(Rows {
1580                                schema: column_schemas.clone(),
1581                                rows: vec![Row {
1582                                    values: vec![
1583                                        api::v1::Value {
1584                                            value_data: Some(ValueData::StringValue(format!(
1585                                                "{}",
1586                                                j
1587                                            ))),
1588                                        },
1589                                        api::v1::Value {
1590                                            value_data: Some(ValueData::I64Value(j as i64)),
1591                                        },
1592                                        api::v1::Value {
1593                                            value_data: Some(ValueData::TimestampMillisecondValue(
1594                                                j as i64,
1595                                            )),
1596                                        },
1597                                        api::v1::Value {
1598                                            value_data: Some(ValueData::I64Value(j as i64)),
1599                                        },
1600                                        api::v1::Value {
1601                                            value_data: Some(ValueData::F64Value(j as f64)),
1602                                        },
1603                                    ],
1604                                }],
1605                            }),
1606                            write_hint: None,
1607                        },
1608                    )
1609                    .unwrap();
1610                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1611                }
1612            });
1613            handles.push(handle);
1614        }
1615        for h in handles {
1616            h.join().unwrap();
1617        }
1618
1619        let mut timestamps = Vec::with_capacity(concurrency * 100);
1620        let mut sequences = Vec::with_capacity(concurrency * 100);
1621        let mut op_types = Vec::with_capacity(concurrency * 100);
1622        let mut v0 = Vec::with_capacity(concurrency * 100);
1623
1624        for i in 0..pk_num {
1625            let pk = format!("pk-{}", i).as_bytes().to_vec();
1626            let series = set.get_series(&pk).unwrap();
1627            let mut guard = series.write().unwrap();
1628            let values = guard.compact(&schema).unwrap();
1629            timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1630            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1631            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1632            v0.extend(
1633                values
1634                    .fields
1635                    .first()
1636                    .unwrap()
1637                    .as_any()
1638                    .downcast_ref::<Int64Vector>()
1639                    .unwrap()
1640                    .iter_data()
1641                    .map(|v| v.unwrap()),
1642            );
1643        }
1644
1645        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1646        assert_eq!(
1647            expected_sequence,
1648            sequences.iter().copied().collect::<HashSet<_>>()
1649        );
1650
1651        op_types.iter().all(|op| *op == OpType::Put as u8);
1652        assert_eq!(
1653            expected_sequence,
1654            timestamps.iter().copied().collect::<HashSet<_>>()
1655        );
1656
1657        assert_eq!(timestamps, sequences);
1658        assert_eq!(v0, timestamps);
1659    }
1660
1661    #[test]
1662    fn test_memtable() {
1663        common_telemetry::init_default_ut_logging();
1664        check_memtable_dedup(true);
1665        check_memtable_dedup(false);
1666    }
1667
1668    fn check_memtable_dedup(dedup: bool) {
1669        let schema = schema_for_test();
1670        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1671        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
1672        memtable.write(&kvs).unwrap();
1673        memtable.write(&kvs).unwrap();
1674
1675        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
1676        for ts in kvs
1677            .iter()
1678            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
1679        {
1680            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
1681        }
1682
1683        let iter = memtable.iter(None, None, None).unwrap();
1684        let mut read = HashMap::new();
1685
1686        for ts in iter
1687            .flat_map(|batch| {
1688                batch
1689                    .unwrap()
1690                    .timestamps()
1691                    .as_any()
1692                    .downcast_ref::<TimestampMillisecondVector>()
1693                    .unwrap()
1694                    .iter_data()
1695                    .collect::<Vec<_>>()
1696                    .into_iter()
1697            })
1698            .map(|v| v.unwrap().0.value())
1699        {
1700            *read.entry(ts).or_default() += 1;
1701        }
1702        assert_eq!(expected_ts, read);
1703
1704        let stats = memtable.stats();
1705        assert!(stats.bytes_allocated() > 0);
1706        assert_eq!(
1707            Some((
1708                Timestamp::new_millisecond(0),
1709                Timestamp::new_millisecond(99)
1710            )),
1711            stats.time_range()
1712        );
1713    }
1714
1715    #[test]
1716    fn test_memtable_projection() {
1717        common_telemetry::init_default_ut_logging();
1718        let schema = schema_for_test();
1719        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1720        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
1721        memtable.write(&kvs).unwrap();
1722
1723        let iter = memtable.iter(Some(&[3]), None, None).unwrap();
1724
1725        let mut v0_all = vec![];
1726
1727        for res in iter {
1728            let batch = res.unwrap();
1729            assert_eq!(1, batch.fields().len());
1730            let v0 = batch
1731                .fields()
1732                .first()
1733                .unwrap()
1734                .data
1735                .as_any()
1736                .downcast_ref::<Int64Vector>()
1737                .unwrap();
1738            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
1739        }
1740        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
1741    }
1742
1743    #[test]
1744    fn test_memtable_concurrent_write_read() {
1745        common_telemetry::init_default_ut_logging();
1746        let schema = schema_for_test();
1747        let memtable = Arc::new(TimeSeriesMemtable::new(
1748            schema.clone(),
1749            42,
1750            None,
1751            true,
1752            MergeMode::LastRow,
1753        ));
1754
1755        // Number of writer threads
1756        let num_writers = 10;
1757        // Number of reader threads
1758        let num_readers = 5;
1759        // Number of series per writer
1760        let series_per_writer = 100;
1761        // Number of rows per series
1762        let rows_per_series = 10;
1763        // Total number of series
1764        let total_series = num_writers * series_per_writer;
1765
1766        // Create a barrier to synchronize the start of all threads
1767        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));
1768
1769        // Spawn writer threads
1770        let mut writer_handles = Vec::with_capacity(num_writers);
1771        for writer_id in 0..num_writers {
1772            let memtable = memtable.clone();
1773            let schema = schema.clone();
1774            let barrier = barrier.clone();
1775
1776            let handle = std::thread::spawn(move || {
1777                // Wait for all threads to be ready
1778                barrier.wait();
1779
1780                // Create and write series
1781                for series_id in 0..series_per_writer {
1782                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
1783                    let kvs =
1784                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
1785                    memtable.write(&kvs).unwrap();
1786                }
1787            });
1788
1789            writer_handles.push(handle);
1790        }
1791
1792        // Spawn reader threads
1793        let mut reader_handles = Vec::with_capacity(num_readers);
1794        for _ in 0..num_readers {
1795            let memtable = memtable.clone();
1796            let barrier = barrier.clone();
1797
1798            let handle = std::thread::spawn(move || {
1799                barrier.wait();
1800
1801                for _ in 0..10 {
1802                    let iter = memtable.iter(None, None, None).unwrap();
1803                    for batch_result in iter {
1804                        let _ = batch_result.unwrap();
1805                    }
1806                }
1807            });
1808
1809            reader_handles.push(handle);
1810        }
1811
1812        barrier.wait();
1813
1814        for handle in writer_handles {
1815            handle.join().unwrap();
1816        }
1817        for handle in reader_handles {
1818            handle.join().unwrap();
1819        }
1820
1821        let iter = memtable.iter(None, None, None).unwrap();
1822        let mut series_count = 0;
1823        let mut row_count = 0;
1824
1825        for batch_result in iter {
1826            let batch = batch_result.unwrap();
1827            series_count += 1;
1828            row_count += batch.num_rows();
1829        }
1830        assert_eq!(total_series, series_count);
1831        assert_eq!(total_series * rows_per_series, row_count);
1832    }
1833}