mito2/memtable/
time_series.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::iter;
19use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::OpType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{debug, error};
26use common_time::Timestamp;
27use datatypes::arrow;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow_array::StringArray;
30use datatypes::data_type::{ConcreteDataType, DataType};
31use datatypes::prelude::{ScalarVector, Vector, VectorRef};
32use datatypes::types::TimestampType;
33use datatypes::value::{Value, ValueRef};
34use datatypes::vectors::{
35    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
36    TimestampSecondVector, UInt8Vector, UInt64Vector,
37};
38use mito_codec::key_values::KeyValue;
39use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
40use snafu::{OptionExt, ResultExt, ensure};
41use store_api::metadata::RegionMetadataRef;
42use store_api::storage::{ColumnId, SequenceRange};
43use table::predicate::Predicate;
44
45use crate::error::{
46    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
47};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::builder::{FieldBuilder, StringBuilder};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
52use crate::memtable::stats::WriteMetrics;
53use crate::memtable::{
54    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
55    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
56    MemtableStats, RangesOptions,
57};
58use crate::metrics::{
59    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
60    READ_STAGE_ELAPSED,
61};
62use crate::read::dedup::LastNonNullIter;
63use crate::read::{Batch, BatchBuilder, BatchColumn};
64use crate::region::options::MergeMode;
65
66/// Initial vector builder capacity.
67const INITIAL_BUILDER_CAPACITY: usize = 4;
68
69/// Vector builder capacity.
70const BUILDER_CAPACITY: usize = 512;
71
72/// Builder to build [TimeSeriesMemtable].
73#[derive(Debug, Default)]
74pub struct TimeSeriesMemtableBuilder {
75    write_buffer_manager: Option<WriteBufferManagerRef>,
76    dedup: bool,
77    merge_mode: MergeMode,
78}
79
impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    ///
    /// `dedup` and `merge_mode` are stored as-is and forwarded to every
    /// memtable created by [MemtableBuilder::build].
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}
94
95impl MemtableBuilder for TimeSeriesMemtableBuilder {
96    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
97        if metadata.primary_key.is_empty() {
98            Arc::new(SimpleBulkMemtable::new(
99                id,
100                metadata.clone(),
101                self.write_buffer_manager.clone(),
102                self.dedup,
103                self.merge_mode,
104            ))
105        } else {
106            Arc::new(TimeSeriesMemtable::new(
107                metadata.clone(),
108                id,
109                self.write_buffer_manager.clone(),
110                self.dedup,
111                self.merge_mode,
112            ))
113        }
114    }
115
116    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
117        // Now if we can use simple bulk memtable, the input request is already
118        // a bulk write request and won't call this method.
119        false
120    }
121}
122
123/// Memtable implementation that groups rows by their primary key.
124pub struct TimeSeriesMemtable {
125    id: MemtableId,
126    region_metadata: RegionMetadataRef,
127    row_codec: Arc<DensePrimaryKeyCodec>,
128    series_set: SeriesSet,
129    alloc_tracker: AllocTracker,
130    max_timestamp: AtomicI64,
131    min_timestamp: AtomicI64,
132    max_sequence: AtomicU64,
133    dedup: bool,
134    merge_mode: MergeMode,
135    /// Total written rows in memtable. This also includes deleted and duplicated rows.
136    num_rows: AtomicUsize,
137}
138
139impl TimeSeriesMemtable {
140    pub fn new(
141        region_metadata: RegionMetadataRef,
142        id: MemtableId,
143        write_buffer_manager: Option<WriteBufferManagerRef>,
144        dedup: bool,
145        merge_mode: MergeMode,
146    ) -> Self {
147        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
148        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
149        let dedup = if merge_mode == MergeMode::LastNonNull {
150            false
151        } else {
152            dedup
153        };
154        Self {
155            id,
156            region_metadata,
157            series_set,
158            row_codec,
159            alloc_tracker: AllocTracker::new(write_buffer_manager),
160            max_timestamp: AtomicI64::new(i64::MIN),
161            min_timestamp: AtomicI64::new(i64::MAX),
162            max_sequence: AtomicU64::new(0),
163            dedup,
164            merge_mode,
165            num_rows: Default::default(),
166        }
167    }
168
169    /// Updates memtable stats.
170    fn update_stats(&self, stats: WriteMetrics) {
171        self.alloc_tracker
172            .on_allocation(stats.key_bytes + stats.value_bytes);
173        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
174        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
175        self.max_sequence
176            .fetch_max(stats.max_sequence, Ordering::SeqCst);
177        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
178    }
179
180    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
181        ensure!(
182            self.row_codec.num_fields() == kv.num_primary_keys(),
183            PrimaryKeyLengthMismatchSnafu {
184                expect: self.row_codec.num_fields(),
185                actual: kv.num_primary_keys(),
186            }
187        );
188
189        let primary_key_encoded = self
190            .row_codec
191            .encode(kv.primary_keys())
192            .context(EncodeSnafu)?;
193
194        let (key_allocated, value_allocated) =
195            self.series_set.push_to_series(primary_key_encoded, &kv);
196        stats.key_bytes += key_allocated;
197        stats.value_bytes += value_allocated;
198
199        // safety: timestamp of kv must be both present and a valid timestamp value.
200        let ts = kv
201            .timestamp()
202            .try_into_timestamp()
203            .unwrap()
204            .unwrap()
205            .value();
206        stats.min_ts = stats.min_ts.min(ts);
207        stats.max_ts = stats.max_ts.max(ts);
208        Ok(())
209    }
210}
211
impl Debug for TimeSeriesMemtable {
    // Only prints the type name; the contents (series map, builders) are too
    // large to be useful in debug output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}
217
impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes all rows in `kvs` and merges the accumulated metrics into the
    /// memtable-level stats.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Account for the per-row timestamp and op_type storage on top of the
        // field bytes counted by `write_key_value`.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // TODO(hl): this maybe inaccurate since for-iteration may return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to memtable will succeed.
        self.update_stats(local_stats);
        Ok(())
    }

    /// Writes a single row. Stats are only updated when the write succeeds.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    /// Writes a bulk part by converting it to a mutation and writing row by row.
    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Default implementation fallback to row iteration.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        // The part carries authoritative sequence/timestamp/row-count values;
        // they override whatever `write_key_value` accumulated.
        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    /// Test-only iterator over the memtable; defaults projection to all field
    /// columns and wraps the result for last-non-null merging when enabled.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence, None)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    /// Returns all rows of this memtable as a single range (range id 0).
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    /// Marks the memtable as immutable; no more allocations will be tracked.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    /// Returns memtable statistics computed from the atomic counters.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Creates an empty memtable with the same configuration for `metadata`,
    /// reusing this memtable's write buffer manager.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
