// mito2/memtable/time_series.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::iter;
19use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::OpType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{debug, error};
26use common_time::Timestamp;
27use datatypes::arrow;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow_array::StringArray;
30use datatypes::data_type::{ConcreteDataType, DataType};
31use datatypes::prelude::{ScalarVector, Vector, VectorRef};
32use datatypes::types::TimestampType;
33use datatypes::value::{Value, ValueRef};
34use datatypes::vectors::{
35    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
36    TimestampSecondVector, UInt8Vector, UInt64Vector,
37};
38use mito_codec::key_values::KeyValue;
39use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
40use snafu::{OptionExt, ResultExt, ensure};
41use store_api::metadata::RegionMetadataRef;
42use store_api::storage::{ColumnId, SequenceNumber};
43use table::predicate::Predicate;
44
45use crate::error::{
46    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
47};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::builder::{FieldBuilder, StringBuilder};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
52use crate::memtable::stats::WriteMetrics;
53use crate::memtable::{
54    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
55    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
56    MemtableStats, PredicateGroup,
57};
58use crate::metrics::{
59    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
60    READ_STAGE_ELAPSED,
61};
62use crate::read::dedup::LastNonNullIter;
63use crate::read::{Batch, BatchBuilder, BatchColumn};
64use crate::region::options::MergeMode;
65
/// Initial capacity of vector builders when a series is created.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Active builder length threshold that triggers freezing a series
/// (see `Series::push`).
const BUILDER_CAPACITY: usize = 512;
71
/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    /// Write buffer manager passed to the `AllocTracker` of built memtables.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Whether built memtables deduplicate rows on read.
    dedup: bool,
    /// Merge mode forwarded to built memtables.
    merge_mode: MergeMode,
}
79
80impl TimeSeriesMemtableBuilder {
81    /// Creates a new builder with specific `write_buffer_manager`.
82    pub fn new(
83        write_buffer_manager: Option<WriteBufferManagerRef>,
84        dedup: bool,
85        merge_mode: MergeMode,
86    ) -> Self {
87        Self {
88            write_buffer_manager,
89            dedup,
90            merge_mode,
91        }
92    }
93}
94
95impl MemtableBuilder for TimeSeriesMemtableBuilder {
96    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
97        if metadata.primary_key.is_empty() {
98            Arc::new(SimpleBulkMemtable::new(
99                id,
100                metadata.clone(),
101                self.write_buffer_manager.clone(),
102                self.dedup,
103                self.merge_mode,
104            ))
105        } else {
106            Arc::new(TimeSeriesMemtable::new(
107                metadata.clone(),
108                id,
109                self.write_buffer_manager.clone(),
110                self.dedup,
111                self.merge_mode,
112            ))
113        }
114    }
115
116    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
117        // Now if we can use simple bulk memtable, the input request is already
118        // a bulk write request and won't call this method.
119        false
120    }
121}
122
/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    /// Codec that encodes primary key columns into the series map key.
    row_codec: Arc<DensePrimaryKeyCodec>,
    /// All series of this memtable, keyed by encoded primary key.
    series_set: SeriesSet,
    /// Tracks allocated bytes and reports to the write buffer manager.
    alloc_tracker: AllocTracker,
    // Timestamp bounds start inverted (MIN/MAX) so the first write updates both.
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    /// Whether to deduplicate rows on read.
    dedup: bool,
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
138
impl TimeSeriesMemtable {
    /// Creates a new [TimeSeriesMemtable].
    ///
    /// When `merge_mode` is [MergeMode::LastNonNull], `dedup` is forced to
    /// `false`: deduplication is handled later by [LastNonNullIter].
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            // Inverted extremes so the first write always updates both bounds.
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    /// Updates memtable stats.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

    /// Encodes the primary key of `kv`, pushes the row into its series and
    /// accumulates allocated sizes plus the timestamp range into `stats`.
    ///
    /// Returns an error if the number of primary key columns mismatches the
    /// codec, or if primary key encoding fails.
    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}
206
impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Prints only the type name; no fields are formatted.
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}
212
impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes every row of `kvs`, then folds the accumulated metrics into the
    /// memtable stats once to limit atomic operations.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Account for the timestamp and op type stored with each row.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // TODO(hl): this maybe inaccurate since for-iteration may return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to memtable will succeed.
        self.update_stats(local_stats);
        Ok(())
    }

    /// Writes a single row. Stats are only recorded when the write succeeds.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    /// Writes a bulk part by converting it to a mutation and writing row by row.
    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Default implementation fallback to row iteration.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        // Sequence, timestamp range and row count come from the bulk part
        // header rather than the per-row accumulation above.
        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    /// Test-only iterator over the memtable. A missing projection defaults to
    /// all field columns.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence, None)?;

        // LastNonNull merge needs an extra dedup pass over the iterator.
        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    /// Exposes the whole memtable as a single range (range id 0).
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
        _for_flush: bool,
    ) -> Result<MemtableRanges> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    /// Stops tracking allocations; the memtable no longer accepts new writes.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    /// Returns stats of this memtable, computed from the atomic counters.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Creates an empty memtable with the same configuration but new id/metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
