// mito2/memtable/time_series.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::iter;
19use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::OpType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{debug, error};
26use common_time::Timestamp;
27use datatypes::arrow;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow_array::StringArray;
30use datatypes::data_type::{ConcreteDataType, DataType};
31use datatypes::prelude::{ScalarVector, Vector, VectorRef};
32use datatypes::types::TimestampType;
33use datatypes::value::{Value, ValueRef};
34use datatypes::vectors::{
35    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
36    TimestampSecondVector, UInt8Vector, UInt64Vector,
37};
38use mito_codec::key_values::KeyValue;
39use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
40use snafu::{OptionExt, ResultExt, ensure};
41use store_api::metadata::RegionMetadataRef;
42use store_api::storage::{ColumnId, SequenceRange};
43use table::predicate::Predicate;
44
45use crate::error::{
46    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
47};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::builder::{FieldBuilder, StringBuilder};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
52use crate::memtable::stats::WriteMetrics;
53use crate::memtable::{
54    AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, BoxedRecordBatchIterator,
55    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
56    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
57    read_column_ids_from_projection,
58};
59use crate::metrics::{
60    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
61    READ_STAGE_ELAPSED,
62};
63use crate::read::dedup::LastNonNullIter;
64use crate::read::prune::PruneTimeIterator;
65use crate::read::scan_region::PredicateGroup;
66use crate::read::{Batch, BatchBuilder, BatchColumn};
67use crate::region::options::MergeMode;
68
/// Initial capacity of the vector builders of a newly created series.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Row-count threshold at which a series' active builder is frozen
/// into an immutable part (see `Series::push`).
const BUILDER_CAPACITY: usize = 512;
74
/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    /// Manager tracking write buffer memory usage; `None` disables tracking.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Whether duplicate rows are removed when reading (forwarded to the memtable).
    dedup: bool,
    /// Strategy used when merging duplicate rows (forwarded to the memtable).
    merge_mode: MergeMode,
}
82
83impl TimeSeriesMemtableBuilder {
84    /// Creates a new builder with specific `write_buffer_manager`.
85    pub fn new(
86        write_buffer_manager: Option<WriteBufferManagerRef>,
87        dedup: bool,
88        merge_mode: MergeMode,
89    ) -> Self {
90        Self {
91            write_buffer_manager,
92            dedup,
93            merge_mode,
94        }
95    }
96}
97
98impl MemtableBuilder for TimeSeriesMemtableBuilder {
99    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
100        if metadata.primary_key.is_empty() {
101            Arc::new(SimpleBulkMemtable::new(
102                id,
103                metadata.clone(),
104                self.write_buffer_manager.clone(),
105                self.dedup,
106                self.merge_mode,
107            ))
108        } else {
109            Arc::new(TimeSeriesMemtable::new(
110                metadata.clone(),
111                id,
112                self.write_buffer_manager.clone(),
113                self.dedup,
114                self.merge_mode,
115            ))
116        }
117    }
118
119    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
120        // Now if we can use simple bulk memtable, the input request is already
121        // a bulk write request and won't call this method.
122        false
123    }
124}
125
/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    /// Codec that encodes the primary key columns into the series map key.
    row_codec: Arc<DensePrimaryKeyCodec>,
    /// All series of this memtable, keyed by encoded primary key.
    series_set: SeriesSet,
    /// Tracks bytes allocated by this memtable against the write buffer.
    alloc_tracker: AllocTracker,
    /// Maximum timestamp written so far; `i64::MIN` while empty.
    max_timestamp: AtomicI64,
    /// Minimum timestamp written so far; `i64::MAX` while empty.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Whether to deduplicate rows when reading.
    dedup: bool,
    /// How duplicate rows are merged when reading.
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
141
142impl TimeSeriesMemtable {
143    pub fn new(
144        region_metadata: RegionMetadataRef,
145        id: MemtableId,
146        write_buffer_manager: Option<WriteBufferManagerRef>,
147        dedup: bool,
148        merge_mode: MergeMode,
149    ) -> Self {
150        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
151        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
152        Self {
153            id,
154            region_metadata,
155            series_set,
156            row_codec,
157            alloc_tracker: AllocTracker::new(write_buffer_manager),
158            max_timestamp: AtomicI64::new(i64::MIN),
159            min_timestamp: AtomicI64::new(i64::MAX),
160            max_sequence: AtomicU64::new(0),
161            dedup,
162            merge_mode,
163            num_rows: Default::default(),
164        }
165    }
166
167    /// Updates memtable stats.
168    fn update_stats(&self, stats: WriteMetrics) {
169        self.alloc_tracker
170            .on_allocation(stats.key_bytes + stats.value_bytes);
171        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
172        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
173        self.max_sequence
174            .fetch_max(stats.max_sequence, Ordering::SeqCst);
175        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
176    }
177
178    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
179        ensure!(
180            self.row_codec.num_fields() == kv.num_primary_keys(),
181            PrimaryKeyLengthMismatchSnafu {
182                expect: self.row_codec.num_fields(),
183                actual: kv.num_primary_keys(),
184            }
185        );
186
187        let primary_key_encoded = self
188            .row_codec
189            .encode(kv.primary_keys())
190            .context(EncodeSnafu)?;
191
192        let (key_allocated, value_allocated) =
193            self.series_set.push_to_series(primary_key_encoded, &kv);
194        stats.key_bytes += key_allocated;
195        stats.value_bytes += value_allocated;
196
197        // safety: timestamp of kv must be both present and a valid timestamp value.
198        let ts = kv
199            .timestamp()
200            .try_into_timestamp()
201            .unwrap()
202            .unwrap()
203            .value();
204        stats.min_ts = stats.min_ts.min(ts);
205        stats.max_ts = stats.max_ts.max(ts);
206        Ok(())
207    }
208}
209
210impl Debug for TimeSeriesMemtable {
211    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
212        f.debug_struct("TimeSeriesMemtable").finish()
213    }
214}
215
impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        // Accumulate stats locally and publish them once at the end to limit
        // contention on the shared atomic counters.
        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Account for the timestamp and op-type storage of every row; the
        // field bytes were already accumulated by `write_key_value`.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // TODO(hl): this maybe inaccurate since for-iteration may return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to memtable will succeed.
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        // Only publish the stats when the row was actually written.
        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Default implementation fallback to row iteration.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        // Sequence and timestamp bounds come from the bulk part itself rather
        // than from the per-row accumulation above.
        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection);
        // Without an explicit projection, read all field columns.
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let batch_to_record_batch = Arc::new(BatchToRecordBatchContext::new(
            self.region_metadata.clone(),
            read_column_ids,
        ));
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.clone(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
            batch_to_record_batch,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
        let range_stats = self.stats();
        // The whole memtable is exposed as a single range (id 0).
        let range = MemtableRange::new(context, range_stats);
        Ok(MemtableRanges {
            ranges: [(0, range)].into(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        // Stop accounting new allocations; buffered data remains readable.
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        // Convert the raw i64 bounds into typed timestamps of the time index.
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // A fork is a fresh empty memtable that keeps the same write buffer
        // manager and read options but may use updated region metadata.
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
365
/// Map from encoded primary key to its [Series].
///
/// Wrapped in a newtype so that dropping the map can roll back the
/// series/field-builder gauges (see the `Drop` impl below).
#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);
368
369impl Drop for SeriesMap {
370    fn drop(&mut self) {
371        let num_series = self.0.len();
372        let num_field_builders = self
373            .0
374            .values()
375            .map(|v| v.read().unwrap().active.num_field_builders())
376            .sum::<usize>();
377        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
378        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
379    }
380}
381
/// Shared, thread-safe collection of all series in a memtable, keyed by the
/// encoded primary key.
#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    /// All series, ordered by encoded primary key.
    series: Arc<RwLock<SeriesMap>>,
    /// Codec used to encode/decode the dense primary key.
    codec: Arc<DensePrimaryKeyCodec>,
}
388
389impl SeriesSet {
390    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
391        Self {
392            region_metadata,
393            series: Default::default(),
394            codec,
395        }
396    }
397}
398
impl SeriesSet {
    /// Push [KeyValue] to SeriesSet with given primary key and return key/value allocated memory size.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, so a read lock on the map
        // suffices; only the individual series is write-locked.
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the map's write lock to insert a new series. The
        // entry API handles the race where another writer inserted the same
        // key between releasing the read lock and acquiring the write lock.
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                // Brand-new series: report the key bytes as newly allocated.
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // safety: series must exist at given index.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    /// Returns the series for `primary_key`, if present (test helper).
    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates all series in [SeriesSet].
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: PredicateGroup,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        // Schema and data types covering only the primary key columns; the
        // iterator uses them to evaluate predicates against decoded keys.
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate.predicate().cloned(),
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            merge_mode,
            sequence,
            mem_scan_metrics,
        )
    }
}
472
473/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys
474/// of given region schema
475pub(crate) fn primary_key_schema(
476    region_metadata: &RegionMetadataRef,
477) -> arrow::datatypes::SchemaRef {
478    let fields = region_metadata
479        .primary_key_columns()
480        .map(|pk| {
481            arrow::datatypes::Field::new(
482                pk.column_schema.name.clone(),
483                pk.column_schema.data_type.as_arrow_type(),
484                pk.column_schema.is_nullable(),
485            )
486        })
487        .collect::<Vec<_>>();
488    Arc::new(arrow::datatypes::Schema::new(fields))
489}
490
/// Metrics for reading the memtable.
///
/// Flushed to the Prometheus counters (and to `MemScanMetrics`, if present)
/// when the iterator is dropped.
#[derive(Debug, Default)]
struct Metrics {
    /// Number of series visited, including pruned ones.
    total_series: usize,
    /// Number of series pruned.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batch read.
    num_batches: usize,
    /// Duration to scan the memtable.
    scan_cost: Duration,
}
505
/// Iterator over the series of a [SeriesSet], yielding one [Batch] per series.
struct Iter {
    metadata: RegionMetadataRef,
    /// Shared series map; read-locked only while fetching the next series.
    series: Arc<RwLock<SeriesMap>>,
    /// Column ids to project in the produced batches.
    projection: HashSet<ColumnId>,
    /// Primary key of the last yielded series; iteration resumes after it.
    last_key: Option<Vec<u8>>,
    /// Simple filters used to prune series by their primary key values.
    predicate: Vec<SimpleFilterEvaluator>,
    /// Arrow schema of the primary key columns.
    pk_schema: arrow::datatypes::SchemaRef,
    /// Data types of the primary key columns, parallel to `pk_schema`.
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    merge_mode: MergeMode,
    /// Optional sequence range forwarded to batch conversion.
    sequence: Option<SequenceRange>,
    metrics: Metrics,
    /// Scan metrics handle; `take`n on first report so it fires only once.
    mem_scan_metrics: Option<MemScanMetrics>,
}
521
522impl Iter {
523    #[allow(clippy::too_many_arguments)]
524    pub(crate) fn try_new(
525        metadata: RegionMetadataRef,
526        series: Arc<RwLock<SeriesMap>>,
527        projection: HashSet<ColumnId>,
528        predicate: Option<Predicate>,
529        pk_schema: arrow::datatypes::SchemaRef,
530        pk_datatypes: Vec<ConcreteDataType>,
531        codec: Arc<DensePrimaryKeyCodec>,
532        dedup: bool,
533        merge_mode: MergeMode,
534        sequence: Option<SequenceRange>,
535        mem_scan_metrics: Option<MemScanMetrics>,
536    ) -> Result<Self> {
537        let predicate = predicate
538            .map(|predicate| {
539                predicate
540                    .exprs()
541                    .iter()
542                    .filter_map(SimpleFilterEvaluator::try_new)
543                    .collect::<Vec<_>>()
544            })
545            .unwrap_or_default();
546        Ok(Self {
547            metadata,
548            series,
549            projection,
550            last_key: None,
551            predicate,
552            pk_schema,
553            pk_datatypes,
554            codec,
555            dedup,
556            merge_mode,
557            sequence,
558            metrics: Metrics::default(),
559            mem_scan_metrics,
560        })
561    }
562
563    fn report_mem_scan_metrics(&mut self) {
564        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
565            let inner = crate::memtable::MemScanMetricsData {
566                total_series: self.metrics.total_series,
567                num_rows: self.metrics.num_rows,
568                num_batches: self.metrics.num_batches,
569                scan_cost: self.metrics.scan_cost,
570                ..Default::default()
571            };
572            mem_scan_metrics.merge_inner(&inner);
573        }
574    }
575}
576
impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // Report MemScanMetrics if not already reported
        self.report_mem_scan_metrics();

        // Export the read counters here so that scans dropped before
        // exhaustion are still accounted for.
        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