390
/// Map from encoded primary key to its [Series], ordered by key.
///
/// Newtype wrapper so the [Drop] impl can decrement the process-wide
/// series/field-builder gauges when the map is discarded.
#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);
393
394impl Drop for SeriesMap {
395    fn drop(&mut self) {
396        let num_series = self.0.len();
397        let num_field_builders = self
398            .0
399            .values()
400            .map(|v| v.read().unwrap().active.num_field_builders())
401            .sum::<usize>();
402        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
403        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
404    }
405}
406
/// Shared, thread-safe collection of all series in a memtable.
///
/// Cloning is cheap: the series map and codec are behind `Arc`s.
#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    /// Series keyed by encoded primary key, guarded by a map-level RwLock.
    series: Arc<RwLock<SeriesMap>>,
    /// Codec used to decode primary keys when pruning series.
    codec: Arc<DensePrimaryKeyCodec>,
}
413
414impl SeriesSet {
415    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
416        Self {
417            region_metadata,
418            series: Default::default(),
419            codec,
420        }
421    }
422}
423
424impl SeriesSet {
425    /// Push [KeyValue] to SeriesSet with given primary key and return key/value allocated memory size.
426    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
427        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
428            let value_allocated = series.write().unwrap().push(
429                kv.timestamp(),
430                kv.sequence(),
431                kv.op_type(),
432                kv.fields(),
433            );
434            return (0, value_allocated);
435        };
436
437        let mut indices = self.series.write().unwrap();
438        match indices.0.entry(primary_key) {
439            Entry::Vacant(v) => {
440                let key_len = v.key().len();
441                let mut series = Series::new(&self.region_metadata);
442                let value_allocated =
443                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
444                v.insert(Arc::new(RwLock::new(series)));
445                (key_len, value_allocated)
446            }
447            // safety: series must exist at given index.
448            Entry::Occupied(v) => {
449                let value_allocated = v.get().write().unwrap().push(
450                    kv.timestamp(),
451                    kv.sequence(),
452                    kv.op_type(),
453                    kv.fields(),
454                );
455                (0, value_allocated)
456            }
457        }
458    }
459
460    #[cfg(test)]
461    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
462        self.series.read().unwrap().0.get(primary_key).cloned()
463    }
464
465    /// Iterates all series in [SeriesSet].
466    fn iter_series(
467        &self,
468        projection: HashSet<ColumnId>,
469        predicate: Option<Predicate>,
470        dedup: bool,
471        sequence: Option<SequenceRange>,
472        mem_scan_metrics: Option<MemScanMetrics>,
473    ) -> Result<Iter> {
474        let primary_key_schema = primary_key_schema(&self.region_metadata);
475        let primary_key_datatypes = self
476            .region_metadata
477            .primary_key_columns()
478            .map(|pk| pk.column_schema.data_type.clone())
479            .collect();
480
481        Iter::try_new(
482            self.region_metadata.clone(),
483            self.series.clone(),
484            projection,
485            predicate,
486            primary_key_schema,
487            primary_key_datatypes,
488            self.codec.clone(),
489            dedup,
490            sequence,
491            mem_scan_metrics,
492        )
493    }
494}
495
496/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys
497/// of given region schema
498pub(crate) fn primary_key_schema(
499    region_metadata: &RegionMetadataRef,
500) -> arrow::datatypes::SchemaRef {
501    let fields = region_metadata
502        .primary_key_columns()
503        .map(|pk| {
504            arrow::datatypes::Field::new(
505                pk.column_schema.name.clone(),
506                pk.column_schema.data_type.as_arrow_type(),
507                pk.column_schema.is_nullable(),
508            )
509        })
510        .collect::<Vec<_>>();
511    Arc::new(arrow::datatypes::Schema::new(fields))
512}
513
/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total number of series visited during the scan.
    total_series: usize,
    /// Number of series pruned by primary key predicates.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration spent scanning the memtable.
    scan_cost: Duration,
}
528
/// Iterator over the series of a [SeriesSet], yielding one [Batch] per series.
struct Iter {
    metadata: RegionMetadataRef,
    /// Shared series map being scanned.
    series: Arc<RwLock<SeriesMap>>,
    /// Column ids to project into output batches.
    projection: HashSet<ColumnId>,
    /// Last primary key yielded; the next scan resumes after this key.
    last_key: Option<Vec<u8>>,
    /// Simple filters used to prune whole series by primary key.
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    /// Optional sequence range applied to each batch before yielding.
    sequence: Option<SequenceRange>,
    metrics: Metrics,
    /// Scan metrics sink; reported once, on exhaustion or drop.
    mem_scan_metrics: Option<MemScanMetrics>,
}
543
544impl Iter {
545    #[allow(clippy::too_many_arguments)]
546    pub(crate) fn try_new(
547        metadata: RegionMetadataRef,
548        series: Arc<RwLock<SeriesMap>>,
549        projection: HashSet<ColumnId>,
550        predicate: Option<Predicate>,
551        pk_schema: arrow::datatypes::SchemaRef,
552        pk_datatypes: Vec<ConcreteDataType>,
553        codec: Arc<DensePrimaryKeyCodec>,
554        dedup: bool,
555        sequence: Option<SequenceRange>,
556        mem_scan_metrics: Option<MemScanMetrics>,
557    ) -> Result<Self> {
558        let predicate = predicate
559            .map(|predicate| {
560                predicate
561                    .exprs()
562                    .iter()
563                    .filter_map(SimpleFilterEvaluator::try_new)
564                    .collect::<Vec<_>>()
565            })
566            .unwrap_or_default();
567        Ok(Self {
568            metadata,
569            series,
570            projection,
571            last_key: None,
572            predicate,
573            pk_schema,
574            pk_datatypes,
575            codec,
576            dedup,
577            sequence,
578            metrics: Metrics::default(),
579            mem_scan_metrics,
580        })
581    }
582
583    fn report_mem_scan_metrics(&mut self) {
584        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
585            let inner = crate::memtable::MemScanMetricsData {
586                total_series: self.metrics.total_series,
587                num_rows: self.metrics.num_rows,
588                num_batches: self.metrics.num_batches,
589                scan_cost: self.metrics.scan_cost,
590            };
591            mem_scan_metrics.merge_inner(&inner);
592        }
593    }
594}
595
impl Drop for Iter {
    /// Logs scan metrics and flushes them to Prometheus counters when the
    /// iterator goes away (even if it was not fully consumed).
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // Report MemScanMetrics if not already reported
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
614
615impl Iterator for Iter {
616    type Item = Result<Batch>;
617
618    fn next(&mut self) -> Option<Self::Item> {
619        let start = Instant::now();
620        let map = self.series.read().unwrap();
621        let range = match &self.last_key {
622            None => map.0.range::<Vec<u8>, _>(..),
623            Some(last_key) => map
624                .0
625                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
626        };
627
628        // TODO(hl): maybe yield more than one time series to amortize range overhead.
629        for (primary_key, series) in range {
630            self.metrics.total_series += 1;
631
632            let mut series = series.write().unwrap();
633            if !self.predicate.is_empty()
634                && !prune_primary_key(
635                    &self.codec,
636                    primary_key.as_slice(),
637                    &mut series,
638                    &self.pk_datatypes,
639                    self.pk_schema.clone(),
640                    &self.predicate,
641                )
642            {
643                // read next series
644                self.metrics.num_pruned_series += 1;
645                continue;
646            }
647            self.last_key = Some(primary_key.clone());
648
649            let values = series.compact(&self.metadata);
650            let batch = values.and_then(|v| {
651                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
652            });
653
654            // Update metrics.
655            self.metrics.num_batches += 1;
656            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
657            self.metrics.scan_cost += start.elapsed();
658
659            let mut batch = batch;
660            batch = batch.and_then(|mut batch| {
661                batch.filter_by_sequence(self.sequence)?;
662                Ok(batch)
663            });
664            return Some(batch);
665        }
666        drop(map); // Explicitly drop the read lock
667        self.metrics.scan_cost += start.elapsed();
668
669        // Report MemScanMetrics before returning None
670        self.report_mem_scan_metrics();
671
672        None
673    }
674}
675
/// Evaluates `predicates` against the decoded primary key of a series.
///
/// Returns `true` when the series may contain matching rows (i.e. it must be
/// scanned) and `false` when every row is excluded by some predicate. Decode
/// failures and non-PK predicates conservatively return `true`.
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // no primary key, we simply return true.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // retrieve primary key values from cache or decode from bytes.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            // Decode failure: log and keep the series rather than dropping data.
            error!(e; "Failed to decode primary key");
            return true;
        }
        // Cache the decoded values on the series for subsequent scans.
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // evaluate predicates against primary key values
    let mut result = true;
    for predicate in predicates {
        // ignore predicates that are not referencing primary key columns
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: arrow schema and datatypes are constructed from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        // Evaluation errors count as "may match" so we never prune incorrectly.
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}
718
/// A `Series` holds a list of field values of some given primary key.
pub struct Series {
    /// Decoded primary key values, cached by [prune_primary_key].
    pk_cache: Option<Vec<Value>>,
    /// Builder receiving new rows; frozen into `frozen` when it grows too large.
    active: ValueBuilder,
    /// Immutable, already-built value sets.
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    /// Row threshold at which `active` is frozen (see [Series::push]).
    capacity: usize,
}
727
impl Series {
    /// Creates a [Series] whose active builder starts at `init_capacity` rows
    /// and is frozen once it approaches `capacity` rows.
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    /// Creates a [Series] with the default initial/freeze capacities.
    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    /// Returns true when neither the active builder nor the frozen parts hold rows.
    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into Series. Return the size of values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    /// Stores decoded primary key values for reuse by later scans.
    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and push it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            // Swap in a fresh builder and convert the old one into immutable Values.
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    /// Appends whole vectors to the active builder, freezing first when the
    /// builder cannot accommodate `fields`.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
    /// Returns the frozen and compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each values and avoid
            // cloning and sorting when values do not overlap with each other.