385
/// Map from encoded primary key to its [Series]. Wrapped in a newtype so that
/// dropping the map keeps the process-wide series/field-builder gauges in sync.
#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);
388
389impl Drop for SeriesMap {
390    fn drop(&mut self) {
391        let num_series = self.0.len();
392        let num_field_builders = self
393            .0
394            .values()
395            .map(|v| v.read().unwrap().active.num_field_builders())
396            .sum::<usize>();
397        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
398        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
399    }
400}
401
/// Shared, thread-safe collection of all series of a region memtable.
#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    /// Series keyed by encoded primary key.
    series: Arc<RwLock<SeriesMap>>,
    /// Codec used to decode primary keys for predicate pruning.
    codec: Arc<DensePrimaryKeyCodec>,
}
408
impl SeriesSet {
    /// Creates an empty [SeriesSet] for the given region, using `codec` to
    /// decode primary keys.
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}
418
impl SeriesSet {
    /// Push [KeyValue] to SeriesSet with given primary key and return key/value allocated memory size.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, so only a read lock on the map
        // is needed; key bytes were accounted for when the key was inserted.
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the write lock. Another writer may have inserted the
        // series between the read above and this point, hence the entry match.
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                // Key bytes are only counted when this call stores the key.
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // safety: series must exist at given index.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates all series in [SeriesSet].
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
            mem_scan_metrics,
        )
    }
}
490
491/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys
492/// of given region schema
493pub(crate) fn primary_key_schema(
494    region_metadata: &RegionMetadataRef,
495) -> arrow::datatypes::SchemaRef {
496    let fields = region_metadata
497        .primary_key_columns()
498        .map(|pk| {
499            arrow::datatypes::Field::new(
500                pk.column_schema.name.clone(),
501                pk.column_schema.data_type.as_arrow_type(),
502                pk.column_schema.is_nullable(),
503            )
504        })
505        .collect::<Vec<_>>();
506    Arc::new(arrow::datatypes::Schema::new(fields))
507}
508
/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series visited during the scan.
    total_series: usize,
    /// Number of series pruned by primary-key predicates.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}
523
/// Iterator over the series of a memtable, yielding one [Batch] per series.
struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    /// Column ids to read.
    projection: HashSet<ColumnId>,
    /// Encoded primary key of the last yielded series; scanning resumes
    /// strictly after it.
    last_key: Option<Vec<u8>>,
    /// Simple filters used to prune whole series by primary key values.
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    /// Sequence bound passed to `Batch::filter_by_sequence`.
    sequence: Option<SequenceNumber>,
    metrics: Metrics,
    /// Scan metrics sink; reported once (taken) on exhaustion or drop.
    mem_scan_metrics: Option<MemScanMetrics>,
}
538
539impl Iter {
540    #[allow(clippy::too_many_arguments)]
541    pub(crate) fn try_new(
542        metadata: RegionMetadataRef,
543        series: Arc<RwLock<SeriesMap>>,
544        projection: HashSet<ColumnId>,
545        predicate: Option<Predicate>,
546        pk_schema: arrow::datatypes::SchemaRef,
547        pk_datatypes: Vec<ConcreteDataType>,
548        codec: Arc<DensePrimaryKeyCodec>,
549        dedup: bool,
550        sequence: Option<SequenceNumber>,
551        mem_scan_metrics: Option<MemScanMetrics>,
552    ) -> Result<Self> {
553        let predicate = predicate
554            .map(|predicate| {
555                predicate
556                    .exprs()
557                    .iter()
558                    .filter_map(SimpleFilterEvaluator::try_new)
559                    .collect::<Vec<_>>()
560            })
561            .unwrap_or_default();
562        Ok(Self {
563            metadata,
564            series,
565            projection,
566            last_key: None,
567            predicate,
568            pk_schema,
569            pk_datatypes,
570            codec,
571            dedup,
572            sequence,
573            metrics: Metrics::default(),
574            mem_scan_metrics,
575        })
576    }
577
578    fn report_mem_scan_metrics(&mut self) {
579        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
580            let inner = crate::memtable::MemScanMetricsData {
581                total_series: self.metrics.total_series,
582                num_rows: self.metrics.num_rows,
583                num_batches: self.metrics.num_batches,
584                scan_cost: self.metrics.scan_cost,
585            };
586            mem_scan_metrics.merge_inner(&inner);
587        }
588    }
589}
590
impl Drop for Iter {
    /// Logs per-iterator metrics and flushes them to the global counters when
    /// the iterator goes away, even if the scan was abandoned early.
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // Report MemScanMetrics if not already reported
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
609
impl Iterator for Iter {
    type Item = Result<Batch>;