595
impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        // Resume strictly after the last yielded key. The map's read lock is
        // released between calls, so the range is rebuilt on every call.
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        // TODO(hl): maybe yield more than one time series to amortize range overhead.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            // Skip this series when the primary-key predicates prove that it
            // cannot match.
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // read next series
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            // Freeze and merge the series' buffered parts, then convert them
            // into a single Batch.
            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(
                    primary_key,
                    &self.metadata,
                    &self.projection,
                    self.sequence,
                    self.dedup,
                    self.merge_mode,
                )
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            return Some(batch);
        }
        drop(map); // Explicitly drop the read lock
        self.metrics.scan_cost += start.elapsed();

        // Report MemScanMetrics before returning None
        self.report_mem_scan_metrics();

        None
    }
}
658
659fn prune_primary_key(
660    codec: &Arc<DensePrimaryKeyCodec>,
661    pk: &[u8],
662    series: &mut Series,
663    datatypes: &[ConcreteDataType],
664    pk_schema: arrow::datatypes::SchemaRef,
665    predicates: &[SimpleFilterEvaluator],
666) -> bool {
667    // no primary key, we simply return true.
668    if pk_schema.fields().is_empty() {
669        return true;
670    }
671
672    // retrieve primary key values from cache or decode from bytes.
673    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
674        pk_values
675    } else {
676        let pk_values = codec.decode_dense_without_column_id(pk);
677        if let Err(e) = pk_values {
678            error!(e; "Failed to decode primary key");
679            return true;
680        }
681        series.update_pk_cache(pk_values.unwrap());
682        series.pk_cache.as_ref().unwrap()
683    };
684
685    // evaluate predicates against primary key values
686    let mut result = true;
687    for predicate in predicates {
688        // ignore predicates that are not referencing primary key columns
689        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
690            continue;
691        };
692        // Safety: arrow schema and datatypes are constructed from the same source.
693        let scalar_value = pk_values[index]
694            .try_to_scalar_value(&datatypes[index])
695            .unwrap();
696        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
697    }
698
699    result
700}
701
/// A `Series` holds a list of field values of some given primary key.
pub struct Series {
    /// Decoded primary key values, cached for predicate pruning.
    pk_cache: Option<Vec<Value>>,
    /// Builder currently receiving new rows.
    active: ValueBuilder,
    /// Immutable parts frozen from previous active builders.
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    /// Row-count threshold at which `active` is frozen.
    capacity: usize,
}
710
impl Series {
    /// Creates a series whose active builder starts with `init_capacity` rows
    /// and is frozen once it grows close to `capacity` rows.
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    /// Creates a series with the default capacities.
    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    /// Returns true if the series holds no rows, active or frozen.
    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into Series. Return the size of values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    /// Caches the decoded primary key values for later predicate evaluation.
    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and push it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            // Swap in a fresh builder so `active` can keep receiving writes.
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    /// Appends whole column vectors sharing one op type and sequence,
    /// freezing the active builder first when it cannot take the columns.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
    /// Returns the frozen and compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each values and avoid
            // cloning and sorting when values do not overlap with each other.