            // +3 for the timestamp, sequence and op_type columns alongside fields.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(
                    frozen
                        .iter()
                        .zip(frozen.iter().skip(1))
                        .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
                );
            }

            // Concatenate each column across all frozen parts into one array.
            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    /// Returns clones of all frozen parts plus the current active builder's
    /// contents, without modifying the series.
    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}
843
/// `ValueBuilder` holds all the vector builders for field columns.
pub(crate) struct ValueBuilder {
    /// Raw timestamp values; the unit is described by `timestamp_type`.
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    /// Sequence number of each row.
    sequence: Vec<u64>,
    /// Op type (`OpType` as u8) of each row.
    op_type: Vec<u8>,
    /// One lazily-created builder per field column; `None` until the column
    /// sees its first non-null value (see `push`).
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}
853
854impl ValueBuilder {
855    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
856        let timestamp_type = region_metadata
857            .time_index_column()
858            .column_schema
859            .data_type
860            .clone();
861        let sequence = Vec::with_capacity(capacity);
862        let op_type = Vec::with_capacity(capacity);
863
864        let field_types = region_metadata
865            .field_columns()
866            .map(|c| c.column_schema.data_type.clone())
867            .collect::<Vec<_>>();
868        let fields = (0..field_types.len()).map(|_| None).collect();
869        Self {
870            timestamp: Vec::with_capacity(capacity),
871            timestamp_type,
872            sequence,
873            op_type,
874            fields,
875            field_types,
876        }
877    }
878
879    /// Returns number of field builders.
880    pub fn num_field_builders(&self) -> usize {
881        self.fields.iter().flatten().count()
882    }
883
    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they've already been encoded.
    /// Returns the size of field values.
    ///
    /// In this method, we don't check the data type of the value, because it is already checked in the caller.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // In debug builds, materialize the iterator to check that the caller
        // supplied exactly one value per field column.
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.try_into_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Builders are created lazily: nulls are skipped until the column
            // sees its first non-null value.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    // First non-null value for this column: create the builder
                    // and backfill nulls for all previously pushed rows.
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    mutable_vector.push_nulls(num_rows - 1);
                    mutable_vector
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }
936
937    /// Checks if current value builder have sufficient space to accommodate `fields`.
938    /// Returns false if there is no space to accommodate fields due to offset overflow.
939    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
940        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
941            let Some(builder) = field_dest else {
942                continue;
943            };
944            let FieldBuilder::String(builder) = builder else {
945                continue;
946            };
947            let array = field_src.to_arrow_array();
948            let string_array = array
949                .as_any()
950                .downcast_ref::<StringArray>()
951                .with_context(|| error::InvalidBatchSnafu {
952                    reason: format!(
953                        "Field type mismatch, expecting String, given: {}",
954                        field_src.data_type()
955                    ),
956                })?;
957            let space_needed = string_array.value_data().len() as i32;
958            // offset may overflow
959            if builder.next_offset().checked_add(space_needed).is_none() {
960                return Ok(false);
961            }
962        }
963        Ok(true)
964    }
965
966    pub(crate) fn extend(
967        &mut self,
968        ts_v: VectorRef,
969        op_type: u8,
970        sequence: u64,
971        fields: Vec<VectorRef>,
972    ) -> Result<()> {
973        let num_rows_before = self.timestamp.len();
974        let num_rows_to_write = ts_v.len();
975        self.timestamp.reserve(num_rows_to_write);
976        match self.timestamp_type {
977            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
978                self.timestamp.extend(
979                    ts_v.as_any()
980                        .downcast_ref::<TimestampSecondVector>()
981                        .unwrap()
982                        .iter_data()
983                        .map(|v| v.unwrap().0.value()),
984                );
985            }
986            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
987                self.timestamp.extend(
988                    ts_v.as_any()
989                        .downcast_ref::<TimestampMillisecondVector>()
990                        .unwrap()
991                        .iter_data()
992                        .map(|v| v.unwrap().0.value()),
993                );
994            }
995            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
996                self.timestamp.extend(
997                    ts_v.as_any()
998                        .downcast_ref::<TimestampMicrosecondVector>()
999                        .unwrap()
1000                        .iter_data()
1001                        .map(|v| v.unwrap().0.value()),
1002                );
1003            }
1004            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
1005                self.timestamp.extend(
1006                    ts_v.as_any()
1007                        .downcast_ref::<TimestampNanosecondVector>()
1008                        .unwrap()
1009                        .iter_data()
1010                        .map(|v| v.unwrap().0.value()),
1011                );
1012            }
1013            _ => unreachable!(),
1014        };
1015
1016        self.op_type.reserve(num_rows_to_write);
1017        self.op_type
1018            .extend(iter::repeat_n(op_type, num_rows_to_write));
1019        self.sequence.reserve(num_rows_to_write);
1020        self.sequence
1021            .extend(iter::repeat_n(sequence, num_rows_to_write));
1022
1023        for (field_idx, (field_src, field_dest)) in
1024            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
1025        {
1026            let builder = field_dest.get_or_insert_with(|| {
1027                let mut field_builder =
1028                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
1029                field_builder.push_nulls(num_rows_before);
1030                field_builder
1031            });
1032            match builder {
1033                FieldBuilder::String(builder) => {
1034                    let array = field_src.to_arrow_array();
1035                    let string_array =
1036                        array
1037                            .as_any()
1038                            .downcast_ref::<StringArray>()
1039                            .with_context(|| error::InvalidBatchSnafu {
1040                                reason: format!(
1041                                    "Field type mismatch, expecting String, given: {}",
1042                                    field_src.data_type()
1043                                ),
1044                            })?;
1045                    builder.append_array(string_array);
1046                }
1047                FieldBuilder::Other(builder) => {
1048                    let len = field_src.len();
1049                    builder
1050                        .extend_slice_of(&*field_src, 0, len)
1051                        .context(error::ComputeVectorSnafu)?;
1052                }
1053            }
1054        }
1055        Ok(())
1056    }
1057
1058    /// Returns the length of [ValueBuilder]
1059    fn len(&self) -> usize {
1060        let sequence_len = self.sequence.len();
1061        debug_assert_eq!(sequence_len, self.op_type.len());
1062        debug_assert_eq!(sequence_len, self.timestamp.len());
1063        sequence_len
1064    }
1065
1066    fn finish_cloned(&self) -> Values {
1067        let num_rows = self.sequence.len();
1068        let fields = self
1069            .fields
1070            .iter()
1071            .enumerate()
1072            .map(|(i, v)| {
1073                if let Some(v) = v {
1074                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
1075                    v.finish_cloned()
1076                } else {
1077                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
1078                    single_null.push_nulls(num_rows);
1079                    single_null.to_vector()
1080                }
1081            })
1082            .collect::<Vec<_>>();
1083
1084        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
1085        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
1086        let timestamp: VectorRef = match self.timestamp_type {
1087            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
1088                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
1089            }
1090            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
1091                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
1092            }
1093            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
1094                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
1095            }
1096            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
1097                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
1098            }
1099            _ => unreachable!(),
1100        };
1101
1102        if cfg!(debug_assertions) {
1103            debug_assert_eq!(timestamp.len(), sequence.len());
1104            debug_assert_eq!(timestamp.len(), op_type.len());
1105            for field in &fields {
1106                debug_assert_eq!(timestamp.len(), field.len());
1107            }
1108        }
1109
1110        Values {
1111            timestamp,
1112            sequence,
1113            op_type,
1114            fields,
1115        }
1116    }
1117}
1118
/// [Values] holds immutable vectors of field columns, including `sequence` and `op_type`.
#[derive(Clone)]
pub struct Values {
    /// Time index column; its concrete type matches the region's timestamp unit.
    pub(crate) timestamp: VectorRef,
    /// Sequence number of each row.
    pub(crate) sequence: Arc<UInt64Vector>,
    /// Operation type of each row.
    pub(crate) op_type: Arc<UInt8Vector>,
    /// Field columns, all with the same length as `timestamp`.
    pub(crate) fields: Vec<VectorRef>,
}
1127
1128impl Values {
1129    /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and
1130    /// keeps only the latest row for the same timestamp.
1131    pub fn to_batch(
1132        &self,
1133        primary_key: &[u8],
1134        metadata: &RegionMetadataRef,
1135        projection: &HashSet<ColumnId>,
1136        dedup: bool,
1137    ) -> Result<Batch> {
1138        let builder = BatchBuilder::with_required_columns(
1139            primary_key.to_vec(),
1140            self.timestamp.clone(),
1141            self.sequence.clone(),
1142            self.op_type.clone(),
1143        );
1144
1145        let fields = metadata
1146            .field_columns()
1147            .zip(self.fields.iter())
1148            .filter_map(|(c, f)| {
1149                projection.get(&c.column_id).map(|c| BatchColumn {
1150                    column_id: *c,
1151                    data: f.clone(),
1152                })
1153            })
1154            .collect();
1155
1156        let mut batch = builder.with_fields(fields).build()?;
1157        batch.sort(dedup)?;
1158        Ok(batch)
1159    }
1160
1161    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
1162    fn columns(&self) -> Vec<ArrayRef> {
1163        let mut res = Vec::with_capacity(3 + self.fields.len());
1164        res.push(self.timestamp.to_arrow_array());
1165        res.push(self.sequence.to_arrow_array());
1166        res.push(self.op_type.to_arrow_array());
1167        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
1168        res
1169    }
1170
1171    /// Builds a new [Values] instance from columns.
1172    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
1173        debug_assert!(cols.len() >= 3);
1174        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
1175        let sequence =
1176            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
1177        let op_type =
1178            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
1179        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;
1180
1181        Ok(Self {
1182            timestamp,
1183            sequence,
1184            op_type,
1185            fields,
1186        })
1187    }
1188}
1189
1190impl From<ValueBuilder> for Values {
1191    fn from(mut value: ValueBuilder) -> Self {
1192        let num_rows = value.len();
1193        let fields = value
1194            .fields
1195            .iter_mut()
1196            .enumerate()
1197            .map(|(i, v)| {
1198                if let Some(v) = v {
1199                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
1200                    v.finish()
1201                } else {
1202                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
1203                    single_null.push_nulls(num_rows);
1204                    single_null.to_vector()
1205                }
1206            })
1207            .collect::<Vec<_>>();
1208
1209        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
1210        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
1211        let timestamp: VectorRef = match value.timestamp_type {
1212            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
1213                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
1214            }
1215            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
1216                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
1217            }
1218            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
1219                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
1220            }
1221            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
1222                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
1223            }
1224            _ => unreachable!(),
1225        };
1226
1227        if cfg!(debug_assertions) {
1228            debug_assert_eq!(timestamp.len(), sequence.len());
1229            debug_assert_eq!(timestamp.len(), op_type.len());
1230            for field in &fields {
1231                debug_assert_eq!(timestamp.len(), field.len());
1232            }
1233        }
1234
1235        Self {
1236            timestamp,
1237            sequence,
1238            op_type,
1239            fields,
1240        }
1241    }
1242}
1243
/// Builds batch iterators over a [SeriesSet] for memtable scans.
struct TimeSeriesIterBuilder {
    /// The set of series to iterate.
    series_set: SeriesSet,
    /// Column ids to read.
    projection: HashSet<ColumnId>,
    /// Optional predicate to filter batches.
    predicate: Option<Predicate>,
    /// Whether to keep only the latest row per timestamp.
    dedup: bool,
    /// Optional sequence range to filter rows.
    sequence: Option<SequenceRange>,
    /// Controls whether a `LastNonNull` merging adapter wraps the iterator.
    merge_mode: MergeMode,
}
1252
1253impl IterBuilder for TimeSeriesIterBuilder {
1254    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
1255        let iter = self.series_set.iter_series(
1256            self.projection.clone(),
1257            self.predicate.clone(),
1258            self.dedup,
1259            self.sequence,
1260            metrics,
1261        )?;
1262
1263        if self.merge_mode == MergeMode::LastNonNull {
1264            let iter = LastNonNullIter::new(iter);
1265            Ok(Box::new(iter))
1266        } else {
1267            Ok(Box::new(iter))
1268        }
1269    }
1270}
1271
1272#[cfg(test)]
1273mod tests {
1274    use std::collections::{HashMap, HashSet};
1275
1276    use api::helper::ColumnDataTypeWrapper;
1277    use api::v1::helper::row;
1278    use api::v1::value::ValueData;
1279    use api::v1::{Mutation, Rows, SemanticType};
1280    use common_time::Timestamp;
1281    use datatypes::prelude::{ConcreteDataType, ScalarVector};
1282    use datatypes::schema::ColumnSchema;
1283    use datatypes::value::{OrderedFloat, Value};
1284    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1285    use mito_codec::row_converter::SortField;
1286    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1287    use store_api::storage::RegionId;
1288
1289    use super::*;
1290    use crate::test_util::column_metadata_to_column_schema;
1291
1292    fn schema_for_test() -> RegionMetadataRef {
1293        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1294        builder
1295            .push_column_metadata(ColumnMetadata {
1296                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1297                semantic_type: SemanticType::Tag,
1298                column_id: 0,
1299            })
1300            .push_column_metadata(ColumnMetadata {
1301                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
1302                semantic_type: SemanticType::Tag,
1303                column_id: 1,
1304            })
1305            .push_column_metadata(ColumnMetadata {
1306                column_schema: ColumnSchema::new(
1307                    "ts",
1308                    ConcreteDataType::timestamp_millisecond_datatype(),
1309                    false,
1310                ),
1311                semantic_type: SemanticType::Timestamp,
1312                column_id: 2,
1313            })
1314            .push_column_metadata(ColumnMetadata {
1315                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1316                semantic_type: SemanticType::Field,
1317                column_id: 3,
1318            })
1319            .push_column_metadata(ColumnMetadata {
1320                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1321                semantic_type: SemanticType::Field,
1322                column_id: 4,
1323            })
1324            .primary_key(vec![0, 1]);
1325        let region_metadata = builder.build().unwrap();
1326        Arc::new(region_metadata)
1327    }
1328
1329    fn ts_value_ref(val: i64) -> ValueRef<'static> {
1330        ValueRef::Timestamp(Timestamp::new_millisecond(val))
1331    }
1332
1333    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1334        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1335    }
1336
1337    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
1338        let ts = values
1339            .timestamp
1340            .as_any()
1341            .downcast_ref::<TimestampMillisecondVector>()
1342            .unwrap();
1343
1344        let v0 = values.fields[0]
1345            .as_any()
1346            .downcast_ref::<Int64Vector>()
1347            .unwrap();
1348        let v1 = values.fields[1]
1349            .as_any()
1350            .downcast_ref::<Float64Vector>()
1351            .unwrap();
1352        let read = ts
1353            .iter_data()
1354            .zip(values.sequence.iter_data())
1355            .zip(values.op_type.iter_data())
1356            .zip(v0.iter_data())
1357            .zip(v1.iter_data())
1358            .map(|((((ts, sequence), op_type), v0), v1)| {
1359                (
1360                    ts.unwrap().0.value(),
1361                    sequence.unwrap(),
1362                    op_type.unwrap(),
1363                    v0.unwrap(),
1364                    v1.unwrap(),
1365                )
1366            })
1367            .collect::<Vec<_>>();
1368        assert_eq!(expect, &read);
1369    }
1370
1371    #[test]
1372    fn test_series() {
1373        let region_metadata = schema_for_test();
1374        let mut series = Series::new(&region_metadata);
1375        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
1376        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
1377        assert_eq!(2, series.active.timestamp.len());
1378        assert_eq!(0, series.frozen.len());
1379
1380        let values = series.compact(&region_metadata).unwrap();
1381        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
1382        assert_eq!(0, series.active.timestamp.len());
1383        assert_eq!(1, series.frozen.len());
1384    }
1385
1386    #[test]
1387    fn test_series_with_nulls() {
1388        let region_metadata = schema_for_test();
1389        let mut series = Series::new(&region_metadata);
1390        // col1: NULL 1 2 3
1391        // col2: NULL NULL 10.2 NULL
1392        series.push(
1393            ts_value_ref(1),
1394            0,
1395            OpType::Put,
1396            vec![ValueRef::Null, ValueRef::Null].into_iter(),
1397        );
1398        series.push(
1399            ts_value_ref(1),
1400            0,
1401            OpType::Put,
1402            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
1403        );
1404        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
1405        series.push(
1406            ts_value_ref(1),
1407            3,
1408            OpType::Put,
1409            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
1410        );
1411        assert_eq!(4, series.active.timestamp.len());
1412        assert_eq!(0, series.frozen.len());
1413
1414        let values = series.compact(&region_metadata).unwrap();
1415        assert_eq!(values.fields[0].null_count(), 1);
1416        assert_eq!(values.fields[1].null_count(), 3);
1417        assert_eq!(0, series.active.timestamp.len());
1418        assert_eq!(1, series.frozen.len());
1419    }
1420
1421    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1422        let ts_len = batch.timestamps().len();
1423        assert_eq!(batch.sequences().len(), ts_len);
1424        assert_eq!(batch.op_types().len(), ts_len);
1425        for f in batch.fields() {
1426            assert_eq!(f.data.len(), ts_len);
1427        }
1428
1429        let mut rows = vec![];
1430        for idx in 0..ts_len {
1431            let mut row = Vec::with_capacity(batch.fields().len() + 3);
1432            row.push(batch.timestamps().get(idx));
1433            row.push(batch.sequences().get(idx));
1434            row.push(batch.op_types().get(idx));
1435            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1436            rows.push(row);
1437        }
1438
1439        assert_eq!(expect.len(), rows.len());
1440        for (idx, row) in rows.iter().enumerate() {
1441            assert_eq!(&expect[idx], row);
1442        }
1443    }
1444
1445    #[test]
1446    fn test_values_sort() {
1447        let schema = schema_for_test();
1448        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
1449        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
1450        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));
1451
1452        let fields = vec![
1453            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
1454            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
1455        ];
1456        let values = Values {
1457            timestamp: timestamp as Arc<_>,
1458            sequence,
1459            op_type,
1460            fields,
1461        };
1462
1463        let batch = values
1464            .to_batch(
1465                b"test",
1466                &schema,
1467                &[0, 1, 2, 3, 4].into_iter().collect(),
1468                true,
1469            )
1470            .unwrap();
1471        check_value(
1472            &batch,
1473            vec![
1474                vec![
1475                    Value::Timestamp(Timestamp::new_millisecond(1)),
1476                    Value::UInt64(1),
1477                    Value::UInt8(1),
1478                    Value::Int64(4),
1479                    Value::Float64(OrderedFloat(1.1)),
1480                ],
1481                vec![
1482                    Value::Timestamp(Timestamp::new_millisecond(2)),
1483                    Value::UInt64(1),
1484                    Value::UInt8(1),
1485                    Value::Int64(3),
1486                    Value::Float64(OrderedFloat(2.1)),
1487                ],
1488                vec![
1489                    Value::Timestamp(Timestamp::new_millisecond(3)),
1490                    Value::UInt64(2),
1491                    Value::UInt8(0),
1492                    Value::Int64(2),
1493                    Value::Float64(OrderedFloat(4.2)),
1494                ],
1495                vec![
1496                    Value::Timestamp(Timestamp::new_millisecond(4)),
1497                    Value::UInt64(1),
1498                    Value::UInt8(1),
1499                    Value::Int64(1),
1500                    Value::Float64(OrderedFloat(3.3)),
1501                ],
1502            ],
1503        )
1504    }
1505
1506    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
1507        let column_schema = schema
1508            .column_metadatas
1509            .iter()
1510            .map(|c| api::v1::ColumnSchema {
1511                column_name: c.column_schema.name.clone(),
1512                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
1513                    .unwrap()
1514                    .datatype() as i32,
1515                semantic_type: c.semantic_type as i32,
1516                ..Default::default()
1517            })
1518            .collect();
1519
1520        let rows = (0..len)
1521            .map(|i| {
1522                row(vec![
1523                    ValueData::StringValue(k0.clone()),
1524                    ValueData::I64Value(k1),
1525                    ValueData::TimestampMillisecondValue(i as i64),
1526                    ValueData::I64Value(i as i64),
1527                    ValueData::F64Value(i as f64),
1528                ])
1529            })
1530            .collect();
1531        let mutation = api::v1::Mutation {
1532            op_type: 1,
1533            sequence: 0,
1534            rows: Some(Rows {
1535                schema: column_schema,
1536                rows,
1537            }),
1538            write_hint: None,
1539        };
1540        KeyValues::new(schema.as_ref(), mutation).unwrap()
1541    }
1542
1543    #[test]
1544    fn test_series_set_concurrency() {
1545        let schema = schema_for_test();
1546        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1547            schema
1548                .primary_key_columns()
1549                .map(|c| {
1550                    (
1551                        c.column_id,
1552                        SortField::new(c.column_schema.data_type.clone()),
1553                    )
1554                })
1555                .collect(),
1556        ));
1557        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1558
1559        let concurrency = 32;
1560        let pk_num = concurrency * 2;
1561        let mut handles = Vec::with_capacity(concurrency);
1562        for i in 0..concurrency {
1563            let set = set.clone();
1564            let schema = schema.clone();
1565            let column_schemas = schema
1566                .column_metadatas
1567                .iter()
1568                .map(column_metadata_to_column_schema)
1569                .collect::<Vec<_>>();
1570            let handle = std::thread::spawn(move || {
1571                for j in i * 100..(i + 1) * 100 {
1572                    let pk = j % pk_num;
1573                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1574
1575                    let kvs = KeyValues::new(
1576                        &schema,
1577                        Mutation {
1578                            op_type: OpType::Put as i32,
1579                            sequence: j as u64,
1580                            rows: Some(Rows {
1581                                schema: column_schemas.clone(),
1582                                rows: vec![row(vec![
1583                                    ValueData::StringValue(format!("{}", j)),
1584                                    ValueData::I64Value(j as i64),
1585                                    ValueData::TimestampMillisecondValue(j as i64),
1586                                    ValueData::I64Value(j as i64),
1587                                    ValueData::F64Value(j as f64),
1588                                ])],
1589                            }),
1590                            write_hint: None,
1591                        },
1592                    )
1593                    .unwrap();
1594                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1595                }
1596            });
1597            handles.push(handle);
1598        }
1599        for h in handles {
1600            h.join().unwrap();
1601        }
1602
1603        let mut timestamps = Vec::with_capacity(concurrency * 100);
1604        let mut sequences = Vec::with_capacity(concurrency * 100);
1605        let mut op_types = Vec::with_capacity(concurrency * 100);
1606        let mut v0 = Vec::with_capacity(concurrency * 100);
1607
1608        for i in 0..pk_num {
1609            let pk = format!("pk-{}", i).as_bytes().to_vec();
1610            let series = set.get_series(&pk).unwrap();
1611            let mut guard = series.write().unwrap();
1612            let values = guard.compact(&schema).unwrap();
1613            timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1614            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1615            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1616            v0.extend(
1617                values
1618                    .fields
1619                    .first()
1620                    .unwrap()
1621                    .as_any()
1622                    .downcast_ref::<Int64Vector>()
1623                    .unwrap()
1624                    .iter_data()
1625                    .map(|v| v.unwrap()),
1626            );
1627        }
1628
1629        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1630        assert_eq!(
1631            expected_sequence,
1632            sequences.iter().copied().collect::<HashSet<_>>()
1633        );
1634
1635        op_types.iter().all(|op| *op == OpType::Put as u8);
1636        assert_eq!(
1637            expected_sequence,
1638            timestamps.iter().copied().collect::<HashSet<_>>()
1639        );
1640
1641        assert_eq!(timestamps, sequences);
1642        assert_eq!(v0, timestamps);
1643    }
1644
1645    #[test]
1646    fn test_memtable() {
1647        common_telemetry::init_default_ut_logging();
1648        check_memtable_dedup(true);
1649        check_memtable_dedup(false);
1650    }
1651
1652    fn check_memtable_dedup(dedup: bool) {
1653        let schema = schema_for_test();
1654        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1655        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
1656        memtable.write(&kvs).unwrap();
1657        memtable.write(&kvs).unwrap();
1658
1659        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
1660        for ts in kvs.iter().map(|kv| {
1661            kv.timestamp()
1662                .try_into_timestamp()
1663                .unwrap()
1664                .unwrap()
1665                .value()
1666        }) {
1667            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
1668        }
1669
1670        let iter = memtable.iter(None, None, None).unwrap();
1671        let mut read = HashMap::new();
1672
1673        for ts in iter
1674            .flat_map(|batch| {
1675                batch
1676                    .unwrap()
1677                    .timestamps()
1678                    .as_any()
1679                    .downcast_ref::<TimestampMillisecondVector>()
1680                    .unwrap()
1681                    .iter_data()
1682                    .collect::<Vec<_>>()
1683                    .into_iter()
1684            })
1685            .map(|v| v.unwrap().0.value())
1686        {
1687            *read.entry(ts).or_default() += 1;
1688        }
1689        assert_eq!(expected_ts, read);
1690
1691        let stats = memtable.stats();
1692        assert!(stats.bytes_allocated() > 0);
1693        assert_eq!(
1694            Some((
1695                Timestamp::new_millisecond(0),
1696                Timestamp::new_millisecond(99)
1697            )),
1698            stats.time_range()
1699        );
1700    }
1701
1702    #[test]
1703    fn test_memtable_projection() {
1704        common_telemetry::init_default_ut_logging();
1705        let schema = schema_for_test();
1706        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1707        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
1708        memtable.write(&kvs).unwrap();
1709
1710        let iter = memtable.iter(Some(&[3]), None, None).unwrap();
1711
1712        let mut v0_all = vec![];
1713
1714        for res in iter {
1715            let batch = res.unwrap();
1716            assert_eq!(1, batch.fields().len());
1717            let v0 = batch
1718                .fields()
1719                .first()
1720                .unwrap()
1721                .data
1722                .as_any()
1723                .downcast_ref::<Int64Vector>()
1724                .unwrap();
1725            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
1726        }
1727        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
1728    }
1729
1730    #[test]
1731    fn test_memtable_concurrent_write_read() {
1732        common_telemetry::init_default_ut_logging();
1733        let schema = schema_for_test();
1734        let memtable = Arc::new(TimeSeriesMemtable::new(
1735            schema.clone(),
1736            42,
1737            None,
1738            true,
1739            MergeMode::LastRow,
1740        ));
1741
1742        // Number of writer threads
1743        let num_writers = 10;
1744        // Number of reader threads
1745        let num_readers = 5;
1746        // Number of series per writer
1747        let series_per_writer = 100;
1748        // Number of rows per series
1749        let rows_per_series = 10;
1750        // Total number of series
1751        let total_series = num_writers * series_per_writer;
1752
1753        // Create a barrier to synchronize the start of all threads
1754        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));
1755
1756        // Spawn writer threads
1757        let mut writer_handles = Vec::with_capacity(num_writers);
1758        for writer_id in 0..num_writers {
1759            let memtable = memtable.clone();
1760            let schema = schema.clone();
1761            let barrier = barrier.clone();
1762
1763            let handle = std::thread::spawn(move || {
1764                // Wait for all threads to be ready
1765                barrier.wait();
1766
1767                // Create and write series
1768                for series_id in 0..series_per_writer {
1769                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
1770                    let kvs =
1771                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
1772                    memtable.write(&kvs).unwrap();
1773                }
1774            });
1775
1776            writer_handles.push(handle);
1777        }
1778
1779        // Spawn reader threads
1780        let mut reader_handles = Vec::with_capacity(num_readers);
1781        for _ in 0..num_readers {
1782            let memtable = memtable.clone();
1783            let barrier = barrier.clone();
1784
1785            let handle = std::thread::spawn(move || {
1786                barrier.wait();
1787
1788                for _ in 0..10 {
1789                    let iter = memtable.iter(None, None, None).unwrap();
1790                    for batch_result in iter {
1791                        let _ = batch_result.unwrap();
1792                    }
1793                }
1794            });
1795
1796            reader_handles.push(handle);
1797        }
1798
1799        barrier.wait();
1800
1801        for handle in writer_handles {
1802            handle.join().unwrap();
1803        }
1804        for handle in reader_handles {
1805            handle.join().unwrap();
1806        }
1807
1808        let iter = memtable.iter(None, None, None).unwrap();
1809        let mut series_count = 0;
1810        let mut row_count = 0;
1811
1812        for batch_result in iter {
1813            let batch = batch_result.unwrap();
1814            series_count += 1;
1815            row_count += batch.num_rows();
1816        }
1817        assert_eq!(total_series, series_count);
1818        assert_eq!(total_series * rows_per_series, row_count);
1819    }
1820}