mito2/memtable/time_series.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt8Vector, UInt64Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
    MemtableStats, RangesOptions,
};
use crate::metrics::{
    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
    READ_STAGE_ELAPSED,
};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;

/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Vector builder capacity.
const BUILDER_CAPACITY: usize = 512;

/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        // Currently, when the simple bulk memtable is applicable, the input
        // request is already a bulk write request and this method is not called.
        false
    }
}

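// A minimal usage sketch (hypothetical; assumes a `RegionMetadataRef` named
// `metadata` and some `KeyValues` named `key_values` are at hand). The builder
// returns a `SimpleBulkMemtable` for regions without a primary key and a
// `TimeSeriesMemtable` otherwise:
//
//     let builder = TimeSeriesMemtableBuilder::new(None, true, MergeMode::LastRow);
//     let memtable = builder.build(0, &metadata);
//     memtable.write(&key_values)?;
//     assert!(!memtable.is_empty());
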
/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    /// Updates memtable stats.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // TODO(hl): these stats may be inaccurate since the loop above can return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to the memtable will succeed.
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // The default implementation falls back to row iteration.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self.series_set.iter_series(
            projection,
            filters,
            self.dedup,
            self.merge_mode,
            sequence,
            None,
        )?;

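        // `MergeMode::LastNonNull` merges the last non-null value of each field
        // across rows with the same timestamp, which needs the dedicated
        // `LastNonNullIter` wrapper on top of the per-series batches.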
        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let range_stats = self.stats();
        let range = MemtableRange::new(context, range_stats);
        Ok(MemtableRanges {
            ranges: [(0, range)].into(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);

impl Drop for SeriesMap {
    fn drop(&mut self) {
        let num_series = self.0.len();
        let num_field_builders = self
            .0
            .values()
            .map(|v| v.read().unwrap().active.num_field_builders())
            .sum::<usize>();
        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
    }
}

#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
    /// Pushes a [KeyValue] to the [SeriesSet] under the given primary key and returns the allocated key/value memory sizes.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
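        // Fast path: the series already exists, so pushing only needs the map
        // read lock plus the per-series write lock.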
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

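        // Slow path: take the map write lock and use the entry API, since
        // another writer may have created the series after the read lock above
        // was released.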
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // Another thread created the series between releasing the read lock and acquiring the write lock.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates all series in [SeriesSet].
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            merge_mode,
            sequence,
            mem_scan_metrics,
        )
    }
}

/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains the primary key
/// columns of the given region schema.
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}

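// A small sketch of the resulting schema (mirrors the tag columns of
// `schema_for_test` in the tests below): for a region whose primary key is
// `k0: String, k1: Int64`, the returned arrow schema contains exactly those
// two fields in primary key order:
//
//     let pk_schema = primary_key_schema(&region_metadata);
//     assert_eq!(pk_schema.fields().len(), 2);
//     assert_eq!(pk_schema.field(0).name(), "k0");
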
/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series in the memtable.
    total_series: usize,
    /// Number of series pruned.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}

struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    projection: HashSet<ColumnId>,
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    merge_mode: MergeMode,
    sequence: Option<SequenceRange>,
    metrics: Metrics,
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<RwLock<SeriesMap>>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            merge_mode,
            sequence,
            metrics: Metrics::default(),
            mem_scan_metrics,
        })
    }

    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            let inner = crate::memtable::MemScanMetricsData {
                total_series: self.metrics.total_series,
                num_rows: self.metrics.num_rows,
                num_batches: self.metrics.num_batches,
                scan_cost: self.metrics.scan_cost,
            };
            mem_scan_metrics.merge_inner(&inner);
        }
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // Report MemScanMetrics if not already reported
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
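        // Resume the scan strictly after the last yielded primary key. The map
        // read lock is re-acquired on every `next` call, so writers can make
        // progress between batches.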
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        // TODO(hl): maybe yield more than one time series to amortize range overhead.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // read next series
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(
                    primary_key,
                    &self.metadata,
                    &self.projection,
                    self.sequence,
                    self.dedup,
                    self.merge_mode,
                )
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            return Some(batch);
        }
        drop(map); // Explicitly drop the read lock
        self.metrics.scan_cost += start.elapsed();

        // Report MemScanMetrics before returning None
        self.report_mem_scan_metrics();

        None
    }
}

fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // No primary key columns; nothing to prune.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // retrieve primary key values from cache or decode from bytes.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // evaluate predicates against primary key values
    let mut result = true;
    for predicate in predicates {
        // Ignore predicates that don't reference primary key columns.
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: arrow schema and datatypes are constructed from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}

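// Note on semantics: `prune_primary_key` returns `true` when the series should
// be kept. Decode failures are treated conservatively as "keep", so pruning
// never drops data; it only skips series that provably fail a predicate.
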
/// A `Series` holds a list of field values for a given primary key.
pub struct Series {
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    capacity: usize,
}

impl Series {
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into the Series. Returns the size of the values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and pushes it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes the active part into the frozen parts and compacts them to reduce memory fragmentation.
    /// Returns the frozen and compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each values and avoid
            // cloning and sorting when values do not overlap with each other.

            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(
                    frozen
                        .iter()
                        .zip(frozen.iter().skip(1))
                        .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
                );
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}

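// A minimal sketch of the `Series` life cycle (mirrors the `test_series` unit
// test below, which also defines the `ts_value_ref`/`field_value_ref` helpers):
// rows are pushed into the active builder, and `compact` freezes the active
// part and concatenates all frozen runs into a single `Values`:
//
//     let mut series = Series::new(&region_metadata);
//     series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
//     let values = series.compact(&region_metadata)?;
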
/// `ValueBuilder` holds all the vector builders for field columns.
pub(crate) struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Returns number of field builders.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they've already been encoded.
    /// Returns the size of field values.
    ///
    /// In this method, we don't check the data type of the value, because it is already checked in the caller.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.try_into_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
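        // Field builders are created lazily: a column stays `None` (all null so
        // far) until its first non-null value arrives, at which point the new
        // builder is back-filled with nulls for all earlier rows.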
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    mutable_vector.push_nulls(num_rows - 1);
                    mutable_vector
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }

    /// Checks whether the current value builder has sufficient space to accommodate `fields`.
    /// Returns false if there is no space to accommodate the fields due to offset overflow.
    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
            let Some(builder) = field_dest else {
                continue;
            };
            let FieldBuilder::String(builder) = builder else {
                continue;
            };
            let array = field_src.to_arrow_array();
            let string_array = array
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| error::InvalidBatchSnafu {
                    reason: format!(
                        "Field type mismatch, expecting String, given: {}",
                        field_src.data_type()
                    ),
                })?;
            let space_needed = string_array.value_data().len() as i32;
            // offset may overflow
            if builder.next_offset().checked_add(space_needed).is_none() {
                return Ok(false);
            }
        }
        Ok(true)
    }

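    // Why the overflow check above matters: arrow's `StringArray` stores value
    // offsets as `i32`, so a single builder cannot hold more than `i32::MAX`
    // bytes of string data; once the limit would be exceeded, the caller must
    // freeze the active builder and start a new one.
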
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
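        // Copy the raw i64 timestamp values out of the concrete vector type
        // that matches this region's time index unit.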
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in
            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
        {
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    /// Returns the length of [ValueBuilder]
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }

    fn finish_cloned(&self) -> Values {
        let num_rows = self.sequence.len();
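        // Columns that never received a non-null value have no builder;
        // materialize them as all-null vectors of the right length.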
        let fields = self
            .fields
            .iter()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish_cloned()
                } else {
                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
        let timestamp: VectorRef = match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Values {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

/// [Values] holds immutable vectors of the field columns, together with `timestamp`, `sequence` and `op_type`.
#[derive(Clone)]
pub struct Values {
    pub(crate) timestamp: VectorRef,
    pub(crate) sequence: Arc<UInt64Vector>,
    pub(crate) op_type: Arc<UInt8Vector>,
    pub(crate) fields: Vec<VectorRef>,
}

impl Values {
    /// Converts [Values] to `Batch`, applies the optional sequence filter, sorts the batch
    /// according to `timestamp, sequence` desc, and applies dedup/merge according to `merge_mode`.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        sequence: Option<SequenceRange>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        // The sequence filter must be applied before dedup/merge to:
        // - avoid dropping a timestamp when the newest row is out of range
        // - avoid filling null fields from rows that should be excluded by the sequence filter.
        batch.filter_by_sequence(sequence)?;

        match (dedup, merge_mode) {
            // append-only, keep duplicate rows.
            (false, _) => batch.sort(false)?,
            // keep the last row for each timestamp.
            (true, MergeMode::LastRow) => batch.sort(true)?,
            // keep the last non-null value for each field.
            (true, MergeMode::LastNonNull) => {
                batch.sort(false)?;
                batch.merge_last_non_null()?;
            }
        }
        Ok(batch)
    }

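    // A small sketch (mirrors the `test_values_sort` unit test below): with
    // dedup enabled and `MergeMode::LastRow`, rows sharing a timestamp keep
    // only the one with the highest sequence after sorting:
    //
    //     let batch = values.to_batch(
    //         b"test",
    //         &schema,
    //         &projection,
    //         None,
    //         true,
    //         MergeMode::LastRow,
    //     )?;
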
    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Builds a new [Values] instance from columns.
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    sequence: Option<SequenceRange>,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        let iter = self.series_set.iter_series(
            self.projection.clone(),
            self.predicate.clone(),
            self.dedup,
            self.merge_mode,
            self.sequence,
            metrics,
        )?;
        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

1296#[cfg(test)]
1297mod tests {
1298    use std::collections::{HashMap, HashSet};
1299
1300    use api::helper::ColumnDataTypeWrapper;
1301    use api::v1::helper::row;
1302    use api::v1::value::ValueData;
1303    use api::v1::{Mutation, Rows, SemanticType};
1304    use common_time::Timestamp;
1305    use datatypes::prelude::{ConcreteDataType, ScalarVector};
1306    use datatypes::schema::ColumnSchema;
1307    use datatypes::value::{OrderedFloat, Value};
1308    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1309    use mito_codec::row_converter::SortField;
1310    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1311    use store_api::storage::RegionId;
1312
1313    use super::*;
1314    use crate::test_util::column_metadata_to_column_schema;
1315
1316    fn schema_for_test() -> RegionMetadataRef {
1317        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1318        builder
1319            .push_column_metadata(ColumnMetadata {
1320                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1321                semantic_type: SemanticType::Tag,
1322                column_id: 0,
1323            })
1324            .push_column_metadata(ColumnMetadata {
1325                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
1326                semantic_type: SemanticType::Tag,
1327                column_id: 1,
1328            })
1329            .push_column_metadata(ColumnMetadata {
1330                column_schema: ColumnSchema::new(
1331                    "ts",
1332                    ConcreteDataType::timestamp_millisecond_datatype(),
1333                    false,
1334                ),
1335                semantic_type: SemanticType::Timestamp,
1336                column_id: 2,
1337            })
1338            .push_column_metadata(ColumnMetadata {
1339                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1340                semantic_type: SemanticType::Field,
1341                column_id: 3,
1342            })
1343            .push_column_metadata(ColumnMetadata {
1344                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1345                semantic_type: SemanticType::Field,
1346                column_id: 4,
1347            })
1348            .primary_key(vec![0, 1]);
1349        let region_metadata = builder.build().unwrap();
1350        Arc::new(region_metadata)
1351    }
1352
1353    fn ts_value_ref(val: i64) -> ValueRef<'static> {
1354        ValueRef::Timestamp(Timestamp::new_millisecond(val))
1355    }
1356
1357    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1358        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1359    }
1360
1361    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
1362        let ts = values
1363            .timestamp
1364            .as_any()
1365            .downcast_ref::<TimestampMillisecondVector>()
1366            .unwrap();
1367
1368        let v0 = values.fields[0]
1369            .as_any()
1370            .downcast_ref::<Int64Vector>()
1371            .unwrap();
1372        let v1 = values.fields[1]
1373            .as_any()
1374            .downcast_ref::<Float64Vector>()
1375            .unwrap();
1376        let read = ts
1377            .iter_data()
1378            .zip(values.sequence.iter_data())
1379            .zip(values.op_type.iter_data())
1380            .zip(v0.iter_data())
1381            .zip(v1.iter_data())
1382            .map(|((((ts, sequence), op_type), v0), v1)| {
1383                (
1384                    ts.unwrap().0.value(),
1385                    sequence.unwrap(),
1386                    op_type.unwrap(),
1387                    v0.unwrap(),
1388                    v1.unwrap(),
1389                )
1390            })
1391            .collect::<Vec<_>>();
1392        assert_eq!(expect, &read);
1393    }
1394
1395    #[test]
1396    fn test_series() {
1397        let region_metadata = schema_for_test();
1398        let mut series = Series::new(&region_metadata);
1399        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
1400        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
1401        assert_eq!(2, series.active.timestamp.len());
1402        assert_eq!(0, series.frozen.len());
1403
1404        let values = series.compact(&region_metadata).unwrap();
1405        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
1406        assert_eq!(0, series.active.timestamp.len());
1407        assert_eq!(1, series.frozen.len());
1408    }
1409
1410    #[test]
1411    fn test_series_with_nulls() {
1412        let region_metadata = schema_for_test();
1413        let mut series = Series::new(&region_metadata);
1414        // col1: NULL 1 2 3
1415        // col2: NULL NULL 10.2 NULL
1416        series.push(
1417            ts_value_ref(1),
1418            0,
1419            OpType::Put,
1420            vec![ValueRef::Null, ValueRef::Null].into_iter(),
1421        );
1422        series.push(
1423            ts_value_ref(1),
1424            0,
1425            OpType::Put,
1426            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
1427        );
1428        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
1429        series.push(
1430            ts_value_ref(1),
1431            3,
1432            OpType::Put,
1433            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
1434        );
1435        assert_eq!(4, series.active.timestamp.len());
1436        assert_eq!(0, series.frozen.len());
1437
1438        let values = series.compact(&region_metadata).unwrap();
1439        assert_eq!(values.fields[0].null_count(), 1);
1440        assert_eq!(values.fields[1].null_count(), 3);
1441        assert_eq!(0, series.active.timestamp.len());
1442        assert_eq!(1, series.frozen.len());
1443    }
1444
1445    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1446        let ts_len = batch.timestamps().len();
1447        assert_eq!(batch.sequences().len(), ts_len);
1448        assert_eq!(batch.op_types().len(), ts_len);
1449        for f in batch.fields() {
1450            assert_eq!(f.data.len(), ts_len);
1451        }
1452
1453        let mut rows = vec![];
1454        for idx in 0..ts_len {
1455            let mut row = Vec::with_capacity(batch.fields().len() + 3);
1456            row.push(batch.timestamps().get(idx));
1457            row.push(batch.sequences().get(idx));
1458            row.push(batch.op_types().get(idx));
1459            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1460            rows.push(row);
1461        }
1462
1463        assert_eq!(expect.len(), rows.len());
1464        for (idx, row) in rows.iter().enumerate() {
1465            assert_eq!(&expect[idx], row);
1466        }
1467    }
1468
1469    #[test]
1470    fn test_values_sort() {
1471        let schema = schema_for_test();
1472        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
1473        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
1474        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));
1475
1476        let fields = vec![
1477            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
1478            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
1479        ];
1480        let values = Values {
1481            timestamp: timestamp as Arc<_>,
1482            sequence,
1483            op_type,
1484            fields,
1485        };
1486
        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                None,
                true,
                MergeMode::LastRow,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        );
    }

    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_drop_ts() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Same timestamp, newest sequence is out of range. We should still keep the timestamp by
        // using the latest row *within* the sequence range as the base row.
        //
        // Expect after filtering seq<=2:
        // - base row: seq=2
        // - v0 from seq=2, v1 filled from seq=1
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 3]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![None, Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.5), None, None])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::LtEq { max: 2 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Int64(10),
                Value::Float64(OrderedFloat(1.5)),
            ]],
        );
    }

    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_no_fill_from_out_of_range_row() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Same timestamp, older sequence is out of range. We must not fill null fields using rows
        // that should be excluded by the sequence filter.
        //
        // Expect after filtering seq>1:
        // - keep only seq=2 row, v0 stays NULL.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 2]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.0), Some(1.0)])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::Gt { min: 1 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Null,
                Value::Float64(OrderedFloat(1.0)),
            ]],
        );
    }

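    /// Builds a [KeyValues] with primary key `(k0, k1)` and `len` rows whose
    /// timestamp, `v0` and `v1` all equal the row index.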
    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| {
                row(vec![
                    ValueData::StringValue(k0.clone()),
                    ValueData::I64Value(k1),
                    ValueData::TimestampMillisecondValue(i as i64),
                    ValueData::I64Value(i as i64),
                    ValueData::F64Value(i as f64),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

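        // 32 writer threads interleave across 64 primary keys; row `j` carries `j` as
        // its timestamp, sequence and field values so the scan below can cross-check them.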
        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![row(vec![
                                    ValueData::StringValue(format!("{}", j)),
                                    ValueData::I64Value(j as i64),
                                    ValueData::TimestampMillisecondValue(j as i64),
                                    ValueData::I64Value(j as i64),
                                    ValueData::F64Value(j as f64),
                                ])],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

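        // Compact every series and flatten its columns; every written row must be
        // observed exactly once.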
        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
            timestamps.extend(
                values
                    .timestamp
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value()),
            );
            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
            v0.extend(
                values
                    .fields
                    .first()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<Int64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
        }

        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

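    /// Writes the same batch twice; with `dedup` each timestamp must appear exactly
    /// once in the scan, otherwise twice.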
    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs.iter().map(|kv| {
            kv.timestamp()
                .try_into_timestamp()
                .unwrap()
                .unwrap()
                .value()
        }) {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

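        // Project only column id 3 (field v0); each batch must then expose a single
        // field column.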
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }

    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Number of writer threads
        let num_writers = 10;
        // Number of reader threads
        let num_readers = 5;
        // Number of series per writer
        let series_per_writer = 100;
        // Number of rows per series
        let rows_per_series = 10;
        // Total number of series
        let total_series = num_writers * series_per_writer;

        // Create a barrier to synchronize the start of all threads
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                // Wait for all threads to be ready
                barrier.wait();

                // Create and write series
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

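        // A final full scan should observe every series exactly once (one batch per
        // series) with all of its rows.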
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
}