            // +3 accounts for the timestamp, sequence and op_type columns.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(
                    frozen
                        .iter()
                        .zip(frozen.iter().skip(1))
                        .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
                );
            }

            // Concatenate each column across all frozen parts into one part.
            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    /// Returns clones of all buffered values: the frozen parts followed by a
    /// snapshot of the active builder.
    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}
826
/// `ValueBuilder` holds all the vector builders for field columns.
pub(crate) struct ValueBuilder {
    /// Raw values of the time index column, one per row.
    timestamp: Vec<i64>,
    /// Concrete timestamp type of the time index column.
    timestamp_type: ConcreteDataType,
    /// Sequence number of each row.
    sequence: Vec<u64>,
    /// OpType of each row, stored as its `u8` representation.
    op_type: Vec<u8>,
    /// Lazily created builder per field column; stays `None` until the column
    /// receives its first non-null value.
    fields: Vec<Option<FieldBuilder>>,
    /// Data type of each field column, parallel to `fields`.
    field_types: Vec<ConcreteDataType>,
}
836
837impl ValueBuilder {
838    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
839        let timestamp_type = region_metadata
840            .time_index_column()
841            .column_schema
842            .data_type
843            .clone();
844        let sequence = Vec::with_capacity(capacity);
845        let op_type = Vec::with_capacity(capacity);
846
847        let field_types = region_metadata
848            .field_columns()
849            .map(|c| c.column_schema.data_type.clone())
850            .collect::<Vec<_>>();
851        let fields = (0..field_types.len()).map(|_| None).collect();
852        Self {
853            timestamp: Vec::with_capacity(capacity),
854            timestamp_type,
855            sequence,
856            op_type,
857            fields,
858            field_types,
859        }
860    }
861
862    /// Returns number of field builders.
863    pub fn num_field_builders(&self) -> usize {
864        self.fields.iter().flatten().count()
865    }
866
    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they've already be encoded.
    /// Returns the size of field values.
    ///
    /// In this method, we don't check the data type of the value, because it is already checked in the caller.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // In debug builds, verify the row supplies exactly one value per field column.
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.try_into_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Builders are created lazily: a column that has only ever seen
            // nulls has no builder and nothing needs to be stored.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    // First stored value for this column: create its builder...
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // ...and backfill nulls for all previously pushed rows so
                    // the column stays aligned with the timestamp vector.
                    mutable_vector.push_nulls(num_rows - 1);
                    mutable_vector
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }
919
920    /// Checks if current value builder have sufficient space to accommodate `fields`.
921    /// Returns false if there is no space to accommodate fields due to offset overflow.
922    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
923        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
924            let Some(builder) = field_dest else {
925                continue;
926            };
927            let FieldBuilder::String(builder) = builder else {
928                continue;
929            };
930            let array = field_src.to_arrow_array();
931            let string_array = array
932                .as_any()
933                .downcast_ref::<StringArray>()
934                .with_context(|| error::InvalidBatchSnafu {
935                    reason: format!(
936                        "Field type mismatch, expecting String, given: {}",
937                        field_src.data_type()
938                    ),
939                })?;
940            let space_needed = string_array.value_data().len() as i32;
941            // offset may overflow
942            if builder.next_offset().checked_add(space_needed).is_none() {
943                return Ok(false);
944            }
945        }
946        Ok(true)
947    }
948
949    pub(crate) fn extend(
950        &mut self,
951        ts_v: VectorRef,
952        op_type: u8,
953        sequence: u64,
954        fields: Vec<VectorRef>,
955    ) -> Result<()> {
956        let num_rows_before = self.timestamp.len();
957        let num_rows_to_write = ts_v.len();
958        self.timestamp.reserve(num_rows_to_write);
959        match self.timestamp_type {
960            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
961                self.timestamp.extend(
962                    ts_v.as_any()
963                        .downcast_ref::<TimestampSecondVector>()
964                        .unwrap()
965                        .iter_data()
966                        .map(|v| v.unwrap().0.value()),
967                );
968            }
969            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
970                self.timestamp.extend(
971                    ts_v.as_any()
972                        .downcast_ref::<TimestampMillisecondVector>()
973                        .unwrap()
974                        .iter_data()
975                        .map(|v| v.unwrap().0.value()),
976                );
977            }
978            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
979                self.timestamp.extend(
980                    ts_v.as_any()
981                        .downcast_ref::<TimestampMicrosecondVector>()
982                        .unwrap()
983                        .iter_data()
984                        .map(|v| v.unwrap().0.value()),
985                );
986            }
987            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
988                self.timestamp.extend(
989                    ts_v.as_any()
990                        .downcast_ref::<TimestampNanosecondVector>()
991                        .unwrap()
992                        .iter_data()
993                        .map(|v| v.unwrap().0.value()),
994                );
995            }
996            _ => unreachable!(),
997        };
998
999        self.op_type.reserve(num_rows_to_write);
1000        self.op_type
1001            .extend(iter::repeat_n(op_type, num_rows_to_write));
1002        self.sequence.reserve(num_rows_to_write);
1003        self.sequence
1004            .extend(iter::repeat_n(sequence, num_rows_to_write));
1005
1006        for (field_idx, (field_src, field_dest)) in
1007            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
1008        {
1009            let builder = field_dest.get_or_insert_with(|| {
1010                let mut field_builder =
1011                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
1012                field_builder.push_nulls(num_rows_before);
1013                field_builder
1014            });
1015            match builder {
1016                FieldBuilder::String(builder) => {
1017                    let array = field_src.to_arrow_array();
1018                    let string_array =
1019                        array
1020                            .as_any()
1021                            .downcast_ref::<StringArray>()
1022                            .with_context(|| error::InvalidBatchSnafu {
1023                                reason: format!(
1024                                    "Field type mismatch, expecting String, given: {}",
1025                                    field_src.data_type()
1026                                ),
1027                            })?;
1028                    builder.append_array(string_array);
1029                }
1030                FieldBuilder::Other(builder) => {
1031                    let len = field_src.len();
1032                    builder
1033                        .extend_slice_of(&*field_src, 0, len)
1034                        .context(error::ComputeVectorSnafu)?;
1035                }
1036            }
1037        }
1038        Ok(())
1039    }
1040
1041    /// Returns the length of [ValueBuilder]
1042    fn len(&self) -> usize {
1043        let sequence_len = self.sequence.len();
1044        debug_assert_eq!(sequence_len, self.op_type.len());
1045        debug_assert_eq!(sequence_len, self.timestamp.len());
1046        sequence_len
1047    }
1048
1049    fn finish_cloned(&self) -> Values {
1050        let num_rows = self.sequence.len();
1051        let fields = self
1052            .fields
1053            .iter()
1054            .enumerate()
1055            .map(|(i, v)| {
1056                if let Some(v) = v {
1057                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
1058                    v.finish_cloned()
1059                } else {
1060                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
1061                    single_null.push_nulls(num_rows);
1062                    single_null.to_vector()
1063                }
1064            })
1065            .collect::<Vec<_>>();
1066
1067        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
1068        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
1069        let timestamp: VectorRef = match self.timestamp_type {
1070            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
1071                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
1072            }
1073            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
1074                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
1075            }
1076            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
1077                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
1078            }
1079            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
1080                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
1081            }
1082            _ => unreachable!(),
1083        };
1084
1085        if cfg!(debug_assertions) {
1086            debug_assert_eq!(timestamp.len(), sequence.len());
1087            debug_assert_eq!(timestamp.len(), op_type.len());
1088            for field in &fields {
1089                debug_assert_eq!(timestamp.len(), field.len());
1090            }
1091        }
1092
1093        Values {
1094            timestamp,
1095            sequence,
1096            op_type,
1097            fields,
1098        }
1099    }
1100}
1101
/// [Values] holds immutable vectors of field columns, including `sequence` and `op_type`.
#[derive(Clone)]
pub struct Values {
    /// Time index column.
    pub(crate) timestamp: VectorRef,
    /// Sequence number of each row.
    pub(crate) sequence: Arc<UInt64Vector>,
    /// `OpType` of each row, stored as `u8`.
    pub(crate) op_type: Arc<UInt8Vector>,
    /// Field columns, ordered like the region metadata's field columns.
    pub(crate) fields: Vec<VectorRef>,
}
1110
impl Values {
    /// Converts [Values] to `Batch`, applies the optional sequence filter, sorts the batch
    /// according to `timestamp, sequence` desc, and applies dedup/merge according to `merge_mode`.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        sequence: Option<SequenceRange>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        // Keep only projected field columns; `self.fields` is ordered like the
        // region metadata's field columns, so the zip aligns column ids.
        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        // The sequence filter must be applied before dedup/merge to:
        // - avoid dropping a timestamp when the newest row is out of range
        // - avoid filling null fields from rows that should be excluded by the sequence filter.
        batch.filter_by_sequence(sequence)?;

