// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt8Vector, UInt64Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues,
    MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext,
    MemtableRanges, MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection,
};
use crate::metrics::{
    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
    READ_STAGE_ELAPSED,
};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;

/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Vector builder capacity.
const BUILDER_CAPACITY: usize = 512;

/// Builder to build [TimeSeriesMemtable].
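///
/// # Example
///
/// A minimal sketch (assumes `metadata` is a `RegionMetadataRef` already in scope):
///
/// ```ignore
/// let builder = TimeSeriesMemtableBuilder::new(None, true, MergeMode::LastRow);
/// let memtable = builder.build(0, &metadata);
/// ```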
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with the given `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        // When the simple bulk memtable is applicable, the input request is
        // already a bulk write request and never reaches this method.
        false
    }
}

/// Memtable implementation that groups rows by their primary key.
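///
/// Each unique (encoded) primary key maps to a [Series] that buffers the rows
/// written under that key; the series are kept in a [SeriesSet] ordered by the
/// encoded key.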
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    /// Updates memtable stats.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // Safety: the timestamp of kv must be present and a valid timestamp value.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // TODO(hl): this may be inaccurate since the loop above can return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to the memtable will succeed.
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Default implementation falls back to row iteration.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection);
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let adapter_context = Arc::new(BatchToRecordBatchContext::new(
            self.region_metadata.clone(),
            read_column_ids,
        ));
        let context = Arc::new(MemtableRangeContext::new_with_batch_to_record_batch(
            self.id,
            builder,
            predicate,
            Some(adapter_context),
        ));

        let range_stats = self.stats();
        let range = MemtableRange::new(context, range_stats);
        Ok(MemtableRanges {
            ranges: [(0, range)].into(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // No rows have ever been written.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);

impl Drop for SeriesMap {
    fn drop(&mut self) {
        let num_series = self.0.len();
        let num_field_builders = self
            .0
            .values()
            .map(|v| v.read().unwrap().active.num_field_builders())
            .sum::<usize>();
        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
    }
}

#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
    /// Pushes a [KeyValue] into the [SeriesSet] under the given primary key and
    /// returns the allocated key and value sizes.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

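        // Slow path: the series was absent under the read lock, so take the
        // write lock and insert via the entry API. The `Occupied` arm handles
        // the race where another writer created the series in between.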
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // Another writer created the series after the read lock was released.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates all series in [SeriesSet].
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            merge_mode,
            sequence,
            mem_scan_metrics,
        )
    }
}

/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains the primary keys
/// of the given region schema.
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}

/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series in the memtable.
    total_series: usize,
    /// Number of series pruned.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}

struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    projection: HashSet<ColumnId>,
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    merge_mode: MergeMode,
    sequence: Option<SequenceRange>,
    metrics: Metrics,
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<RwLock<SeriesMap>>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            merge_mode,
            sequence,
            metrics: Metrics::default(),
            mem_scan_metrics,
        })
    }

    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            let inner = crate::memtable::MemScanMetricsData {
                total_series: self.metrics.total_series,
                num_rows: self.metrics.num_rows,
                num_batches: self.metrics.num_batches,
                scan_cost: self.metrics.scan_cost,
            };
            mem_scan_metrics.merge_inner(&inner);
        }
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // Report MemScanMetrics if not already reported.
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

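    // Each call yields at most one series. It re-acquires the series map read
    // lock and resumes the range scan right after `last_key`, so the map is
    // never locked across batches and concurrent writers can make progress.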
    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        // TODO(hl): maybe yield more than one time series to amortize range overhead.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // read next series
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(
                    primary_key,
                    &self.metadata,
                    &self.projection,
                    self.sequence,
                    self.dedup,
                    self.merge_mode,
                )
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            return Some(batch);
        }
        drop(map); // Explicitly drop the read lock
        self.metrics.scan_cost += start.elapsed();

        // Report MemScanMetrics before returning None
        self.report_mem_scan_metrics();

        None
    }
}

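/// Returns whether the series identified by `pk` may match `predicates`, judged
/// only by its primary key values. Returning `true` keeps the series; decode or
/// evaluation failures are treated conservatively as a match.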
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // No primary key, so we simply return true.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // Retrieve primary key values from the cache, or decode them from bytes.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // Evaluate predicates against primary key values.
    let mut result = true;
    for predicate in predicates {
        // Ignore predicates that do not reference primary key columns.
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: the arrow schema and datatypes are constructed from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}

/// A `Series` holds a list of field values for a given primary key.
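///
/// Rows are first appended to a mutable `active` builder; once the builder
/// approaches `capacity` it is frozen into the immutable `frozen` list, and
/// reads compact all frozen parts into a single [Values].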
pub struct Series {
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    capacity: usize,
}

impl Series {
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into the series. Returns the size of the pushed values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Freeze the active part 10 rows early to avoid a potential reallocation.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and pushes it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes the active part and compacts the frozen parts into one to reduce
    /// memory fragmentation. Returns the frozen and compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row.
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each values and avoid
            // cloning and sorting when values do not overlap with each other.

            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(
                    frozen
                        .iter()
                        .zip(frozen.iter().skip(1))
                        .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
                );
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}

/// `ValueBuilder` holds all the vector builders for field columns.
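///
/// Field builders are created lazily: a column's builder stays `None` until the
/// first non-null value arrives, and is then back-filled with nulls for the
/// rows pushed so far.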
pub(crate) struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Returns the number of allocated field builders.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes a new row to the `ValueBuilder`.
    /// We don't need primary keys since they have already been encoded.
    /// Returns the size of the field values.
    ///
    /// This method does not check the data types of the values, because they
    /// are already checked by the caller.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.try_into_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    mutable_vector.push_nulls(num_rows - 1);
                    mutable_vector
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }

    /// Checks whether the current value builder has sufficient space to accommodate `fields`.
    /// Returns false if appending `fields` would overflow a string builder's offsets.
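    ///
    /// The string builder tracks `i32` offsets, so a single builder can hold at
    /// most `i32::MAX` bytes of string data; when this limit would be exceeded,
    /// the caller freezes the active part first (see [Series::extend]).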
    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
            let Some(builder) = field_dest else {
                continue;
            };
            let FieldBuilder::String(builder) = builder else {
                continue;
            };
            let array = field_src.to_arrow_array();
            let string_array = array
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| error::InvalidBatchSnafu {
                    reason: format!(
                        "Field type mismatch, expecting String, given: {}",
                        field_src.data_type()
                    ),
                })?;
            let space_needed = string_array.value_data().len() as i32;
            // offset may overflow
            if builder.next_offset().checked_add(space_needed).is_none() {
                return Ok(false);
            }
        }
        Ok(true)
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
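        // The time index column is non-nullable, so `iter_data()` below never
        // yields `None` and the unwraps are safe.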
960        match self.timestamp_type {
961            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
962                self.timestamp.extend(
963                    ts_v.as_any()
964                        .downcast_ref::<TimestampSecondVector>()
965                        .unwrap()
966                        .iter_data()
967                        .map(|v| v.unwrap().0.value()),
968                );
969            }
970            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
971                self.timestamp.extend(
972                    ts_v.as_any()
973                        .downcast_ref::<TimestampMillisecondVector>()
974                        .unwrap()
975                        .iter_data()
976                        .map(|v| v.unwrap().0.value()),
977                );
978            }
979            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
980                self.timestamp.extend(
981                    ts_v.as_any()
982                        .downcast_ref::<TimestampMicrosecondVector>()
983                        .unwrap()
984                        .iter_data()
985                        .map(|v| v.unwrap().0.value()),
986                );
987            }
988            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
989                self.timestamp.extend(
990                    ts_v.as_any()
991                        .downcast_ref::<TimestampNanosecondVector>()
992                        .unwrap()
993                        .iter_data()
994                        .map(|v| v.unwrap().0.value()),
995                );
996            }
997            _ => unreachable!(),
998        };
999
1000        self.op_type.reserve(num_rows_to_write);
1001        self.op_type
1002            .extend(iter::repeat_n(op_type, num_rows_to_write));
1003        self.sequence.reserve(num_rows_to_write);
1004        self.sequence
1005            .extend(iter::repeat_n(sequence, num_rows_to_write));
1006
1007        for (field_idx, (field_src, field_dest)) in
1008            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
1009        {
1010            let builder = field_dest.get_or_insert_with(|| {
1011                let mut field_builder =
1012                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
1013                field_builder.push_nulls(num_rows_before);
1014                field_builder
1015            });
1016            match builder {
1017                FieldBuilder::String(builder) => {
1018                    let array = field_src.to_arrow_array();
1019                    let string_array =
1020                        array
1021                            .as_any()
1022                            .downcast_ref::<StringArray>()
1023                            .with_context(|| error::InvalidBatchSnafu {
1024                                reason: format!(
1025                                    "Field type mismatch, expecting String, given: {}",
1026                                    field_src.data_type()
1027                                ),
1028                            })?;
1029                    builder.append_array(string_array);
1030                }
1031                FieldBuilder::Other(builder) => {
1032                    let len = field_src.len();
1033                    builder
1034                        .extend_slice_of(&*field_src, 0, len)
1035                        .context(error::ComputeVectorSnafu)?;
1036                }
1037            }
1038        }
1039        Ok(())
1040    }
1041
1042    /// Returns the length of [ValueBuilder]
1043    fn len(&self) -> usize {
1044        let sequence_len = self.sequence.len();
1045        debug_assert_eq!(sequence_len, self.op_type.len());
1046        debug_assert_eq!(sequence_len, self.timestamp.len());
1047        sequence_len
1048    }
1049
1050    fn finish_cloned(&self) -> Values {
1051        let num_rows = self.sequence.len();
1052        let fields = self
1053            .fields
1054            .iter()
1055            .enumerate()
1056            .map(|(i, v)| {
1057                if let Some(v) = v {
1058                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
1059                    v.finish_cloned()
1060                } else {
1061                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
1062                    single_null.push_nulls(num_rows);
1063                    single_null.to_vector()
1064                }
1065            })
1066            .collect::<Vec<_>>();
1067
1068        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
1069        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
1070        let timestamp: VectorRef = match self.timestamp_type {
1071            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
1072                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
1073            }
1074            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
1075                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
1076            }
1077            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
1078                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
1079            }
1080            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
1081                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
1082            }
1083            _ => unreachable!(),
1084        };
1085
1086        if cfg!(debug_assertions) {
1087            debug_assert_eq!(timestamp.len(), sequence.len());
1088            debug_assert_eq!(timestamp.len(), op_type.len());
1089            for field in &fields {
1090                debug_assert_eq!(timestamp.len(), field.len());
1091            }
1092        }
1093
1094        Values {
1095            timestamp,
1096            sequence,
1097            op_type,
1098            fields,
1099        }
1100    }
1101}
1102
1103/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_type`.
1104#[derive(Clone)]
1105pub struct Values {
1106    pub(crate) timestamp: VectorRef,
1107    pub(crate) sequence: Arc<UInt64Vector>,
1108    pub(crate) op_type: Arc<UInt8Vector>,
1109    pub(crate) fields: Vec<VectorRef>,
1110}
1111
1112impl Values {
1113    /// Converts [Values] to `Batch`, applies the optional sequence filter, sorts the batch
1114    /// according to `timestamp, sequence` desc, and applies dedup/merge according to `merge_mode`.
1115    pub fn to_batch(
1116        &self,
1117        primary_key: &[u8],
1118        metadata: &RegionMetadataRef,
1119        projection: &HashSet<ColumnId>,
1120        sequence: Option<SequenceRange>,
1121        dedup: bool,
1122        merge_mode: MergeMode,
1123    ) -> Result<Batch> {
1124        let builder = BatchBuilder::with_required_columns(
1125            primary_key.to_vec(),
1126            self.timestamp.clone(),
1127            self.sequence.clone(),
1128            self.op_type.clone(),
1129        );
1130
1131        let fields = metadata
1132            .field_columns()
1133            .zip(self.fields.iter())
1134            .filter_map(|(c, f)| {
1135                projection.get(&c.column_id).map(|c| BatchColumn {
1136                    column_id: *c,
1137                    data: f.clone(),
1138                })
1139            })
1140            .collect();
1141
1142        let mut batch = builder.with_fields(fields).build()?;
1143        // The sequence filter must be applied before dedup/merge to:
1144        // - avoid dropping a timestamp when the newest row is out of range
1145        // - avoid filling null fields from rows that should be excluded by the sequence filter.
1146        batch.filter_by_sequence(sequence)?;
1147
1148        match (dedup, merge_mode) {
1149            // append-only, keep duplicate rows.
1150            (false, _) => batch.sort(false)?,
1151            // keep the last row for each timestamp.
1152            (true, MergeMode::LastRow) => batch.sort(true)?,
1153            // keep the last non-null value for each field.
1154            (true, MergeMode::LastNonNull) => {
1155                batch.sort(false)?;
1156                batch.merge_last_non_null()?;
1157            }
1158        }
1159        Ok(batch)
1160    }
1161
1162    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
1163    fn columns(&self) -> Vec<ArrayRef> {
1164        let mut res = Vec::with_capacity(3 + self.fields.len());
1165        res.push(self.timestamp.to_arrow_array());
1166        res.push(self.sequence.to_arrow_array());
1167        res.push(self.op_type.to_arrow_array());
1168        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
1169        res
1170    }
1171
1172    /// Builds a new [Values] instance from columns.
1173    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
1174        debug_assert!(cols.len() >= 3);
1175        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
1176        let sequence =
1177            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
1178        let op_type =
1179            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
1180        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;
1181
1182        Ok(Self {
1183            timestamp,
1184            sequence,
1185            op_type,
1186            fields,
1187        })
1188    }
1189}
1190
1191impl From<ValueBuilder> for Values {
1192    fn from(mut value: ValueBuilder) -> Self {
1193        let num_rows = value.len();
1194        let fields = value
1195            .fields
1196            .iter_mut()
1197            .enumerate()
1198            .map(|(i, v)| {
1199                if let Some(v) = v {
1200                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
1201                    v.finish()
1202                } else {
1203                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
1204                    single_null.push_nulls(num_rows);
1205                    single_null.to_vector()
1206                }
1207            })
1208            .collect::<Vec<_>>();
1209
1210        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
1211        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
1212        let timestamp: VectorRef = match value.timestamp_type {
1213            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
1214                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
1215            }
1216            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
1217                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
1218            }
1219            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
1220                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
1221            }
1222            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
1223                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
1224            }
1225            _ => unreachable!(),
1226        };
1227
1228        if cfg!(debug_assertions) {
1229            debug_assert_eq!(timestamp.len(), sequence.len());
1230            debug_assert_eq!(timestamp.len(), op_type.len());
1231            for field in &fields {
1232                debug_assert_eq!(timestamp.len(), field.len());
1233            }
1234        }
1235
1236        Self {
1237            timestamp,
1238            sequence,
1239            op_type,
1240            fields,
1241        }
1242    }
1243}
1244
1245struct TimeSeriesIterBuilder {
1246    series_set: SeriesSet,
1247    projection: HashSet<ColumnId>,
1248    predicate: Option<Predicate>,
1249    dedup: bool,
1250    sequence: Option<SequenceRange>,
1251    merge_mode: MergeMode,
1252}
1253
1254impl IterBuilder for TimeSeriesIterBuilder {
1255    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
1256        let iter = self.series_set.iter_series(
1257            self.projection.clone(),
1258            self.predicate.clone(),
1259            self.dedup,
1260            self.merge_mode,
1261            self.sequence,
1262            metrics,
1263        )?;
1264        if self.merge_mode == MergeMode::LastNonNull {
1265            let iter = LastNonNullIter::new(iter);
1266            Ok(Box::new(iter))
1267        } else {
1268            Ok(Box::new(iter))
1269        }
1270    }
1271}
1272
1273#[cfg(test)]
1274mod tests {
1275    use std::collections::{HashMap, HashSet};
1276
1277    use api::helper::ColumnDataTypeWrapper;
1278    use api::v1::helper::row;
1279    use api::v1::value::ValueData;
1280    use api::v1::{Mutation, Rows, SemanticType};
1281    use common_time::Timestamp;
1282    use datatypes::prelude::{ConcreteDataType, ScalarVector};
1283    use datatypes::schema::ColumnSchema;
1284    use datatypes::value::{OrderedFloat, Value};
1285    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1286    use mito_codec::row_converter::SortField;
1287    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1288    use store_api::storage::RegionId;
1289
1290    use super::*;
1291    use crate::test_util::column_metadata_to_column_schema;
1292
1293    fn schema_for_test() -> RegionMetadataRef {
1294        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1295        builder
1296            .push_column_metadata(ColumnMetadata {
1297                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1298                semantic_type: SemanticType::Tag,
1299                column_id: 0,
1300            })
1301            .push_column_metadata(ColumnMetadata {
1302                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
1303                semantic_type: SemanticType::Tag,
1304                column_id: 1,
1305            })
1306            .push_column_metadata(ColumnMetadata {
1307                column_schema: ColumnSchema::new(
1308                    "ts",
1309                    ConcreteDataType::timestamp_millisecond_datatype(),
1310                    false,
1311                ),
1312                semantic_type: SemanticType::Timestamp,
1313                column_id: 2,
1314            })
1315            .push_column_metadata(ColumnMetadata {
1316                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1317                semantic_type: SemanticType::Field,
1318                column_id: 3,
1319            })
1320            .push_column_metadata(ColumnMetadata {
1321                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1322                semantic_type: SemanticType::Field,
1323                column_id: 4,
1324            })
1325            .primary_key(vec![0, 1]);
1326        let region_metadata = builder.build().unwrap();
1327        Arc::new(region_metadata)
1328    }
1329
1330    fn ts_value_ref(val: i64) -> ValueRef<'static> {
1331        ValueRef::Timestamp(Timestamp::new_millisecond(val))
1332    }
1333
1334    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1335        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1336    }
1337
1338    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
1339        let ts = values
1340            .timestamp
1341            .as_any()
1342            .downcast_ref::<TimestampMillisecondVector>()
1343            .unwrap();
1344
1345        let v0 = values.fields[0]
1346            .as_any()
1347            .downcast_ref::<Int64Vector>()
1348            .unwrap();
1349        let v1 = values.fields[1]
1350            .as_any()
1351            .downcast_ref::<Float64Vector>()
1352            .unwrap();
1353        let read = ts
1354            .iter_data()
1355            .zip(values.sequence.iter_data())
1356            .zip(values.op_type.iter_data())
1357            .zip(v0.iter_data())
1358            .zip(v1.iter_data())
1359            .map(|((((ts, sequence), op_type), v0), v1)| {
1360                (
1361                    ts.unwrap().0.value(),
1362                    sequence.unwrap(),
1363                    op_type.unwrap(),
1364                    v0.unwrap(),
1365                    v1.unwrap(),
1366                )
1367            })
1368            .collect::<Vec<_>>();
1369        assert_eq!(expect, &read);
1370    }
1371
1372    #[test]
1373    fn test_series() {
1374        let region_metadata = schema_for_test();
1375        let mut series = Series::new(&region_metadata);
1376        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
1377        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
1378        assert_eq!(2, series.active.timestamp.len());
1379        assert_eq!(0, series.frozen.len());
1380
1381        let values = series.compact(&region_metadata).unwrap();
1382        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
1383        assert_eq!(0, series.active.timestamp.len());
1384        assert_eq!(1, series.frozen.len());
1385    }
1386
1387    #[test]
1388    fn test_series_with_nulls() {
1389        let region_metadata = schema_for_test();
1390        let mut series = Series::new(&region_metadata);
1391        // col1: NULL 1 2 3
1392        // col2: NULL NULL 10.2 NULL
1393        series.push(
1394            ts_value_ref(1),
1395            0,
1396            OpType::Put,
1397            vec![ValueRef::Null, ValueRef::Null].into_iter(),
1398        );
1399        series.push(
1400            ts_value_ref(1),
1401            0,
1402            OpType::Put,
1403            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
1404        );
1405        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
1406        series.push(
1407            ts_value_ref(1),
1408            3,
1409            OpType::Put,
1410            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
1411        );
1412        assert_eq!(4, series.active.timestamp.len());
1413        assert_eq!(0, series.frozen.len());
1414
1415        let values = series.compact(&region_metadata).unwrap();
1416        assert_eq!(values.fields[0].null_count(), 1);
1417        assert_eq!(values.fields[1].null_count(), 3);
1418        assert_eq!(0, series.active.timestamp.len());
1419        assert_eq!(1, series.frozen.len());
1420    }
1421
1422    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1423        let ts_len = batch.timestamps().len();
1424        assert_eq!(batch.sequences().len(), ts_len);
1425        assert_eq!(batch.op_types().len(), ts_len);
1426        for f in batch.fields() {
1427            assert_eq!(f.data.len(), ts_len);
1428        }
1429
1430        let mut rows = vec![];
1431        for idx in 0..ts_len {
1432            let mut row = Vec::with_capacity(batch.fields().len() + 3);
1433            row.push(batch.timestamps().get(idx));
1434            row.push(batch.sequences().get(idx));
1435            row.push(batch.op_types().get(idx));
1436            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1437            rows.push(row);
1438        }
1439
1440        assert_eq!(expect.len(), rows.len());
1441        for (idx, row) in rows.iter().enumerate() {
1442            assert_eq!(&expect[idx], row);
1443        }
1444    }
1445
    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                None,
                true,
                MergeMode::LastRow,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }

    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_drop_ts() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Same timestamp, newest sequence is out of range. We should still keep the timestamp by
        // using the latest row *within* the sequence range as the base row.
        //
        // Expect after filtering seq<=2:
        // - base row: seq=2
        // - v0 from seq=2, v1 filled from seq=1
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 3]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![None, Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.5), None, None])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::LtEq { max: 2 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Int64(10),
                Value::Float64(OrderedFloat(1.5)),
            ]],
        );
    }

    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_no_fill_from_out_of_range_row() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Same timestamp, older sequence is out of range. We must not fill null fields using rows
        // that should be excluded by the sequence filter.
        //
        // Expect after filtering seq>1:
        // - keep only seq=2 row, v0 stays NULL.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 2]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.0), Some(1.0)])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::Gt { min: 1 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Null,
                Value::Float64(OrderedFloat(1.0)),
            ]],
        );
    }

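    // Builds a `KeyValues` mutation of `len` rows that share the primary key
    // (`k0`, `k1`); row `i` gets timestamp `i` ms, v0 = `i`, and v1 = `i` as f64.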
    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| {
                row(vec![
                    ValueData::StringValue(k0.clone()),
                    ValueData::I64Value(k1),
                    ValueData::TimestampMillisecondValue(i as i64),
                    ValueData::I64Value(i as i64),
                    ValueData::F64Value(i as f64),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

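    // Spawns 32 writer threads that interleave pushes into a shared `SeriesSet`,
    // then compacts every series and verifies nothing was lost: the sequences,
    // timestamps, and v0 values must each cover exactly 0..3200.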
    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![row(vec![
                                    ValueData::StringValue(format!("{}", j)),
                                    ValueData::I64Value(j as i64),
                                    ValueData::TimestampMillisecondValue(j as i64),
                                    ValueData::I64Value(j as i64),
                                    ValueData::F64Value(j as f64),
                                ])],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
            // Read timestamps from the timestamp vector.
            timestamps.extend(
                values
                    .timestamp
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value()),
            );
            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
            v0.extend(
                values
                    .fields
                    .first()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<Int64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
        }

        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        // Every op type must be `Put`.
        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

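    // Writes the same 100 rows twice: with dedup enabled each timestamp must
    // appear once in the scan output, without dedup it must appear twice.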
    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs.iter().map(|kv| {
            kv.timestamp()
                .try_into_timestamp()
                .unwrap()
                .unwrap()
                .value()
        }) {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let range = ranges.ranges.into_values().next().unwrap();
        let iter = range.build_iter().unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

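    // Projecting only column id 3 (the `v0` field in `schema_for_test`) must
    // yield batches with exactly one field column.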
    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        let iter = memtable
            .ranges(Some(&[3]), RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }

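    // Exercises concurrent writes and scans: every in-flight scan must succeed,
    // and the final scan must observe all series and rows exactly once.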
    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Number of writer threads
        let num_writers = 10;
        // Number of reader threads
        let num_readers = 5;
        // Number of series per writer
        let series_per_writer = 100;
        // Number of rows per series
        let rows_per_series = 10;
        // Total number of series
        let total_series = num_writers * series_per_writer;

        // Create a barrier to synchronize the start of all threads
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                // Wait for all threads to be ready
                barrier.wait();

                // Create and write series
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable
                        .ranges(None, RangesOptions::default())
                        .unwrap()
                        .build(None)
                        .unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        let iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }

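    // The record batch iterator must emit the full projected schema: primary
    // key columns, field columns, the time index, and the internal
    // `__primary_key`, `__sequence`, and `__op_type` columns.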
    #[test]
    fn test_build_record_batch_iter_from_memtable() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
        memtable.write(&kvs).unwrap();

        let read_column_ids: Vec<ColumnId> = schema
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let ranges = memtable
            .ranges(Some(&read_column_ids), RangesOptions::default())
            .unwrap();
        assert_eq!(1, ranges.ranges.len());

        let range = ranges.ranges.into_values().next().unwrap();
        let mut iter = range.build_record_batch_iter(None, None).unwrap();
        let rb = iter.next().transpose().unwrap().unwrap();
        assert_eq!(10, rb.num_rows());
        // k0, k1 (pk columns), v0, v1 (field columns), ts, __primary_key, __sequence, __op_type
        let schema = rb.schema();
        let column_names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            column_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type",
            ]
        );
        assert!(iter.next().is_none());
    }

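    // Per the assertions below, the time range passed to
    // `build_record_batch_iter` is inclusive on both ends: [3, 7] keeps the
    // five rows with timestamps 3..=7.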
    #[test]
    fn test_build_record_batch_iter_with_time_range() {
        let schema = schema_for_test();
        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);

        let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
        memtable.write(&kvs).unwrap();

        let read_column_ids: Vec<ColumnId> = schema
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let ranges = memtable
            .ranges(Some(&read_column_ids), RangesOptions::default())
            .unwrap();
        assert_eq!(1, ranges.ranges.len());

        let time_range = (Timestamp::new_millisecond(3), Timestamp::new_millisecond(7));

        let range = ranges.ranges.into_values().next().unwrap();
        let mut iter = range
            .build_record_batch_iter(Some(time_range), None)
            .unwrap();

        let mut total_rows = 0;
        let mut all_timestamps = Vec::new();
        while let Some(rb) = iter.next().transpose().unwrap() {
            total_rows += rb.num_rows();
            let ts_col = rb
                .column_by_name("ts")
                .unwrap()
                .as_any()
                .downcast_ref::<datatypes::arrow::array::TimestampMillisecondArray>()
                .unwrap();
            for i in 0..ts_col.len() {
                all_timestamps.push(ts_col.value(i));
            }
        }
        assert_eq!(5, total_rows);
        all_timestamps.sort();
        assert_eq!(vec![3, 4, 5, 6, 7], all_timestamps);
    }
}