    /// Yields at most one series per call: scans the map strictly after
    /// `last_key`, skips series pruned by the predicates, and returns the
    /// next surviving series compacted into a [Batch].
    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        // TODO(hl): maybe yield more than one time series to amortize range overhead.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // read next series
                // NOTE(review): `last_key` only advances on yielded series, so
                // pruned series are re-visited (and re-pruned) on later calls;
                // `total_series`/`num_pruned_series` may count a series more
                // than once — confirm this is intended.
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            // Apply the sequence visibility filter before handing the batch out.
            let mut batch = batch;
            batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        drop(map); // Explicitly drop the read lock
        self.metrics.scan_cost += start.elapsed();

        // Report MemScanMetrics before returning None
        self.report_mem_scan_metrics();

        None
    }
}
670
671fn prune_primary_key(
672    codec: &Arc<DensePrimaryKeyCodec>,
673    pk: &[u8],
674    series: &mut Series,
675    datatypes: &[ConcreteDataType],
676    pk_schema: arrow::datatypes::SchemaRef,
677    predicates: &[SimpleFilterEvaluator],
678) -> bool {
679    // no primary key, we simply return true.
680    if pk_schema.fields().is_empty() {
681        return true;
682    }
683
684    // retrieve primary key values from cache or decode from bytes.
685    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
686        pk_values
687    } else {
688        let pk_values = codec.decode_dense_without_column_id(pk);
689        if let Err(e) = pk_values {
690            error!(e; "Failed to decode primary key");
691            return true;
692        }
693        series.update_pk_cache(pk_values.unwrap());
694        series.pk_cache.as_ref().unwrap()
695    };
696
697    // evaluate predicates against primary key values
698    let mut result = true;
699    for predicate in predicates {
700        // ignore predicates that are not referencing primary key columns
701        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
702            continue;
703        };
704        // Safety: arrow schema and datatypes are constructed from the same source.
705        let scalar_value = pk_values[index]
706            .try_to_scalar_value(&datatypes[index])
707            .unwrap();
708        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
709    }
710
711    result
712}
713
/// A `Series` holds a list of field values of some given primary key.
pub struct Series {
    /// Decoded primary key values, cached for predicate pruning.
    pk_cache: Option<Vec<Value>>,
    /// Builder receiving new writes.
    active: ValueBuilder,
    /// Immutable, already-built value parts.
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    /// Active builder length threshold that triggers a freeze on push.
    capacity: usize,
}
722
impl Series {
    /// Creates a [Series] whose active builder starts at `init_capacity` rows
    /// and freezes once it approaches `capacity` rows.
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    /// Creates a [Series] with the default capacities.
    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into Series. Return the size of values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    /// Caches decoded primary key values for predicate pruning.
    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and push it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            // Swap in a fresh builder and convert the old one to immutable values.
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    /// Appends whole vectors to the series, freezing the active builder first
    /// if it cannot accommodate the incoming field vectors.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
    /// Returns the frozen and compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each values and avoid
            // cloning and sorting when values do not overlap with each other.

            // +3 for the timestamp, sequence and op_type columns.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(
                    frozen
                        .iter()
                        .zip(frozen.iter().skip(1))
                        .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
                );
            }

            // Concatenate each column across all frozen parts into one part.
            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    /// Returns a snapshot of all values: clones of the frozen parts followed
    /// by a cloned finish of the active builder.
    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}