        match (dedup, merge_mode) {
            // append-only, keep duplicate rows.
            (false, _) => batch.sort(false)?,
            // keep the last row for each timestamp.
            (true, MergeMode::LastRow) => batch.sort(true)?,
            // keep the last non-null value for each field.
            (true, MergeMode::LastNonNull) => {
                batch.sort(false)?;
                batch.merge_last_non_null()?;
            }
        }
        Ok(batch)
    }

    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
    ///
    /// Column layout: timestamp, sequence, op_type, then field columns — the
    /// same layout [Values::from_columns] expects.
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Builds a new [Values] instance from columns.
    ///
    /// `cols` must follow the layout produced by [Values::columns]:
    /// timestamp, sequence, op_type, then fields.
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}
1189
impl From<ValueBuilder> for Values {
    /// Consumes the builder, finishing every materialized field builder and
    /// emitting never-materialized (all-null) columns as null vectors.
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    // The builder is consumed here, so decrement the gauge once
                    // per materialized builder.
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    // Column never saw a non-null value: emit all nulls.
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        // Re-wrap raw i64 timestamps in the vector type matching the time index.
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1243
/// Builder that creates iterators scanning a [SeriesSet] snapshot.
struct TimeSeriesIterBuilder {
    /// The series set to scan.
    series_set: SeriesSet,
    /// Column ids to read.
    projection: HashSet<ColumnId>,
    /// Predicates used to prune the scan (including time filters).
    predicate: PredicateGroup,
    /// Whether to deduplicate rows by timestamp.
    dedup: bool,
    /// Optional sequence range filter.
    sequence: Option<SequenceRange>,
    /// Strategy for merging duplicate rows.
    merge_mode: MergeMode,
    /// Context to adapt `Batch` iterators into record batch iterators.
    batch_to_record_batch: Arc<BatchToRecordBatchContext>,
}
1253
1254impl IterBuilder for TimeSeriesIterBuilder {
1255    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
1256        let iter = self.series_set.iter_series(
1257            self.projection.clone(),
1258            self.predicate.clone(),
1259            self.dedup,
1260            self.merge_mode,
1261            self.sequence,
1262            metrics,
1263        )?;
1264        if self.merge_mode == MergeMode::LastNonNull {
1265            let iter = LastNonNullIter::new(iter);
1266            Ok(Box::new(iter))
1267        } else {
1268            Ok(Box::new(iter))
1269        }
1270    }
1271
1272    fn is_record_batch(&self) -> bool {
1273        true
1274    }
1275
1276    fn build_record_batch(
1277        &self,
1278        time_range: Option<(Timestamp, Timestamp)>,
1279        metrics: Option<MemScanMetrics>,
1280    ) -> Result<BoxedRecordBatchIterator> {
1281        let iter = self.build(metrics)?;
1282        let iter: BoxedBatchIterator = if let Some(time_range) = time_range {
1283            let time_filters = self.predicate.time_filters();
1284            Box::new(PruneTimeIterator::new(iter, time_range, time_filters))
1285        } else {
1286            iter
1287        };
1288        Ok(self.batch_to_record_batch.adapt_iter(iter))
1289    }
1290}
1291
1292#[cfg(test)]
1293mod tests {
1294    use std::collections::{HashMap, HashSet};
1295
1296    use api::helper::ColumnDataTypeWrapper;
1297    use api::v1::helper::row;
1298    use api::v1::value::ValueData;
1299    use api::v1::{Mutation, Rows, SemanticType};
1300    use common_time::Timestamp;
1301    use datatypes::prelude::{ConcreteDataType, ScalarVector};
1302    use datatypes::schema::ColumnSchema;
1303    use datatypes::value::{OrderedFloat, Value};
1304    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1305    use mito_codec::row_converter::SortField;
1306    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1307    use store_api::storage::RegionId;
1308
1309    use super::*;
1310    use crate::test_util::column_metadata_to_column_schema;
1311
    /// Builds a test region schema: tags (`k0`: string, `k1`: i64),
    /// time index (`ts`: timestamp_ms), fields (`v0`: i64, `v1`: f64).
    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
1348
1349    fn ts_value_ref(val: i64) -> ValueRef<'static> {
1350        ValueRef::Timestamp(Timestamp::new_millisecond(val))
1351    }
1352
1353    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1354        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1355    }
1356
    /// Asserts that `values` contains exactly the `(ts, sequence, op_type, v0, v1)`
    /// rows in `expect`, in order.
    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        // Zip all columns row-wise into tuples for a single comparison.
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }
1390
    /// Pushed rows stay in the active builder until `compact` moves them into
    /// a frozen `Values`.
    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        // Compaction drains the active builder into one frozen batch.
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
1405
    /// Null field values must survive compaction: lazily-created builders are
    /// backfilled with nulls and all-null columns become null vectors.
    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        // col1: NULL 1 2 3
        // col2: NULL NULL 10.2 NULL
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }
1440
1441    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1442        let ts_len = batch.timestamps().len();
1443        assert_eq!(batch.sequences().len(), ts_len);
1444        assert_eq!(batch.op_types().len(), ts_len);
1445        for f in batch.fields() {
1446            assert_eq!(f.data.len(), ts_len);
1447        }
1448
1449        let mut rows = vec![];
1450        for idx in 0..ts_len {
1451            let mut row = Vec::with_capacity(batch.fields().len() + 3);
1452            row.push(batch.timestamps().get(idx));
1453            row.push(batch.sequences().get(idx));
1454            row.push(batch.op_types().get(idx));
1455            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1456            rows.push(row);
1457        }
1458
1459        assert_eq!(expect.len(), rows.len());
1460        for (idx, row) in rows.iter().enumerate() {
1461            assert_eq!(&expect[idx], row);
1462        }
1463    }
1464
    /// Sorting with dedup + `LastRow` orders rows by timestamp and keeps only
    /// the row with the highest sequence for a duplicated timestamp
    /// (ts=3 keeps the seq=2 row).
    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                None,
                true,
                MergeMode::LastRow,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }
1527
    /// The sequence filter must run before last-non-null merging so a timestamp
    /// is not dropped just because its newest row is outside the sequence range.
    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_drop_ts() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Same timestamp, newest sequence is out of range. We should still keep the timestamp by
        // using the latest row *within* the sequence range as the base row.
        //
        // Expect after filtering seq<=2:
        // - base row: seq=2
        // - v0 from seq=2, v1 filled from seq=1
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 3]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![None, Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.5), None, None])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::LtEq { max: 2 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Int64(10),
                Value::Float64(OrderedFloat(1.5)),
            ]],
        );
    }
1575
    /// The sequence filter must run before last-non-null merging so null fields
    /// are never back-filled from rows the filter should have excluded.
    #[test]
    fn test_last_non_null_should_filter_by_sequence_before_merge_no_fill_from_out_of_range_row() {
        let schema = schema_for_test();
        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();

        // Same timestamp, older sequence is out of range. We must not fill null fields using rows
        // that should be excluded by the sequence filter.
        //
        // Expect after filtering seq>1:
        // - keep only seq=2 row, v0 stays NULL.
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 2]));
        let fields = vec![
            Arc::new(Int64Vector::from(vec![Some(10), None])) as Arc<_>,
            Arc::new(Float64Vector::from(vec![Some(1.0), Some(1.0)])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &projection,
                Some(SequenceRange::Gt { min: 1 }),
                true,
                MergeMode::LastNonNull,
            )
            .unwrap();

        check_value(
            &batch,
            vec![vec![
                Value::Timestamp(Timestamp::new_millisecond(1)),
                Value::UInt64(2),
                Value::UInt8(OpType::Put as u8),
                Value::Null,
                Value::Float64(OrderedFloat(1.0)),
            ]],
        );
    }