838
/// `ValueBuilder` holds all the vector builders for field columns.
pub(crate) struct ValueBuilder {
    /// Timestamp values (raw i64 representation).
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    /// One lazily-created builder per field column; `None` until the field
    /// sees its first non-null value.
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}
848
849impl ValueBuilder {
    /// Creates a builder sized for `capacity` rows, with one lazily
    /// initialized slot per field column of `region_metadata`.
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        // Field builders are created on demand in `push` (see below) to save
        // memory for sparse fields.
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }
873
874    /// Returns number of field builders.
875    pub fn num_field_builders(&self) -> usize {
876        self.fields.iter().flatten().count()
877    }
878
    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they've already be encoded.
    /// Returns the size of field values.
    ///
    /// In this method, we don't check the data type of the value, because it is already checked in the caller.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // In debug builds, materialize the iterator so the field count can be
        // asserted against the builder layout.
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // A builder is only created once a field sees its first non-null
            // value; nulls before that are implied by the missing builder.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // Backfill nulls for rows pushed before this field's first value.
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.push(field_value);
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }
929
930    /// Checks if current value builder have sufficient space to accommodate `fields`.
931    /// Returns false if there is no space to accommodate fields due to offset overflow.
932    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
933        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
934            let Some(builder) = field_dest else {
935                continue;
936            };
937            let FieldBuilder::String(builder) = builder else {
938                continue;
939            };
940            let array = field_src.to_arrow_array();
941            let string_array = array
942                .as_any()
943                .downcast_ref::<StringArray>()
944                .with_context(|| error::InvalidBatchSnafu {
945                    reason: format!(
946                        "Field type mismatch, expecting String, given: {}",
947                        field_src.data_type()
948                    ),
949                })?;
950            let space_needed = string_array.value_data().len() as i32;
951            // offset may overflow
952            if builder.next_offset().checked_add(space_needed).is_none() {
953                return Ok(false);
954            }
955        }
956        Ok(true)
957    }
958
959    pub(crate) fn extend(
960        &mut self,
961        ts_v: VectorRef,
962        op_type: u8,
963        sequence: u64,
964        fields: Vec<VectorRef>,
965    ) -> Result<()> {
966        let num_rows_before = self.timestamp.len();
967        let num_rows_to_write = ts_v.len();
968        self.timestamp.reserve(num_rows_to_write);
969        match self.timestamp_type {
970            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
971                self.timestamp.extend(
972                    ts_v.as_any()
973                        .downcast_ref::<TimestampSecondVector>()
974                        .unwrap()
975                        .iter_data()
976                        .map(|v| v.unwrap().0.value()),
977                );
978            }
979            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
980                self.timestamp.extend(
981                    ts_v.as_any()
982                        .downcast_ref::<TimestampMillisecondVector>()
983                        .unwrap()
984                        .iter_data()
985                        .map(|v| v.unwrap().0.value()),
986                );
987            }
988            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
989                self.timestamp.extend(
990                    ts_v.as_any()
991                        .downcast_ref::<TimestampMicrosecondVector>()
992                        .unwrap()
993                        .iter_data()
994                        .map(|v| v.unwrap().0.value()),
995                );
996            }
997            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
998                self.timestamp.extend(
999                    ts_v.as_any()
1000                        .downcast_ref::<TimestampNanosecondVector>()
1001                        .unwrap()
1002                        .iter_data()
1003                        .map(|v| v.unwrap().0.value()),
1004                );
1005            }
1006            _ => unreachable!(),
1007        };
1008
1009        self.op_type.reserve(num_rows_to_write);
1010        self.op_type
1011            .extend(iter::repeat_n(op_type, num_rows_to_write));
1012        self.sequence.reserve(num_rows_to_write);
1013        self.sequence
1014            .extend(iter::repeat_n(sequence, num_rows_to_write));
1015
1016        for (field_idx, (field_src, field_dest)) in
1017            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
1018        {
1019            let builder = field_dest.get_or_insert_with(|| {
1020                let mut field_builder =
1021                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
1022                field_builder.push_nulls(num_rows_before);
1023                field_builder
1024            });
1025            match builder {
1026                FieldBuilder::String(builder) => {
1027                    let array = field_src.to_arrow_array();
1028                    let string_array =
1029                        array
1030                            .as_any()
1031                            .downcast_ref::<StringArray>()
1032                            .with_context(|| error::InvalidBatchSnafu {
1033                                reason: format!(
1034                                    "Field type mismatch, expecting String, given: {}",
1035                                    field_src.data_type()
1036                                ),
1037                            })?;
1038                    builder.append_array(string_array);
1039                }
1040                FieldBuilder::Other(builder) => {
1041                    let len = field_src.len();
1042                    builder
1043                        .extend_slice_of(&*field_src, 0, len)
1044                        .context(error::ComputeVectorSnafu)?;
1045                }
1046            }
1047        }
1048        Ok(())
1049    }
1050
1051    /// Returns the length of [ValueBuilder]
1052    fn len(&self) -> usize {
1053        let sequence_len = self.sequence.len();
1054        debug_assert_eq!(sequence_len, self.op_type.len());
1055        debug_assert_eq!(sequence_len, self.timestamp.len());
1056        sequence_len
1057    }
1058
1059    fn finish_cloned(&self) -> Values {
1060        let num_rows = self.sequence.len();
1061        let fields = self
1062            .fields
1063            .iter()
1064            .enumerate()
1065            .map(|(i, v)| {
1066                if let Some(v) = v {
1067                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
1068                    v.finish_cloned()
1069                } else {
1070                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
1071                    single_null.push_nulls(num_rows);
1072                    single_null.to_vector()
1073                }
1074            })
1075            .collect::<Vec<_>>();
1076
1077        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
1078        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
1079        let timestamp: VectorRef = match self.timestamp_type {
1080            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
1081                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
1082            }
1083            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
1084                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
1085            }
1086            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
1087                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
1088            }
1089            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
1090                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
1091            }
1092            _ => unreachable!(),
1093        };
1094
1095        if cfg!(debug_assertions) {
1096            debug_assert_eq!(timestamp.len(), sequence.len());
1097            debug_assert_eq!(timestamp.len(), op_type.len());
1098            for field in &fields {
1099                debug_assert_eq!(timestamp.len(), field.len());
1100            }
1101        }
1102
1103        Values {
1104            timestamp,
1105            sequence,
1106            op_type,
1107            fields,
1108        }
1109    }
1110}
1111
/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_type`.
#[derive(Clone)]
pub struct Values {
    // Time index column.
    pub(crate) timestamp: VectorRef,
    // Sequence number of each row.
    pub(crate) sequence: Arc<UInt64Vector>,
    // Raw `OpType` of each row.
    pub(crate) op_type: Arc<UInt8Vector>,
    // Field columns, in region metadata order.
    pub(crate) fields: Vec<VectorRef>,
}
1120
1121impl Values {
1122    /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and
1123    /// keeps only the latest row for the same timestamp.
1124    pub fn to_batch(
1125        &self,
1126        primary_key: &[u8],
1127        metadata: &RegionMetadataRef,
1128        projection: &HashSet<ColumnId>,
1129        dedup: bool,
1130    ) -> Result<Batch> {
1131        let builder = BatchBuilder::with_required_columns(
1132            primary_key.to_vec(),
1133            self.timestamp.clone(),
1134            self.sequence.clone(),
1135            self.op_type.clone(),
1136        );
1137
1138        let fields = metadata
1139            .field_columns()
1140            .zip(self.fields.iter())
1141            .filter_map(|(c, f)| {
1142                projection.get(&c.column_id).map(|c| BatchColumn {
1143                    column_id: *c,
1144                    data: f.clone(),
1145                })
1146            })
1147            .collect();
1148
1149        let mut batch = builder.with_fields(fields).build()?;
1150        batch.sort(dedup)?;
1151        Ok(batch)
1152    }
1153
1154    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
1155    fn columns(&self) -> Vec<ArrayRef> {
1156        let mut res = Vec::with_capacity(3 + self.fields.len());
1157        res.push(self.timestamp.to_arrow_array());
1158        res.push(self.sequence.to_arrow_array());
1159        res.push(self.op_type.to_arrow_array());
1160        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
1161        res
1162    }
1163
1164    /// Builds a new [Values] instance from columns.
1165    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
1166        debug_assert!(cols.len() >= 3);
1167        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
1168        let sequence =
1169            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
1170        let op_type =
1171            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
1172        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;
1173
1174        Ok(Self {
1175            timestamp,
1176            sequence,
1177            op_type,
1178            fields,
1179        })
1180    }
1181}
1182
impl From<ValueBuilder> for Values {
    /// Consumes the builder and materializes the immutable [Values].
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    // The builder is consumed by `finish`, so the
                    // active-builder gauge goes down by one.
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    // Column never received a non-null value: emit all nulls.
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        // Reinterpret the raw i64 timestamps with the region's time unit.
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        // Sanity-check that all columns have equal length in debug builds.
        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1236
/// Builder that creates batch iterators over a [SeriesSet].
struct TimeSeriesIterBuilder {
    // The set of series to iterate.
    series_set: SeriesSet,
    // Column ids to read.
    projection: HashSet<ColumnId>,
    // Optional filter passed to the series scan.
    predicate: Option<Predicate>,
    // Whether to deduplicate rows that share a timestamp.
    dedup: bool,
    // Optional sequence number forwarded to the scan
    // (presumably a visibility bound — confirm with `iter_series`).
    sequence: Option<SequenceNumber>,
    // Decides whether the result is wrapped with [LastNonNullIter].
    merge_mode: MergeMode,
}
1245
1246impl IterBuilder for TimeSeriesIterBuilder {
1247    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
1248        let iter = self.series_set.iter_series(
1249            self.projection.clone(),
1250            self.predicate.clone(),
1251            self.dedup,
1252            self.sequence,
1253            metrics,
1254        )?;
1255
1256        if self.merge_mode == MergeMode::LastNonNull {
1257            let iter = LastNonNullIter::new(iter);
1258            Ok(Box::new(iter))
1259        } else {
1260            Ok(Box::new(iter))
1261        }
1262    }
1263}
1264
1265#[cfg(test)]
1266mod tests {
1267    use std::collections::{HashMap, HashSet};
1268
1269    use api::helper::ColumnDataTypeWrapper;
1270    use api::v1::helper::row;
1271    use api::v1::value::ValueData;
1272    use api::v1::{Mutation, Rows, SemanticType};
1273    use common_time::Timestamp;
1274    use datatypes::prelude::{ConcreteDataType, ScalarVector};
1275    use datatypes::schema::ColumnSchema;
1276    use datatypes::value::{OrderedFloat, Value};
1277    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1278    use mito_codec::row_converter::SortField;
1279    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1280    use store_api::storage::RegionId;
1281
1282    use super::*;
1283    use crate::test_util::column_metadata_to_column_schema;
1284
1285    fn schema_for_test() -> RegionMetadataRef {
1286        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1287        builder
1288            .push_column_metadata(ColumnMetadata {
1289                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1290                semantic_type: SemanticType::Tag,
1291                column_id: 0,
1292            })
1293            .push_column_metadata(ColumnMetadata {
1294                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
1295                semantic_type: SemanticType::Tag,
1296                column_id: 1,
1297            })
1298            .push_column_metadata(ColumnMetadata {
1299                column_schema: ColumnSchema::new(
1300                    "ts",
1301                    ConcreteDataType::timestamp_millisecond_datatype(),
1302                    false,
1303                ),
1304                semantic_type: SemanticType::Timestamp,
1305                column_id: 2,
1306            })
1307            .push_column_metadata(ColumnMetadata {
1308                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1309                semantic_type: SemanticType::Field,
1310                column_id: 3,
1311            })
1312            .push_column_metadata(ColumnMetadata {
1313                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1314                semantic_type: SemanticType::Field,
1315                column_id: 4,
1316            })
1317            .primary_key(vec![0, 1]);
1318        let region_metadata = builder.build().unwrap();
1319        Arc::new(region_metadata)
1320    }
1321
1322    fn ts_value_ref(val: i64) -> ValueRef<'static> {
1323        ValueRef::Timestamp(Timestamp::new_millisecond(val))
1324    }
1325
1326    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1327        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1328    }
1329
1330    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
1331        let ts = values
1332            .timestamp
1333            .as_any()
1334            .downcast_ref::<TimestampMillisecondVector>()
1335            .unwrap();
1336
1337        let v0 = values.fields[0]
1338            .as_any()
1339            .downcast_ref::<Int64Vector>()
1340            .unwrap();
1341        let v1 = values.fields[1]
1342            .as_any()
1343            .downcast_ref::<Float64Vector>()
1344            .unwrap();
1345        let read = ts
1346            .iter_data()
1347            .zip(values.sequence.iter_data())
1348            .zip(values.op_type.iter_data())
1349            .zip(v0.iter_data())
1350            .zip(v1.iter_data())
1351            .map(|((((ts, sequence), op_type), v0), v1)| {
1352                (
1353                    ts.unwrap().0.value(),
1354                    sequence.unwrap(),
1355                    op_type.unwrap(),
1356                    v0.unwrap(),
1357                    v1.unwrap(),
1358                )
1359            })
1360            .collect::<Vec<_>>();
1361        assert_eq!(expect, &read);
1362    }
1363
1364    #[test]
1365    fn test_series() {
1366        let region_metadata = schema_for_test();
1367        let mut series = Series::new(&region_metadata);
1368        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
1369        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
1370        assert_eq!(2, series.active.timestamp.len());
1371        assert_eq!(0, series.frozen.len());
1372
1373        let values = series.compact(&region_metadata).unwrap();
1374        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
1375        assert_eq!(0, series.active.timestamp.len());
1376        assert_eq!(1, series.frozen.len());
1377    }
1378
1379    #[test]
1380    fn test_series_with_nulls() {
1381        let region_metadata = schema_for_test();
1382        let mut series = Series::new(&region_metadata);
1383        // col1: NULL 1 2 3
1384        // col2: NULL NULL 10.2 NULL
1385        series.push(
1386            ts_value_ref(1),
1387            0,
1388            OpType::Put,
1389            vec![ValueRef::Null, ValueRef::Null].into_iter(),
1390        );
1391        series.push(
1392            ts_value_ref(1),
1393            0,
1394            OpType::Put,
1395            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
1396        );
1397        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
1398        series.push(
1399            ts_value_ref(1),
1400            3,
1401            OpType::Put,
1402            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
1403        );
1404        assert_eq!(4, series.active.timestamp.len());
1405        assert_eq!(0, series.frozen.len());
1406
1407        let values = series.compact(&region_metadata).unwrap();
1408        assert_eq!(values.fields[0].null_count(), 1);
1409        assert_eq!(values.fields[1].null_count(), 3);
1410        assert_eq!(0, series.active.timestamp.len());
1411        assert_eq!(1, series.frozen.len());
1412    }
1413
1414    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1415        let ts_len = batch.timestamps().len();
1416        assert_eq!(batch.sequences().len(), ts_len);
1417        assert_eq!(batch.op_types().len(), ts_len);
1418        for f in batch.fields() {
1419            assert_eq!(f.data.len(), ts_len);
1420        }
1421
1422        let mut rows = vec![];
1423        for idx in 0..ts_len {
1424            let mut row = Vec::with_capacity(batch.fields().len() + 3);
1425            row.push(batch.timestamps().get(idx));
1426            row.push(batch.sequences().get(idx));
1427            row.push(batch.op_types().get(idx));
1428            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1429            rows.push(row);
1430        }
1431
1432        assert_eq!(expect.len(), rows.len());
1433        for (idx, row) in rows.iter().enumerate() {
1434            assert_eq!(&expect[idx], row);
1435        }
1436    }
1437
1438    #[test]
1439    fn test_values_sort() {
1440        let schema = schema_for_test();
1441        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
1442        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
1443        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));
1444
1445        let fields = vec![
1446            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
1447            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
1448        ];
1449        let values = Values {
1450            timestamp: timestamp as Arc<_>,
1451            sequence,
1452            op_type,
1453            fields,
1454        };
1455
1456        let batch = values
1457            .to_batch(
1458                b"test",
1459                &schema,
1460                &[0, 1, 2, 3, 4].into_iter().collect(),
1461                true,
1462            )
1463            .unwrap();
1464        check_value(
1465            &batch,
1466            vec![
1467                vec![
1468                    Value::Timestamp(Timestamp::new_millisecond(1)),
1469                    Value::UInt64(1),
1470                    Value::UInt8(1),
1471                    Value::Int64(4),
1472                    Value::Float64(OrderedFloat(1.1)),
1473                ],
1474                vec![
1475                    Value::Timestamp(Timestamp::new_millisecond(2)),
1476                    Value::UInt64(1),
1477                    Value::UInt8(1),
1478                    Value::Int64(3),
1479                    Value::Float64(OrderedFloat(2.1)),
1480                ],
1481                vec![
1482                    Value::Timestamp(Timestamp::new_millisecond(3)),
1483                    Value::UInt64(2),
1484                    Value::UInt8(0),
1485                    Value::Int64(2),
1486                    Value::Float64(OrderedFloat(4.2)),
1487                ],
1488                vec![
1489                    Value::Timestamp(Timestamp::new_millisecond(4)),
1490                    Value::UInt64(1),
1491                    Value::UInt8(1),
1492                    Value::Int64(1),
1493                    Value::Float64(OrderedFloat(3.3)),
1494                ],
1495            ],
1496        )
1497    }
1498
1499    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
1500        let column_schema = schema
1501            .column_metadatas
1502            .iter()
1503            .map(|c| api::v1::ColumnSchema {
1504                column_name: c.column_schema.name.clone(),
1505                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
1506                    .unwrap()
1507                    .datatype() as i32,
1508                semantic_type: c.semantic_type as i32,
1509                ..Default::default()
1510            })
1511            .collect();
1512
1513        let rows = (0..len)
1514            .map(|i| {
1515                row(vec![
1516                    ValueData::StringValue(k0.clone()),
1517                    ValueData::I64Value(k1),
1518                    ValueData::TimestampMillisecondValue(i as i64),
1519                    ValueData::I64Value(i as i64),
1520                    ValueData::F64Value(i as f64),
1521                ])
1522            })
1523            .collect();
1524        let mutation = api::v1::Mutation {
1525            op_type: 1,
1526            sequence: 0,
1527            rows: Some(Rows {
1528                schema: column_schema,
1529                rows,
1530            }),
1531            write_hint: None,
1532        };
1533        KeyValues::new(schema.as_ref(), mutation).unwrap()
1534    }
1535
1536    #[test]
1537    fn test_series_set_concurrency() {
1538        let schema = schema_for_test();
1539        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1540            schema
1541                .primary_key_columns()
1542                .map(|c| {
1543                    (
1544                        c.column_id,
1545                        SortField::new(c.column_schema.data_type.clone()),
1546                    )
1547                })
1548                .collect(),
1549        ));
1550        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1551
1552        let concurrency = 32;
1553        let pk_num = concurrency * 2;
1554        let mut handles = Vec::with_capacity(concurrency);
1555        for i in 0..concurrency {
1556            let set = set.clone();
1557            let schema = schema.clone();
1558            let column_schemas = schema
1559                .column_metadatas
1560                .iter()
1561                .map(column_metadata_to_column_schema)
1562                .collect::<Vec<_>>();
1563            let handle = std::thread::spawn(move || {
1564                for j in i * 100..(i + 1) * 100 {
1565                    let pk = j % pk_num;
1566                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1567
1568                    let kvs = KeyValues::new(
1569                        &schema,
1570                        Mutation {
1571                            op_type: OpType::Put as i32,
1572                            sequence: j as u64,
1573                            rows: Some(Rows {
1574                                schema: column_schemas.clone(),
1575                                rows: vec![row(vec![
1576                                    ValueData::StringValue(format!("{}", j)),
1577                                    ValueData::I64Value(j as i64),
1578                                    ValueData::TimestampMillisecondValue(j as i64),
1579                                    ValueData::I64Value(j as i64),
1580                                    ValueData::F64Value(j as f64),
1581                                ])],
1582                            }),
1583                            write_hint: None,
1584                        },
1585                    )
1586                    .unwrap();
1587                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1588                }
1589            });
1590            handles.push(handle);
1591        }
1592        for h in handles {
1593            h.join().unwrap();
1594        }
1595
1596        let mut timestamps = Vec::with_capacity(concurrency * 100);
1597        let mut sequences = Vec::with_capacity(concurrency * 100);
1598        let mut op_types = Vec::with_capacity(concurrency * 100);
1599        let mut v0 = Vec::with_capacity(concurrency * 100);
1600
1601        for i in 0..pk_num {
1602            let pk = format!("pk-{}", i).as_bytes().to_vec();
1603            let series = set.get_series(&pk).unwrap();
1604            let mut guard = series.write().unwrap();
1605            let values = guard.compact(&schema).unwrap();
1606            timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1607            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1608            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1609            v0.extend(
1610                values
1611                    .fields
1612                    .first()
1613                    .unwrap()
1614                    .as_any()
1615                    .downcast_ref::<Int64Vector>()
1616                    .unwrap()
1617                    .iter_data()
1618                    .map(|v| v.unwrap()),
1619            );
1620        }
1621
1622        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1623        assert_eq!(
1624            expected_sequence,
1625            sequences.iter().copied().collect::<HashSet<_>>()
1626        );
1627
1628        op_types.iter().all(|op| *op == OpType::Put as u8);
1629        assert_eq!(
1630            expected_sequence,
1631            timestamps.iter().copied().collect::<HashSet<_>>()
1632        );
1633
1634        assert_eq!(timestamps, sequences);
1635        assert_eq!(v0, timestamps);
1636    }
1637
1638    #[test]
1639    fn test_memtable() {
1640        common_telemetry::init_default_ut_logging();
1641        check_memtable_dedup(true);
1642        check_memtable_dedup(false);
1643    }
1644
1645    fn check_memtable_dedup(dedup: bool) {
1646        let schema = schema_for_test();
1647        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1648        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
1649        memtable.write(&kvs).unwrap();
1650        memtable.write(&kvs).unwrap();
1651
1652        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
1653        for ts in kvs
1654            .iter()
1655            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
1656        {
1657            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
1658        }
1659
1660        let iter = memtable.iter(None, None, None).unwrap();
1661        let mut read = HashMap::new();
1662
1663        for ts in iter
1664            .flat_map(|batch| {
1665                batch
1666                    .unwrap()
1667                    .timestamps()
1668                    .as_any()
1669                    .downcast_ref::<TimestampMillisecondVector>()
1670                    .unwrap()
1671                    .iter_data()
1672                    .collect::<Vec<_>>()
1673                    .into_iter()
1674            })
1675            .map(|v| v.unwrap().0.value())
1676        {
1677            *read.entry(ts).or_default() += 1;
1678        }
1679        assert_eq!(expected_ts, read);
1680
1681        let stats = memtable.stats();
1682        assert!(stats.bytes_allocated() > 0);
1683        assert_eq!(
1684            Some((
1685                Timestamp::new_millisecond(0),
1686                Timestamp::new_millisecond(99)
1687            )),
1688            stats.time_range()
1689        );
1690    }
1691
1692    #[test]
1693    fn test_memtable_projection() {
1694        common_telemetry::init_default_ut_logging();
1695        let schema = schema_for_test();
1696        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1697        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
1698        memtable.write(&kvs).unwrap();
1699
1700        let iter = memtable.iter(Some(&[3]), None, None).unwrap();
1701
1702        let mut v0_all = vec![];
1703
1704        for res in iter {
1705            let batch = res.unwrap();
1706            assert_eq!(1, batch.fields().len());
1707            let v0 = batch
1708                .fields()
1709                .first()
1710                .unwrap()
1711                .data
1712                .as_any()
1713                .downcast_ref::<Int64Vector>()
1714                .unwrap();
1715            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
1716        }
1717        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
1718    }
1719
1720    #[test]
1721    fn test_memtable_concurrent_write_read() {
1722        common_telemetry::init_default_ut_logging();
1723        let schema = schema_for_test();
1724        let memtable = Arc::new(TimeSeriesMemtable::new(
1725            schema.clone(),
1726            42,
1727            None,
1728            true,
1729            MergeMode::LastRow,
1730        ));
1731
1732        // Number of writer threads
1733        let num_writers = 10;
1734        // Number of reader threads
1735        let num_readers = 5;
1736        // Number of series per writer
1737        let series_per_writer = 100;
1738        // Number of rows per series
1739        let rows_per_series = 10;
1740        // Total number of series
1741        let total_series = num_writers * series_per_writer;
1742
1743        // Create a barrier to synchronize the start of all threads
1744        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));
1745
1746        // Spawn writer threads
1747        let mut writer_handles = Vec::with_capacity(num_writers);
1748        for writer_id in 0..num_writers {
1749            let memtable = memtable.clone();
1750            let schema = schema.clone();
1751            let barrier = barrier.clone();
1752
1753            let handle = std::thread::spawn(move || {
1754                // Wait for all threads to be ready
1755                barrier.wait();
1756
1757                // Create and write series
1758                for series_id in 0..series_per_writer {
1759                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
1760                    let kvs =
1761                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
1762                    memtable.write(&kvs).unwrap();
1763                }
1764            });
1765
1766            writer_handles.push(handle);
1767        }
1768
1769        // Spawn reader threads
1770        let mut reader_handles = Vec::with_capacity(num_readers);
1771        for _ in 0..num_readers {
1772            let memtable = memtable.clone();
1773            let barrier = barrier.clone();
1774
1775            let handle = std::thread::spawn(move || {
1776                barrier.wait();
1777
1778                for _ in 0..10 {
1779                    let iter = memtable.iter(None, None, None).unwrap();
1780                    for batch_result in iter {
1781                        let _ = batch_result.unwrap();
1782                    }
1783                }
1784            });
1785
1786            reader_handles.push(handle);
1787        }
1788
1789        barrier.wait();
1790
1791        for handle in writer_handles {
1792            handle.join().unwrap();
1793        }
1794        for handle in reader_handles {
1795            handle.join().unwrap();
1796        }
1797
1798        let iter = memtable.iter(None, None, None).unwrap();
1799        let mut series_count = 0;
1800        let mut row_count = 0;
1801
1802        for batch_result in iter {
1803            let batch = batch_result.unwrap();
1804            series_count += 1;
1805            row_count += batch.num_rows();
1806        }
1807        assert_eq!(total_series, series_count);
1808        assert_eq!(total_series * rows_per_series, row_count);
1809    }
1810}