1622
    /// Builds a [KeyValues] mutation with `len` rows sharing the same primary
    /// key `(k0, k1)`; row `i` gets ts/v0 = `i` and v1 = `i as f64`.
    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        // Derive the wire-format column schema from the region metadata.
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| {
                row(vec![
                    ValueData::StringValue(k0.clone()),
                    ValueData::I64Value(k1),
                    ValueData::TimestampMillisecondValue(i as i64),
                    ValueData::I64Value(i as i64),
                    ValueData::F64Value(i as f64),
                ])
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }
1659
1660    #[test]
1661    fn test_series_set_concurrency() {
1662        let schema = schema_for_test();
1663        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1664            schema
1665                .primary_key_columns()
1666                .map(|c| {
1667                    (
1668                        c.column_id,
1669                        SortField::new(c.column_schema.data_type.clone()),
1670                    )
1671                })
1672                .collect(),
1673        ));
1674        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1675
1676        let concurrency = 32;
1677        let pk_num = concurrency * 2;
1678        let mut handles = Vec::with_capacity(concurrency);
1679        for i in 0..concurrency {
1680            let set = set.clone();
1681            let schema = schema.clone();
1682            let column_schemas = schema
1683                .column_metadatas
1684                .iter()
1685                .map(column_metadata_to_column_schema)
1686                .collect::<Vec<_>>();
1687            let handle = std::thread::spawn(move || {
1688                for j in i * 100..(i + 1) * 100 {
1689                    let pk = j % pk_num;
1690                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1691
1692                    let kvs = KeyValues::new(
1693                        &schema,
1694                        Mutation {
1695                            op_type: OpType::Put as i32,
1696                            sequence: j as u64,
1697                            rows: Some(Rows {
1698                                schema: column_schemas.clone(),
1699                                rows: vec![row(vec![
1700                                    ValueData::StringValue(format!("{}", j)),
1701                                    ValueData::I64Value(j as i64),
1702                                    ValueData::TimestampMillisecondValue(j as i64),
1703                                    ValueData::I64Value(j as i64),
1704                                    ValueData::F64Value(j as f64),
1705                                ])],
1706                            }),
1707                            write_hint: None,
1708                        },
1709                    )
1710                    .unwrap();
1711                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1712                }
1713            });
1714            handles.push(handle);
1715        }
1716        for h in handles {
1717            h.join().unwrap();
1718        }
1719
1720        let mut timestamps = Vec::with_capacity(concurrency * 100);
1721        let mut sequences = Vec::with_capacity(concurrency * 100);
1722        let mut op_types = Vec::with_capacity(concurrency * 100);
1723        let mut v0 = Vec::with_capacity(concurrency * 100);
1724
1725        for i in 0..pk_num {
1726            let pk = format!("pk-{}", i).as_bytes().to_vec();
1727            let series = set.get_series(&pk).unwrap();
1728            let mut guard = series.write().unwrap();
1729            let values = guard.compact(&schema).unwrap();
1730            timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1731            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1732            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1733            v0.extend(
1734                values
1735                    .fields
1736                    .first()
1737                    .unwrap()
1738                    .as_any()
1739                    .downcast_ref::<Int64Vector>()
1740                    .unwrap()
1741                    .iter_data()
1742                    .map(|v| v.unwrap()),
1743            );
1744        }
1745
1746        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1747        assert_eq!(
1748            expected_sequence,
1749            sequences.iter().copied().collect::<HashSet<_>>()
1750        );
1751
1752        op_types.iter().all(|op| *op == OpType::Put as u8);
1753        assert_eq!(
1754            expected_sequence,
1755            timestamps.iter().copied().collect::<HashSet<_>>()
1756        );
1757
1758        assert_eq!(timestamps, sequences);
1759        assert_eq!(v0, timestamps);
1760    }
1761
1762    #[test]
1763    fn test_memtable() {
1764        common_telemetry::init_default_ut_logging();
1765        check_memtable_dedup(true);
1766        check_memtable_dedup(false);
1767    }
1768
1769    fn check_memtable_dedup(dedup: bool) {
1770        let schema = schema_for_test();
1771        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1772        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
1773        memtable.write(&kvs).unwrap();
1774        memtable.write(&kvs).unwrap();
1775
1776        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
1777        for ts in kvs.iter().map(|kv| {
1778            kv.timestamp()
1779                .try_into_timestamp()
1780                .unwrap()
1781                .unwrap()
1782                .value()
1783        }) {
1784            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
1785        }
1786
1787        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
1788        let range = ranges.ranges.into_values().next().unwrap();
1789        let iter = range.build_iter().unwrap();
1790        let mut read = HashMap::new();
1791
1792        for ts in iter
1793            .flat_map(|batch| {
1794                batch
1795                    .unwrap()
1796                    .timestamps()
1797                    .as_any()
1798                    .downcast_ref::<TimestampMillisecondVector>()
1799                    .unwrap()
1800                    .iter_data()
1801                    .collect::<Vec<_>>()
1802                    .into_iter()
1803            })
1804            .map(|v| v.unwrap().0.value())
1805        {
1806            *read.entry(ts).or_default() += 1;
1807        }
1808        assert_eq!(expected_ts, read);
1809
1810        let stats = memtable.stats();
1811        assert!(stats.bytes_allocated() > 0);
1812        assert_eq!(
1813            Some((
1814                Timestamp::new_millisecond(0),
1815                Timestamp::new_millisecond(99)
1816            )),
1817            stats.time_range()
1818        );
1819    }
1820
1821    #[test]
1822    fn test_memtable_projection() {
1823        common_telemetry::init_default_ut_logging();
1824        let schema = schema_for_test();
1825        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1826        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
1827        memtable.write(&kvs).unwrap();
1828
1829        let iter = memtable
1830            .ranges(Some(&[3]), RangesOptions::default())
1831            .unwrap()
1832            .build(None)
1833            .unwrap();
1834
1835        let mut v0_all = vec![];
1836
1837        for res in iter {
1838            let batch = res.unwrap();
1839            assert_eq!(1, batch.fields().len());
1840            let v0 = batch
1841                .fields()
1842                .first()
1843                .unwrap()
1844                .data
1845                .as_any()
1846                .downcast_ref::<Int64Vector>()
1847                .unwrap();
1848            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
1849        }
1850        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
1851    }
1852
    /// Stress test: concurrent writers and readers on the same memtable.
    ///
    /// Writers insert disjoint series while readers repeatedly scan; readers
    /// may observe a partial view mid-flight, so correctness is only checked
    /// on the final scan after every thread has joined.
    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // Number of writer threads
        let num_writers = 10;
        // Number of reader threads
        let num_readers = 5;
        // Number of series per writer
        let series_per_writer = 100;
        // Number of rows per series
        let rows_per_series = 10;
        // Total number of series
        let total_series = num_writers * series_per_writer;

        // Create a barrier to synchronize the start of all threads
        // (+1 for the main thread so nothing runs before all are spawned).
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        // Spawn writer threads
        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                // Wait for all threads to be ready
                barrier.wait();

                // Create and write series. The key embeds the writer id so
                // each writer's series are disjoint from the others'.
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        // Spawn reader threads
        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                // Scan repeatedly while writes are in flight; every batch
                // must still decode cleanly even if the view is partial.
                for _ in 0..10 {
                    let iter = memtable
                        .ranges(None, RangesOptions::default())
                        .unwrap()
                        .build(None)
                        .unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        // Release all writer/reader threads at once.
        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // Final scan: every series and every row written must be visible.
        let iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
1951
1952    #[test]
1953    fn test_build_record_batch_iter_from_memtable() {
1954        let schema = schema_for_test();
1955        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
1956
1957        let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
1958        memtable.write(&kvs).unwrap();
1959
1960        let read_column_ids: Vec<ColumnId> = schema
1961            .column_metadatas
1962            .iter()
1963            .map(|c| c.column_id)
1964            .collect();
1965        let ranges = memtable
1966            .ranges(Some(&read_column_ids), RangesOptions::default())
1967            .unwrap();
1968        assert_eq!(1, ranges.ranges.len());
1969
1970        let range = ranges.ranges.into_values().next().unwrap();
1971        let mut iter = range.build_record_batch_iter(None, None).unwrap();
1972        let rb = iter.next().transpose().unwrap().unwrap();
1973        assert_eq!(10, rb.num_rows());
1974        // k0, k1 (pk columns), v0, v1 (field columns), ts, __primary_key, __sequence, __op_type
1975        let schema = rb.schema();
1976        let column_names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1977        assert_eq!(
1978            column_names,
1979            vec![
1980                "k0",
1981                "k1",
1982                "v0",
1983                "v1",
1984                "ts",
1985                "__primary_key",
1986                "__sequence",
1987                "__op_type",
1988            ]
1989        );
1990        assert!(iter.next().is_none());
1991    }
1992
1993    #[test]
1994    fn test_build_record_batch_iter_with_time_range() {
1995        let schema = schema_for_test();
1996        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
1997
1998        let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
1999        memtable.write(&kvs).unwrap();
2000
2001        let read_column_ids: Vec<ColumnId> = schema
2002            .column_metadatas
2003            .iter()
2004            .map(|c| c.column_id)
2005            .collect();
2006        let ranges = memtable
2007            .ranges(Some(&read_column_ids), RangesOptions::default())
2008            .unwrap();
2009        assert_eq!(1, ranges.ranges.len());
2010
2011        let time_range = (Timestamp::new_millisecond(3), Timestamp::new_millisecond(7));
2012
2013        let range = ranges.ranges.into_values().next().unwrap();
2014        let mut iter = range
2015            .build_record_batch_iter(Some(time_range), None)
2016            .unwrap();
2017
2018        let mut total_rows = 0;
2019        let mut all_timestamps = Vec::new();
2020        while let Some(rb) = iter.next().transpose().unwrap() {
2021            total_rows += rb.num_rows();
2022            let ts_col = rb
2023                .column_by_name("ts")
2024                .unwrap()
2025                .as_any()
2026                .downcast_ref::<datatypes::arrow::array::TimestampMillisecondArray>()
2027                .unwrap();
2028            for i in 0..ts_col.len() {
2029                all_timestamps.push(ts_col.value(i));
2030            }
2031        }
2032        assert_eq!(5, total_rows);
2033        all_timestamps.sort();
2034        assert_eq!(vec![3, 4, 5, 6, 7], all_timestamps);
2035    }
2036
2037    /// Helper to create a TimeSeriesIterBuilder from a memtable and schema.
2038    fn build_iter_builder(
2039        schema: &RegionMetadataRef,
2040        memtable: &TimeSeriesMemtable,
2041        projection: Option<&[ColumnId]>,
2042        dedup: bool,
2043        merge_mode: MergeMode,
2044        sequence: Option<SequenceRange>,
2045    ) -> TimeSeriesIterBuilder {
2046        let read_column_ids = read_column_ids_from_projection(schema, projection);
2047        let field_projection = if let Some(projection) = projection {
2048            projection.iter().copied().collect()
2049        } else {
2050            schema.field_columns().map(|c| c.column_id).collect()
2051        };
2052        let adapter_context = Arc::new(BatchToRecordBatchContext::new(
2053            schema.clone(),
2054            read_column_ids,
2055        ));
2056        TimeSeriesIterBuilder {
2057            series_set: memtable.series_set.clone(),
2058            projection: field_projection,
2059            predicate: PredicateGroup::default(),
2060            dedup,
2061            merge_mode,
2062            sequence,
2063            batch_to_record_batch: adapter_context,
2064        }
2065    }
2066
2067    #[test]
2068    fn test_iter_builder_build_record_batch_basic() {
2069        let schema = schema_for_test();
2070        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2071
2072        let kvs = build_key_values(&schema, "hello".to_string(), 42, 10);
2073        memtable.write(&kvs).unwrap();
2074
2075        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2076
2077        let mut iter = builder.build_record_batch(None, None).unwrap();
2078        let rb = iter.next().transpose().unwrap().unwrap();
2079        assert_eq!(10, rb.num_rows());
2080
2081        let rb_schema = rb.schema();
2082        let col_names: Vec<_> = rb_schema
2083            .fields()
2084            .iter()
2085            .map(|f| f.name().as_str())
2086            .collect();
2087        assert_eq!(
2088            col_names,
2089            vec![
2090                "k0",
2091                "k1",
2092                "v0",
2093                "v1",
2094                "ts",
2095                "__primary_key",
2096                "__sequence",
2097                "__op_type",
2098            ]
2099        );
2100
2101        assert!(iter.next().is_none());
2102    }
2103
2104    #[test]
2105    fn test_iter_builder_build_record_batch_with_projection() {
2106        let schema = schema_for_test();
2107        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2108
2109        let kvs = build_key_values(&schema, "test".to_string(), 1, 5);
2110        memtable.write(&kvs).unwrap();
2111
2112        // Project only field v0 (column_id=3) and ts (column_id=2).
2113        let projection = vec![2, 3];
2114        let builder = build_iter_builder(
2115            &schema,
2116            &memtable,
2117            Some(&projection),
2118            true,
2119            MergeMode::LastRow,
2120            None,
2121        );
2122
2123        let mut iter = builder.build_record_batch(None, None).unwrap();
2124        let rb = iter.next().transpose().unwrap().unwrap();
2125        assert_eq!(5, rb.num_rows());
2126
2127        let rb_schema = rb.schema();
2128        let col_names: Vec<_> = rb_schema
2129            .fields()
2130            .iter()
2131            .map(|f| f.name().as_str())
2132            .collect();
2133        // Only projected columns + internal columns.
2134        assert_eq!(
2135            col_names,
2136            vec!["v0", "ts", "__primary_key", "__sequence", "__op_type",]
2137        );
2138
2139        assert!(iter.next().is_none());
2140    }
2141
2142    #[test]
2143    fn test_iter_builder_build_record_batch_multiple_series() {
2144        let schema = schema_for_test();
2145        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2146
2147        let kvs_a = build_key_values(&schema, "aaa".to_string(), 1, 3);
2148        let kvs_b = build_key_values(&schema, "bbb".to_string(), 2, 4);
2149        memtable.write(&kvs_a).unwrap();
2150        memtable.write(&kvs_b).unwrap();
2151
2152        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2153
2154        let iter = builder.build_record_batch(None, None).unwrap();
2155        let mut total_rows = 0;
2156        for rb in iter {
2157            let rb = rb.unwrap();
2158            total_rows += rb.num_rows();
2159            assert_eq!(8, rb.num_columns());
2160        }
2161        assert_eq!(7, total_rows);
2162    }
2163
2164    #[test]
2165    fn test_iter_builder_build_record_batch_dedup() {
2166        let schema = schema_for_test();
2167        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2168
2169        // Write same data twice — dedup should keep only one copy per timestamp.
2170        let kvs = build_key_values(&schema, "dup".to_string(), 10, 5);
2171        memtable.write(&kvs).unwrap();
2172        memtable.write(&kvs).unwrap();
2173
2174        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2175
2176        let iter = builder.build_record_batch(None, None).unwrap();
2177        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2178        assert_eq!(5, total_rows);
2179    }
2180
2181    #[test]
2182    fn test_iter_builder_build_record_batch_no_dedup() {
2183        let schema = schema_for_test();
2184        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, false, MergeMode::LastRow);
2185
2186        let kvs = build_key_values(&schema, "dup".to_string(), 10, 5);
2187        memtable.write(&kvs).unwrap();
2188        memtable.write(&kvs).unwrap();
2189
2190        let builder = build_iter_builder(&schema, &memtable, None, false, MergeMode::LastRow, None);
2191
2192        let iter = builder.build_record_batch(None, None).unwrap();
2193        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2194        assert_eq!(10, total_rows);
2195    }
2196
2197    #[test]
2198    fn test_iter_builder_build_record_batch_with_sequence_filter() {
2199        let schema = schema_for_test();
2200        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2201
2202        // build_key_values creates a mutation with base sequence=0.
2203        // Each row gets sequence = base + row_index, so 5 rows get sequences 0,1,2,3,4.
2204        let kvs = build_key_values(&schema, "seq".to_string(), 1, 5);
2205        memtable.write(&kvs).unwrap();
2206
2207        // Filter to sequence > 4 — should yield no rows.
2208        let builder = build_iter_builder(
2209            &schema,
2210            &memtable,
2211            None,
2212            true,
2213            MergeMode::LastRow,
2214            Some(SequenceRange::Gt { min: 4 }),
2215        );
2216
2217        let iter = builder.build_record_batch(None, None).unwrap();
2218        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2219        assert_eq!(0, total_rows);
2220
2221        // Filter to sequence <= 2 — should yield 3 rows (sequences 0, 1, 2).
2222        let builder = build_iter_builder(
2223            &schema,
2224            &memtable,
2225            None,
2226            true,
2227            MergeMode::LastRow,
2228            Some(SequenceRange::LtEq { max: 2 }),
2229        );
2230
2231        let iter = builder.build_record_batch(None, None).unwrap();
2232        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2233        assert_eq!(3, total_rows);
2234    }
2235
2236    #[test]
2237    fn test_iter_builder_build_record_batch_data_correctness() {
2238        use datatypes::arrow::array::{
2239            Float64Array, Int64Array, TimestampMillisecondArray, UInt8Array,
2240        };
2241
2242        let schema = schema_for_test();
2243        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2244
2245        let kvs = build_key_values(&schema, "check".to_string(), 7, 3);
2246        memtable.write(&kvs).unwrap();
2247
2248        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2249
2250        let mut iter = builder.build_record_batch(None, None).unwrap();
2251        let rb = iter.next().transpose().unwrap().unwrap();
2252        assert_eq!(3, rb.num_rows());
2253
2254        // Verify timestamp values.
2255        let ts_col = rb
2256            .column_by_name("ts")
2257            .unwrap()
2258            .as_any()
2259            .downcast_ref::<TimestampMillisecondArray>()
2260            .unwrap();
2261        let timestamps: Vec<_> = (0..ts_col.len()).map(|i| ts_col.value(i)).collect();
2262        assert_eq!(vec![0, 1, 2], timestamps);
2263
2264        // Verify field v0 values.
2265        let v0_col = rb
2266            .column_by_name("v0")
2267            .unwrap()
2268            .as_any()
2269            .downcast_ref::<Int64Array>()
2270            .unwrap();
2271        let v0_values: Vec<_> = (0..v0_col.len()).map(|i| v0_col.value(i)).collect();
2272        assert_eq!(vec![0, 1, 2], v0_values);
2273
2274        // Verify field v1 values.
2275        let v1_col = rb
2276            .column_by_name("v1")
2277            .unwrap()
2278            .as_any()
2279            .downcast_ref::<Float64Array>()
2280            .unwrap();
2281        let v1_values: Vec<_> = (0..v1_col.len()).map(|i| v1_col.value(i)).collect();
2282        assert_eq!(vec![0.0, 1.0, 2.0], v1_values);
2283
2284        // Verify op_type is all Put (1).
2285        let op_col = rb
2286            .column_by_name("__op_type")
2287            .unwrap()
2288            .as_any()
2289            .downcast_ref::<UInt8Array>()
2290            .unwrap();
2291        for i in 0..op_col.len() {
2292            assert_eq!(OpType::Put as u8, op_col.value(i));
2293        }
2294
2295        assert!(iter.next().is_none());
2296    }
2297}