// File: mito2/memtable/time_series.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::iter;
19use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::OpType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{debug, error};
26use common_time::Timestamp;
27use datatypes::arrow;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow_array::StringArray;
30use datatypes::data_type::{ConcreteDataType, DataType};
31use datatypes::prelude::{ScalarVector, Vector, VectorRef};
32use datatypes::types::TimestampType;
33use datatypes::value::{Value, ValueRef};
34use datatypes::vectors::{
35    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
36    TimestampSecondVector, UInt8Vector, UInt64Vector,
37};
38use mito_codec::key_values::KeyValue;
39use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
40use snafu::{OptionExt, ResultExt, ensure};
41use store_api::metadata::RegionMetadataRef;
42use store_api::storage::{ColumnId, SequenceRange};
43use table::predicate::Predicate;
44
45use crate::error::{
46    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
47};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::builder::{FieldBuilder, StringBuilder};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
52use crate::memtable::stats::WriteMetrics;
53use crate::memtable::{
54    AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, BoxedRecordBatchIterator,
55    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
56    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
57    read_column_ids_from_projection,
58};
59use crate::metrics::{
60    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
61    READ_STAGE_ELAPSED,
62};
63use crate::read::dedup::LastNonNullIter;
64use crate::read::prune::PruneTimeIterator;
65use crate::read::scan_region::PredicateGroup;
66use crate::read::{Batch, BatchBuilder, BatchColumn};
67use crate::region::options::MergeMode;
68
/// Initial vector builder capacity; kept small because many series may only
/// ever receive a handful of rows.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Vector builder capacity threshold; `Series::push` freezes the active
/// builder once it approaches this many rows.
const BUILDER_CAPACITY: usize = 512;
74
/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    /// Write buffer manager passed to each built memtable; `None` disables
    /// write buffer accounting.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Whether built memtables deduplicate rows on read.
    dedup: bool,
    /// Merge mode forwarded to built memtables.
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}
97
98impl MemtableBuilder for TimeSeriesMemtableBuilder {
99    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
100        if metadata.primary_key.is_empty() {
101            Arc::new(SimpleBulkMemtable::new(
102                id,
103                metadata.clone(),
104                self.write_buffer_manager.clone(),
105                self.dedup,
106                self.merge_mode,
107            ))
108        } else {
109            Arc::new(TimeSeriesMemtable::new(
110                metadata.clone(),
111                id,
112                self.write_buffer_manager.clone(),
113                self.dedup,
114                self.merge_mode,
115            ))
116        }
117    }
118
119    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
120        // Now if we can use simple bulk memtable, the input request is already
121        // a bulk write request and won't call this method.
122        false
123    }
124}
125
/// Memtable implementation that groups rows by their primary key.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    /// Codec used to encode primary key columns into the series map key.
    row_codec: Arc<DensePrimaryKeyCodec>,
    /// Per-primary-key storage of rows.
    series_set: SeriesSet,
    /// Tracks bytes allocated by this memtable (reports to the write buffer manager).
    alloc_tracker: AllocTracker,
    /// Max timestamp written so far; starts at `i64::MIN`.
    max_timestamp: AtomicI64,
    /// Min timestamp written so far; starts at `i64::MAX`.
    min_timestamp: AtomicI64,
    /// Max sequence number written so far.
    max_sequence: AtomicU64,
    /// Whether rows are deduplicated when read.
    dedup: bool,
    /// Merge mode forwarded to the read path.
    merge_mode: MergeMode,
    /// Total written rows in memtable. This also includes deleted and duplicated rows.
    num_rows: AtomicUsize,
}
141
impl TimeSeriesMemtable {
    /// Creates a new memtable for `region_metadata` with the given id and options.
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            // min/max start at the opposite extremes so the first write always
            // narrows them.
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    /// Updates memtable stats.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

    /// Encodes the primary key of `kv` and pushes the row into its series.
    ///
    /// Accumulates key/value byte counts and the timestamp range in `stats`.
    /// Returns an error when the number of primary key columns in `kv` does
    /// not match the codec, or when primary key encoding fails.
    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        // `key_allocated` is non-zero only when a new series was created.
        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}
209
210impl Debug for TimeSeriesMemtable {
211    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
212        f.debug_struct("TimeSeriesMemtable").finish()
213    }
214}
215
impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes every row of `kvs` into its series, then folds the accumulated
    /// write metrics into the memtable stats.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Account for the per-row timestamp and op type storage in addition to
        // the field bytes counted by `write_key_value`.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        // TODO(hl): this maybe inaccurate since for-iteration may return early.
        // We may lift the primary key length check out of Memtable::write
        // so that we can ensure writing to memtable will succeed.
        self.update_stats(local_stats);
        Ok(())
    }

    /// Writes a single row; stats are updated only when the write succeeds.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    /// Writes a bulk part by converting it into a mutation and replaying it
    /// row by row.
    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Default implementation fallback to row iteration.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        // The part already carries its sequence, timestamp range and row count.
        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    /// Builds the scan ranges of this memtable. A time-series memtable always
    /// exposes a single range (index 0) backed by a `TimeSeriesIterBuilder`.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection);
        // Default to all field columns when no projection is given.
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let batch_to_record_batch = Arc::new(BatchToRecordBatchContext::new(
            self.region_metadata.clone(),
            read_column_ids,
        ));
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.clone(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
            batch_to_record_batch,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
        let range_stats = self.stats();
        let range = MemtableRange::new(context, range_stats);
        Ok(MemtableRanges {
            ranges: [(0, range)].into(),
        })
    }

    /// Checks the series map under its read lock.
    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    /// Marks the memtable immutable for allocation tracking purposes.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    /// Returns current stats. The timestamp range is only meaningful once at
    /// least one row has been written, hence the zero-allocation early return.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // no rows ever written
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Creates a new, empty memtable for `metadata` with the same options,
    /// sharing this memtable's write buffer manager.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
365
/// Map from encoded primary key to its [Series]. Wrapped in a newtype so the
/// active-series/field-builder gauges can be decremented when the map drops.
#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);

impl Drop for SeriesMap {
    fn drop(&mut self) {
        let num_series = self.0.len();
        // Only the active builders are counted here; see `Series::with_capacity`
        // and `ValueBuilder::push` for the matching increments.
        let num_field_builders = self
            .0
            .values()
            .map(|v| v.read().unwrap().active.num_field_builders())
            .sum::<usize>();
        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
    }
}
381
/// Shared, thread-safe collection of [Series] keyed by encoded primary key.
/// Cloning is cheap: the map and codec are behind `Arc`s.
#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    /// Encoded primary key -> series, behind a read-write lock.
    series: Arc<RwLock<SeriesMap>>,
    /// Codec used to decode primary keys for predicate pruning.
    codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    /// Creates an empty set for `region_metadata` using `codec`.
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}
398
impl SeriesSet {
    /// Push [KeyValue] to SeriesSet with given primary key and return key/value allocated memory size.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, so the map read lock suffices
        // and only the individual series is locked for writing.
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the map write lock. The entry API re-checks
        // existence because another writer may have inserted the same key
        // between dropping the read lock above and acquiring the write lock.
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                // New series: the key bytes are accounted exactly once.
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            // Series was created concurrently; push just like the fast path.
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates all series in [SeriesSet].
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: PredicateGroup,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        // Schema/datatypes of primary key columns, used for predicate pruning.
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate.predicate().cloned(),
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            merge_mode,
            sequence,
            mem_scan_metrics,
        )
    }
}
472
473/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys
474/// of given region schema
475pub(crate) fn primary_key_schema(
476    region_metadata: &RegionMetadataRef,
477) -> arrow::datatypes::SchemaRef {
478    let fields = region_metadata
479        .primary_key_columns()
480        .map(|pk| {
481            arrow::datatypes::Field::new(
482                pk.column_schema.name.clone(),
483                pk.column_schema.data_type.as_arrow_type(),
484                pk.column_schema.is_nullable(),
485            )
486        })
487        .collect::<Vec<_>>();
488    Arc::new(arrow::datatypes::Schema::new(fields))
489}
490
/// Metrics for reading the memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total series visited by the iterator (including pruned ones).
    total_series: usize,
    /// Number of series pruned by primary-key predicates.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batch read.
    num_batches: usize,
    /// Duration to scan the memtable, accumulated across `next` calls.
    scan_cost: Duration,
}
505
/// Iterator over a [SeriesSet] that yields one [Batch] per primary key,
/// in primary-key order.
struct Iter {
    metadata: RegionMetadataRef,
    /// Shared series map; read-locked on every `next` call.
    series: Arc<RwLock<SeriesMap>>,
    /// Column ids to project into the output batches.
    projection: HashSet<ColumnId>,
    /// Last yielded primary key; scanning resumes strictly after it.
    last_key: Option<Vec<u8>>,
    /// Simple filters evaluated against decoded primary key values to prune series.
    predicate: Vec<SimpleFilterEvaluator>,
    /// Arrow schema of the primary key columns only.
    pk_schema: arrow::datatypes::SchemaRef,
    /// Concrete datatypes of the primary key columns, parallel to `pk_schema`.
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    merge_mode: MergeMode,
    /// Optional sequence visibility range forwarded to `Values::to_batch`.
    sequence: Option<SequenceRange>,
    metrics: Metrics,
    /// Scan metrics sink; taken (reported) at most once.
    mem_scan_metrics: Option<MemScanMetrics>,
}
521
522impl Iter {
523    #[allow(clippy::too_many_arguments)]
524    pub(crate) fn try_new(
525        metadata: RegionMetadataRef,
526        series: Arc<RwLock<SeriesMap>>,
527        projection: HashSet<ColumnId>,
528        predicate: Option<Predicate>,
529        pk_schema: arrow::datatypes::SchemaRef,
530        pk_datatypes: Vec<ConcreteDataType>,
531        codec: Arc<DensePrimaryKeyCodec>,
532        dedup: bool,
533        merge_mode: MergeMode,
534        sequence: Option<SequenceRange>,
535        mem_scan_metrics: Option<MemScanMetrics>,
536    ) -> Result<Self> {
537        let predicate = predicate
538            .map(|predicate| {
539                predicate
540                    .exprs()
541                    .iter()
542                    .filter_map(SimpleFilterEvaluator::try_new)
543                    .collect::<Vec<_>>()
544            })
545            .unwrap_or_default();
546        Ok(Self {
547            metadata,
548            series,
549            projection,
550            last_key: None,
551            predicate,
552            pk_schema,
553            pk_datatypes,
554            codec,
555            dedup,
556            merge_mode,
557            sequence,
558            metrics: Metrics::default(),
559            mem_scan_metrics,
560        })
561    }
562
563    fn report_mem_scan_metrics(&mut self) {
564        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
565            let inner = crate::memtable::MemScanMetricsData {
566                total_series: self.metrics.total_series,
567                num_rows: self.metrics.num_rows,
568                num_batches: self.metrics.num_batches,
569                scan_cost: self.metrics.scan_cost,
570                ..Default::default()
571            };
572            mem_scan_metrics.merge_inner(&inner);
573        }
574    }
575}
576
impl Drop for Iter {
    /// Flushes scan metrics to the global counters when the iterator is
    /// dropped, even if iteration stopped early.
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // Report MemScanMetrics if not already reported
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
595
impl Iterator for Iter {
    type Item = Result<Batch>;

    /// Returns the batch of the next series that passes predicate pruning.
    ///
    /// Each call re-acquires the map read lock and resumes the key range
    /// strictly after `last_key`, so concurrently inserted series with larger
    /// keys are still visible.
    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        // TODO(hl): maybe yield more than one time series to amortize range overhead.
        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            // Write lock: pruning may populate the series' pk cache and
            // `compact` mutates the frozen parts.
            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // read next series
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(
                    primary_key,
                    &self.metadata,
                    &self.projection,
                    self.sequence,
                    self.dedup,
                    self.merge_mode,
                )
            });

            // Update metrics.
            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            return Some(batch);
        }
        drop(map); // Explicitly drop the read lock
        self.metrics.scan_cost += start.elapsed();

        // Report MemScanMetrics before returning None
        self.report_mem_scan_metrics();

        None
    }
}
658
/// Evaluates `predicates` against the decoded primary key values of a series.
///
/// Returns `true` when the series may match (and must be scanned) and `false`
/// when it can be skipped. Decode failures conservatively return `true` so
/// pruning never drops data.
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // no primary key, we simply return true.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // retrieve primary key values from cache or decode from bytes.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        // Cache the decoded values so later scans skip decoding.
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // evaluate predicates against primary key values
    let mut result = true;
    for predicate in predicates {
        // ignore predicates that are not referencing primary key columns
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: arrow schema and datatypes are constructed from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        // Evaluation failures count as "may match" rather than pruning.
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}
701
/// A `Series` holds a list of field values of some given primary key.
pub struct Series {
    /// Decoded primary key values, cached by `prune_primary_key`.
    pk_cache: Option<Vec<Value>>,
    /// Mutable builder receiving new rows.
    active: ValueBuilder,
    /// Immutable, already-built parts of this series.
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    /// Row threshold at which `push` freezes the active builder.
    capacity: usize,
}
710
impl Series {
    /// Creates a series whose active builder starts at `init_capacity` rows
    /// and is frozen once it approaches `capacity` rows (see `push`).
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        // Gauge is decremented in `SeriesMap::drop`.
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    /// Creates a series with the default initial/max builder capacities.
    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    /// Returns true when the series contains no rows, active or frozen.
    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into Series. Return the size of values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // + 10 to avoid potential reallocation.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    /// Caches decoded primary key values for predicate pruning.
    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active part and push it to `frozen`.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            // Swap in a fresh builder and convert the old one into immutable Values.
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    /// Appends whole vectors of rows sharing one op type and sequence,
    /// freezing the active builder first when it cannot accommodate `fields`.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation.
    /// Returns the frozen and compacted values.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // Each series must contain at least one row
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // TODO(hl): We should keep track of min/max timestamps for each values and avoid
            // cloning and sorting when values do not overlap with each other.

            // field columns plus timestamp, sequence and op type columns.
            let column_size = frozen[0].fields.len() + 3;

            // All frozen parts must agree on the number of field columns.
            if cfg!(debug_assertions) {
                debug_assert!(
                    frozen
                        .iter()
                        .zip(frozen.iter().skip(1))
                        .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
                );
            }

            // Concatenate each column across all frozen parts into one array.
            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    /// Returns clones of all frozen parts plus a cloned snapshot of the active
    /// builder, without mutating the series.
    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}
826
/// `ValueBuilder` holds all the vector builders for field columns.
pub(crate) struct ValueBuilder {
    /// Timestamp values stored as raw i64; `timestamp_type` keeps the concrete type.
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    /// One optional builder per field column, created lazily on the first
    /// non-null value (see `push`).
    fields: Vec<Option<FieldBuilder>>,
    /// Concrete datatypes of the field columns, parallel to `fields`.
    field_types: Vec<ConcreteDataType>,
}
836
837impl ValueBuilder {
838    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
839        let timestamp_type = region_metadata
840            .time_index_column()
841            .column_schema
842            .data_type
843            .clone();
844        let sequence = Vec::with_capacity(capacity);
845        let op_type = Vec::with_capacity(capacity);
846
847        let field_types = region_metadata
848            .field_columns()
849            .map(|c| c.column_schema.data_type.clone())
850            .collect::<Vec<_>>();
851        let fields = (0..field_types.len()).map(|_| None).collect();
852        Self {
853            timestamp: Vec::with_capacity(capacity),
854            timestamp_type,
855            sequence,
856            op_type,
857            fields,
858            field_types,
859        }
860    }
861
862    /// Returns number of field builders.
863    pub fn num_field_builders(&self) -> usize {
864        self.fields.iter().flatten().count()
865    }
866
    /// Pushes a new row to `ValueBuilder`.
    /// We don't need primary keys since they've already be encoded.
    /// Returns the size of field values.
    ///
    /// In this method, we don't check the data type of the value, because it is already checked in the caller.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // In debug builds, materialize the iterator to verify the caller
        // passes exactly one value per field column.
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        // The timestamp was validated by the caller, so the unwraps hold.
        self.timestamp
            .push(ts.try_into_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Nulls are skipped until a field sees its first concrete value;
            // the builder created at that point backfills the missing nulls.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    field
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("Failed to push field value: {e:?}"));
                } else {
                    // Lazily create the builder for this field column.
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    // Backfill nulls for all rows pushed before this one.
                    mutable_vector.push_nulls(num_rows - 1);
                    mutable_vector
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }
921
922    /// Checks if current value builder have sufficient space to accommodate `fields`.
923    /// Returns false if there is no space to accommodate fields due to offset overflow.
924    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
925        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
926            let Some(builder) = field_dest else {
927                continue;
928            };
929            let FieldBuilder::String(builder) = builder else {
930                continue;
931            };
932            let array = field_src.to_arrow_array();
933            let string_array = array
934                .as_any()
935                .downcast_ref::<StringArray>()
936                .with_context(|| error::InvalidBatchSnafu {
937                    reason: format!(
938                        "Field type mismatch, expecting String, given: {}",
939                        field_src.data_type()
940                    ),
941                })?;
942            let space_needed = string_array.value_data().len() as i32;
943            // offset may overflow
944            if builder.next_offset().checked_add(space_needed).is_none() {
945                return Ok(false);
946            }
947        }
948        Ok(true)
949    }
950
951    pub(crate) fn extend(
952        &mut self,
953        ts_v: VectorRef,
954        op_type: u8,
955        sequence: u64,
956        fields: Vec<VectorRef>,
957    ) -> Result<()> {
958        let num_rows_before = self.timestamp.len();
959        let num_rows_to_write = ts_v.len();
960        self.timestamp.reserve(num_rows_to_write);
961        match self.timestamp_type {
962            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
963                self.timestamp.extend(
964                    ts_v.as_any()
965                        .downcast_ref::<TimestampSecondVector>()
966                        .unwrap()
967                        .iter_data()
968                        .map(|v| v.unwrap().0.value()),
969                );
970            }
971            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
972                self.timestamp.extend(
973                    ts_v.as_any()
974                        .downcast_ref::<TimestampMillisecondVector>()
975                        .unwrap()
976                        .iter_data()
977                        .map(|v| v.unwrap().0.value()),
978                );
979            }
980            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
981                self.timestamp.extend(
982                    ts_v.as_any()
983                        .downcast_ref::<TimestampMicrosecondVector>()
984                        .unwrap()
985                        .iter_data()
986                        .map(|v| v.unwrap().0.value()),
987                );
988            }
989            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
990                self.timestamp.extend(
991                    ts_v.as_any()
992                        .downcast_ref::<TimestampNanosecondVector>()
993                        .unwrap()
994                        .iter_data()
995                        .map(|v| v.unwrap().0.value()),
996                );
997            }
998            _ => unreachable!(),
999        };
1000
1001        self.op_type.reserve(num_rows_to_write);
1002        self.op_type
1003            .extend(iter::repeat_n(op_type, num_rows_to_write));
1004        self.sequence.reserve(num_rows_to_write);
1005        self.sequence
1006            .extend(iter::repeat_n(sequence, num_rows_to_write));
1007
1008        for (field_idx, (field_src, field_dest)) in
1009            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
1010        {
1011            let builder = field_dest.get_or_insert_with(|| {
1012                let mut field_builder =
1013                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
1014                field_builder.push_nulls(num_rows_before);
1015                field_builder
1016            });
1017            match builder {
1018                FieldBuilder::String(builder) => {
1019                    let array = field_src.to_arrow_array();
1020                    let string_array =
1021                        array
1022                            .as_any()
1023                            .downcast_ref::<StringArray>()
1024                            .with_context(|| error::InvalidBatchSnafu {
1025                                reason: format!(
1026                                    "Field type mismatch, expecting String, given: {}",
1027                                    field_src.data_type()
1028                                ),
1029                            })?;
1030                    builder.append_array(string_array);
1031                }
1032                FieldBuilder::Other(builder) => {
1033                    let len = field_src.len();
1034                    builder
1035                        .extend_slice_of(&*field_src, 0, len)
1036                        .context(error::ComputeVectorSnafu)?;
1037                }
1038            }
1039        }
1040        Ok(())
1041    }
1042
1043    /// Returns the length of [ValueBuilder]
1044    fn len(&self) -> usize {
1045        let sequence_len = self.sequence.len();
1046        debug_assert_eq!(sequence_len, self.op_type.len());
1047        debug_assert_eq!(sequence_len, self.timestamp.len());
1048        sequence_len
1049    }
1050
1051    fn finish_cloned(&self) -> Values {
1052        let num_rows = self.sequence.len();
1053        let fields = self
1054            .fields
1055            .iter()
1056            .enumerate()
1057            .map(|(i, v)| {
1058                if let Some(v) = v {
1059                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
1060                    v.finish_cloned()
1061                } else {
1062                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
1063                    single_null.push_nulls(num_rows);
1064                    single_null.to_vector()
1065                }
1066            })
1067            .collect::<Vec<_>>();
1068
1069        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
1070        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
1071        let timestamp: VectorRef = match self.timestamp_type {
1072            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
1073                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
1074            }
1075            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
1076                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
1077            }
1078            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
1079                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
1080            }
1081            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
1082                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
1083            }
1084            _ => unreachable!(),
1085        };
1086
1087        if cfg!(debug_assertions) {
1088            debug_assert_eq!(timestamp.len(), sequence.len());
1089            debug_assert_eq!(timestamp.len(), op_type.len());
1090            for field in &fields {
1091                debug_assert_eq!(timestamp.len(), field.len());
1092            }
1093        }
1094
1095        Values {
1096            timestamp,
1097            sequence,
1098            op_type,
1099            fields,
1100        }
1101    }
1102}
1103
/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_type`.
#[derive(Clone)]
pub struct Values {
    // Time index column values.
    pub(crate) timestamp: VectorRef,
    // Sequence number of each row.
    pub(crate) sequence: Arc<UInt64Vector>,
    // Op type of each row.
    pub(crate) op_type: Arc<UInt8Vector>,
    // Field column values; paired with the region metadata's field columns in `to_batch`.
    pub(crate) fields: Vec<VectorRef>,
}
1112
1113impl Values {
1114    /// Converts [Values] to `Batch`, applies the optional sequence filter, sorts the batch
1115    /// according to `timestamp, sequence` desc, and applies dedup/merge according to `merge_mode`.
1116    pub fn to_batch(
1117        &self,
1118        primary_key: &[u8],
1119        metadata: &RegionMetadataRef,
1120        projection: &HashSet<ColumnId>,
1121        sequence: Option<SequenceRange>,
1122        dedup: bool,
1123        merge_mode: MergeMode,
1124    ) -> Result<Batch> {
1125        let builder = BatchBuilder::with_required_columns(
1126            primary_key.to_vec(),
1127            self.timestamp.clone(),
1128            self.sequence.clone(),
1129            self.op_type.clone(),
1130        );
1131
1132        let fields = metadata
1133            .field_columns()
1134            .zip(self.fields.iter())
1135            .filter_map(|(c, f)| {
1136                projection.get(&c.column_id).map(|c| BatchColumn {
1137                    column_id: *c,
1138                    data: f.clone(),
1139                })
1140            })
1141            .collect();
1142
1143        let mut batch = builder.with_fields(fields).build()?;
1144        // The sequence filter must be applied before dedup/merge to:
1145        // - avoid dropping a timestamp when the newest row is out of range
1146        // - avoid filling null fields from rows that should be excluded by the sequence filter.
1147        batch.filter_by_sequence(sequence)?;
1148
1149        match (dedup, merge_mode) {
1150            // append-only, keep duplicate rows.
1151            (false, _) => batch.sort(false)?,
1152            // keep the last row for each timestamp.
1153            (true, MergeMode::LastRow) => batch.sort(true)?,
1154            // keep the last non-null value for each field.
1155            (true, MergeMode::LastNonNull) => {
1156                batch.sort(false)?;
1157                batch.merge_last_non_null()?;
1158            }
1159        }
1160        Ok(batch)
1161    }
1162
1163    /// Returns a vector of all columns converted to arrow [Array](datatypes::arrow::array::Array) in [Values].
1164    fn columns(&self) -> Vec<ArrayRef> {
1165        let mut res = Vec::with_capacity(3 + self.fields.len());
1166        res.push(self.timestamp.to_arrow_array());
1167        res.push(self.sequence.to_arrow_array());
1168        res.push(self.op_type.to_arrow_array());
1169        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
1170        res
1171    }
1172
1173    /// Builds a new [Values] instance from columns.
1174    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
1175        debug_assert!(cols.len() >= 3);
1176        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
1177        let sequence =
1178            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
1179        let op_type =
1180            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
1181        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;
1182
1183        Ok(Self {
1184            timestamp,
1185            sequence,
1186            op_type,
1187            fields,
1188        })
1189    }
1190}
1191
impl From<ValueBuilder> for Values {
    /// Consumes the builder and freezes its content into immutable [Values].
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    // The builder is consumed here, so the active-builder gauge
                    // is decremented (matches the `inc` in `ValueBuilder::push`).
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    // Column never received a value: emit an all-null vector.
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        // Rebuild the typed timestamp vector from the raw i64 values.
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        // Sanity-check that every column has the same number of rows.
        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1245
/// Builds iterators that scan the series in `series_set` with the captured scan options.
struct TimeSeriesIterBuilder {
    // The set of series to read from.
    series_set: SeriesSet,
    // Column ids to project.
    projection: HashSet<ColumnId>,
    // Predicates to prune the scan, including time filters.
    predicate: PredicateGroup,
    // Whether to deduplicate rows by timestamp.
    dedup: bool,
    // Optional sequence range filter.
    sequence: Option<SequenceRange>,
    // Merge strategy applied when `dedup` is true.
    merge_mode: MergeMode,
    // Context used to adapt `Batch` iterators into record batch iterators.
    batch_to_record_batch: Arc<BatchToRecordBatchContext>,
}
1255
1256impl IterBuilder for TimeSeriesIterBuilder {
1257    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
1258        let iter = self.series_set.iter_series(
1259            self.projection.clone(),
1260            self.predicate.clone(),
1261            self.dedup,
1262            self.merge_mode,
1263            self.sequence,
1264            metrics,
1265        )?;
1266        if self.merge_mode == MergeMode::LastNonNull {
1267            let iter = LastNonNullIter::new(iter);
1268            Ok(Box::new(iter))
1269        } else {
1270            Ok(Box::new(iter))
1271        }
1272    }
1273
1274    fn is_record_batch(&self) -> bool {
1275        true
1276    }
1277
1278    fn build_record_batch(
1279        &self,
1280        time_range: Option<(Timestamp, Timestamp)>,
1281        metrics: Option<MemScanMetrics>,
1282    ) -> Result<BoxedRecordBatchIterator> {
1283        let iter = self.build(metrics)?;
1284        let iter: BoxedBatchIterator = if let Some(time_range) = time_range {
1285            let time_filters = self.predicate.time_filters();
1286            Box::new(PruneTimeIterator::new(iter, time_range, time_filters))
1287        } else {
1288            iter
1289        };
1290        Ok(self.batch_to_record_batch.adapt_iter(iter))
1291    }
1292}
1293
1294#[cfg(test)]
1295mod tests {
1296    use std::collections::{HashMap, HashSet};
1297
1298    use api::helper::ColumnDataTypeWrapper;
1299    use api::v1::helper::row;
1300    use api::v1::value::ValueData;
1301    use api::v1::{Mutation, Rows, SemanticType};
1302    use common_time::Timestamp;
1303    use datatypes::prelude::{ConcreteDataType, ScalarVector};
1304    use datatypes::schema::ColumnSchema;
1305    use datatypes::value::{OrderedFloat, Value};
1306    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1307    use mito_codec::row_converter::SortField;
1308    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1309    use store_api::storage::RegionId;
1310
1311    use super::*;
1312    use crate::test_util::column_metadata_to_column_schema;
1313
1314    fn schema_for_test() -> RegionMetadataRef {
1315        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1316        builder
1317            .push_column_metadata(ColumnMetadata {
1318                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1319                semantic_type: SemanticType::Tag,
1320                column_id: 0,
1321            })
1322            .push_column_metadata(ColumnMetadata {
1323                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
1324                semantic_type: SemanticType::Tag,
1325                column_id: 1,
1326            })
1327            .push_column_metadata(ColumnMetadata {
1328                column_schema: ColumnSchema::new(
1329                    "ts",
1330                    ConcreteDataType::timestamp_millisecond_datatype(),
1331                    false,
1332                ),
1333                semantic_type: SemanticType::Timestamp,
1334                column_id: 2,
1335            })
1336            .push_column_metadata(ColumnMetadata {
1337                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1338                semantic_type: SemanticType::Field,
1339                column_id: 3,
1340            })
1341            .push_column_metadata(ColumnMetadata {
1342                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1343                semantic_type: SemanticType::Field,
1344                column_id: 4,
1345            })
1346            .primary_key(vec![0, 1]);
1347        let region_metadata = builder.build().unwrap();
1348        Arc::new(region_metadata)
1349    }
1350
1351    fn ts_value_ref(val: i64) -> ValueRef<'static> {
1352        ValueRef::Timestamp(Timestamp::new_millisecond(val))
1353    }
1354
1355    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1356        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1357    }
1358
1359    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
1360        let ts = values
1361            .timestamp
1362            .as_any()
1363            .downcast_ref::<TimestampMillisecondVector>()
1364            .unwrap();
1365
1366        let v0 = values.fields[0]
1367            .as_any()
1368            .downcast_ref::<Int64Vector>()
1369            .unwrap();
1370        let v1 = values.fields[1]
1371            .as_any()
1372            .downcast_ref::<Float64Vector>()
1373            .unwrap();
1374        let read = ts
1375            .iter_data()
1376            .zip(values.sequence.iter_data())
1377            .zip(values.op_type.iter_data())
1378            .zip(v0.iter_data())
1379            .zip(v1.iter_data())
1380            .map(|((((ts, sequence), op_type), v0), v1)| {
1381                (
1382                    ts.unwrap().0.value(),
1383                    sequence.unwrap(),
1384                    op_type.unwrap(),
1385                    v0.unwrap(),
1386                    v1.unwrap(),
1387                )
1388            })
1389            .collect::<Vec<_>>();
1390        assert_eq!(expect, &read);
1391    }
1392
1393    #[test]
1394    fn test_series() {
1395        let region_metadata = schema_for_test();
1396        let mut series = Series::new(&region_metadata);
1397        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
1398        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
1399        assert_eq!(2, series.active.timestamp.len());
1400        assert_eq!(0, series.frozen.len());
1401
1402        let values = series.compact(&region_metadata).unwrap();
1403        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
1404        assert_eq!(0, series.active.timestamp.len());
1405        assert_eq!(1, series.frozen.len());
1406    }
1407
1408    #[test]
1409    fn test_series_with_nulls() {
1410        let region_metadata = schema_for_test();
1411        let mut series = Series::new(&region_metadata);
1412        // col1: NULL 1 2 3
1413        // col2: NULL NULL 10.2 NULL
1414        series.push(
1415            ts_value_ref(1),
1416            0,
1417            OpType::Put,
1418            vec![ValueRef::Null, ValueRef::Null].into_iter(),
1419        );
1420        series.push(
1421            ts_value_ref(1),
1422            0,
1423            OpType::Put,
1424            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
1425        );
1426        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
1427        series.push(
1428            ts_value_ref(1),
1429            3,
1430            OpType::Put,
1431            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
1432        );
1433        assert_eq!(4, series.active.timestamp.len());
1434        assert_eq!(0, series.frozen.len());
1435
1436        let values = series.compact(&region_metadata).unwrap();
1437        assert_eq!(values.fields[0].null_count(), 1);
1438        assert_eq!(values.fields[1].null_count(), 3);
1439        assert_eq!(0, series.active.timestamp.len());
1440        assert_eq!(1, series.frozen.len());
1441    }
1442
1443    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1444        let ts_len = batch.timestamps().len();
1445        assert_eq!(batch.sequences().len(), ts_len);
1446        assert_eq!(batch.op_types().len(), ts_len);
1447        for f in batch.fields() {
1448            assert_eq!(f.data.len(), ts_len);
1449        }
1450
1451        let mut rows = vec![];
1452        for idx in 0..ts_len {
1453            let mut row = Vec::with_capacity(batch.fields().len() + 3);
1454            row.push(batch.timestamps().get(idx));
1455            row.push(batch.sequences().get(idx));
1456            row.push(batch.op_types().get(idx));
1457            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1458            rows.push(row);
1459        }
1460
1461        assert_eq!(expect.len(), rows.len());
1462        for (idx, row) in rows.iter().enumerate() {
1463            assert_eq!(&expect[idx], row);
1464        }
1465    }
1466
1467    #[test]
1468    fn test_values_sort() {
1469        let schema = schema_for_test();
1470        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
1471        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
1472        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));
1473
1474        let fields = vec![
1475            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
1476            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
1477        ];
1478        let values = Values {
1479            timestamp: timestamp as Arc<_>,
1480            sequence,
1481            op_type,
1482            fields,
1483        };
1484
1485        let batch = values
1486            .to_batch(
1487                b"test",
1488                &schema,
1489                &[0, 1, 2, 3, 4].into_iter().collect(),
1490                None,
1491                true,
1492                MergeMode::LastRow,
1493            )
1494            .unwrap();
1495        check_value(
1496            &batch,
1497            vec![
1498                vec![
1499                    Value::Timestamp(Timestamp::new_millisecond(1)),
1500                    Value::UInt64(1),
1501                    Value::UInt8(1),
1502                    Value::Int64(4),
1503                    Value::Float64(OrderedFloat(1.1)),
1504                ],
1505                vec![
1506                    Value::Timestamp(Timestamp::new_millisecond(2)),
1507                    Value::UInt64(1),
1508                    Value::UInt8(1),
1509                    Value::Int64(3),
1510                    Value::Float64(OrderedFloat(2.1)),
1511                ],
1512                vec![
1513                    Value::Timestamp(Timestamp::new_millisecond(3)),
1514                    Value::UInt64(2),
1515                    Value::UInt8(0),
1516                    Value::Int64(2),
1517                    Value::Float64(OrderedFloat(4.2)),
1518                ],
1519                vec![
1520                    Value::Timestamp(Timestamp::new_millisecond(4)),
1521                    Value::UInt64(1),
1522                    Value::UInt8(1),
1523                    Value::Int64(1),
1524                    Value::Float64(OrderedFloat(3.3)),
1525                ],
1526            ],
1527        )
1528    }
1529
1530    #[test]
1531    fn test_last_non_null_should_filter_by_sequence_before_merge_drop_ts() {
1532        let schema = schema_for_test();
1533        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();
1534
1535        // Same timestamp, newest sequence is out of range. We should still keep the timestamp by
1536        // using the latest row *within* the sequence range as the base row.
1537        //
1538        // Expect after filtering seq<=2:
1539        // - base row: seq=2
1540        // - v0 from seq=2, v1 filled from seq=1
1541        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1, 1]));
1542        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2, 3]));
1543        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 3]));
1544        let fields = vec![
1545            Arc::new(Int64Vector::from(vec![None, Some(10), None])) as Arc<_>,
1546            Arc::new(Float64Vector::from(vec![Some(1.5), None, None])) as Arc<_>,
1547        ];
1548        let values = Values {
1549            timestamp: timestamp as Arc<_>,
1550            sequence,
1551            op_type,
1552            fields,
1553        };
1554
1555        let batch = values
1556            .to_batch(
1557                b"test",
1558                &schema,
1559                &projection,
1560                Some(SequenceRange::LtEq { max: 2 }),
1561                true,
1562                MergeMode::LastNonNull,
1563            )
1564            .unwrap();
1565
1566        check_value(
1567            &batch,
1568            vec![vec![
1569                Value::Timestamp(Timestamp::new_millisecond(1)),
1570                Value::UInt64(2),
1571                Value::UInt8(OpType::Put as u8),
1572                Value::Int64(10),
1573                Value::Float64(OrderedFloat(1.5)),
1574            ]],
1575        );
1576    }
1577
1578    #[test]
1579    fn test_last_non_null_should_filter_by_sequence_before_merge_no_fill_from_out_of_range_row() {
1580        let schema = schema_for_test();
1581        let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();
1582
1583        // Same timestamp, older sequence is out of range. We must not fill null fields using rows
1584        // that should be excluded by the sequence filter.
1585        //
1586        // Expect after filtering seq>1:
1587        // - keep only seq=2 row, v0 stays NULL.
1588        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1]));
1589        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2]));
1590        let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 2]));
1591        let fields = vec![
1592            Arc::new(Int64Vector::from(vec![Some(10), None])) as Arc<_>,
1593            Arc::new(Float64Vector::from(vec![Some(1.0), Some(1.0)])) as Arc<_>,
1594        ];
1595        let values = Values {
1596            timestamp: timestamp as Arc<_>,
1597            sequence,
1598            op_type,
1599            fields,
1600        };
1601
1602        let batch = values
1603            .to_batch(
1604                b"test",
1605                &schema,
1606                &projection,
1607                Some(SequenceRange::Gt { min: 1 }),
1608                true,
1609                MergeMode::LastNonNull,
1610            )
1611            .unwrap();
1612
1613        check_value(
1614            &batch,
1615            vec![vec![
1616                Value::Timestamp(Timestamp::new_millisecond(1)),
1617                Value::UInt64(2),
1618                Value::UInt8(OpType::Put as u8),
1619                Value::Null,
1620                Value::Float64(OrderedFloat(1.0)),
1621            ]],
1622        );
1623    }
1624
1625    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
1626        let column_schema = schema
1627            .column_metadatas
1628            .iter()
1629            .map(|c| api::v1::ColumnSchema {
1630                column_name: c.column_schema.name.clone(),
1631                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
1632                    .unwrap()
1633                    .datatype() as i32,
1634                semantic_type: c.semantic_type as i32,
1635                ..Default::default()
1636            })
1637            .collect();
1638
1639        let rows = (0..len)
1640            .map(|i| {
1641                row(vec![
1642                    ValueData::StringValue(k0.clone()),
1643                    ValueData::I64Value(k1),
1644                    ValueData::TimestampMillisecondValue(i as i64),
1645                    ValueData::I64Value(i as i64),
1646                    ValueData::F64Value(i as f64),
1647                ])
1648            })
1649            .collect();
1650        let mutation = api::v1::Mutation {
1651            op_type: 1,
1652            sequence: 0,
1653            rows: Some(Rows {
1654                schema: column_schema,
1655                rows,
1656            }),
1657            write_hint: None,
1658        };
1659        KeyValues::new(schema.as_ref(), mutation).unwrap()
1660    }
1661
1662    #[test]
1663    fn test_series_set_concurrency() {
1664        let schema = schema_for_test();
1665        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1666            schema
1667                .primary_key_columns()
1668                .map(|c| {
1669                    (
1670                        c.column_id,
1671                        SortField::new(c.column_schema.data_type.clone()),
1672                    )
1673                })
1674                .collect(),
1675        ));
1676        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1677
1678        let concurrency = 32;
1679        let pk_num = concurrency * 2;
1680        let mut handles = Vec::with_capacity(concurrency);
1681        for i in 0..concurrency {
1682            let set = set.clone();
1683            let schema = schema.clone();
1684            let column_schemas = schema
1685                .column_metadatas
1686                .iter()
1687                .map(column_metadata_to_column_schema)
1688                .collect::<Vec<_>>();
1689            let handle = std::thread::spawn(move || {
1690                for j in i * 100..(i + 1) * 100 {
1691                    let pk = j % pk_num;
1692                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1693
1694                    let kvs = KeyValues::new(
1695                        &schema,
1696                        Mutation {
1697                            op_type: OpType::Put as i32,
1698                            sequence: j as u64,
1699                            rows: Some(Rows {
1700                                schema: column_schemas.clone(),
1701                                rows: vec![row(vec![
1702                                    ValueData::StringValue(format!("{}", j)),
1703                                    ValueData::I64Value(j as i64),
1704                                    ValueData::TimestampMillisecondValue(j as i64),
1705                                    ValueData::I64Value(j as i64),
1706                                    ValueData::F64Value(j as f64),
1707                                ])],
1708                            }),
1709                            write_hint: None,
1710                        },
1711                    )
1712                    .unwrap();
1713                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1714                }
1715            });
1716            handles.push(handle);
1717        }
1718        for h in handles {
1719            h.join().unwrap();
1720        }
1721
1722        let mut timestamps = Vec::with_capacity(concurrency * 100);
1723        let mut sequences = Vec::with_capacity(concurrency * 100);
1724        let mut op_types = Vec::with_capacity(concurrency * 100);
1725        let mut v0 = Vec::with_capacity(concurrency * 100);
1726
1727        for i in 0..pk_num {
1728            let pk = format!("pk-{}", i).as_bytes().to_vec();
1729            let series = set.get_series(&pk).unwrap();
1730            let mut guard = series.write().unwrap();
1731            let values = guard.compact(&schema).unwrap();
1732            timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1733            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1734            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1735            v0.extend(
1736                values
1737                    .fields
1738                    .first()
1739                    .unwrap()
1740                    .as_any()
1741                    .downcast_ref::<Int64Vector>()
1742                    .unwrap()
1743                    .iter_data()
1744                    .map(|v| v.unwrap()),
1745            );
1746        }
1747
1748        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1749        assert_eq!(
1750            expected_sequence,
1751            sequences.iter().copied().collect::<HashSet<_>>()
1752        );
1753
1754        op_types.iter().all(|op| *op == OpType::Put as u8);
1755        assert_eq!(
1756            expected_sequence,
1757            timestamps.iter().copied().collect::<HashSet<_>>()
1758        );
1759
1760        assert_eq!(timestamps, sequences);
1761        assert_eq!(v0, timestamps);
1762    }
1763
1764    #[test]
1765    fn test_memtable() {
1766        common_telemetry::init_default_ut_logging();
1767        check_memtable_dedup(true);
1768        check_memtable_dedup(false);
1769    }
1770
1771    fn check_memtable_dedup(dedup: bool) {
1772        let schema = schema_for_test();
1773        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1774        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
1775        memtable.write(&kvs).unwrap();
1776        memtable.write(&kvs).unwrap();
1777
1778        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
1779        for ts in kvs.iter().map(|kv| {
1780            kv.timestamp()
1781                .try_into_timestamp()
1782                .unwrap()
1783                .unwrap()
1784                .value()
1785        }) {
1786            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
1787        }
1788
1789        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
1790        let range = ranges.ranges.into_values().next().unwrap();
1791        let iter = range.build_iter().unwrap();
1792        let mut read = HashMap::new();
1793
1794        for ts in iter
1795            .flat_map(|batch| {
1796                batch
1797                    .unwrap()
1798                    .timestamps()
1799                    .as_any()
1800                    .downcast_ref::<TimestampMillisecondVector>()
1801                    .unwrap()
1802                    .iter_data()
1803                    .collect::<Vec<_>>()
1804                    .into_iter()
1805            })
1806            .map(|v| v.unwrap().0.value())
1807        {
1808            *read.entry(ts).or_default() += 1;
1809        }
1810        assert_eq!(expected_ts, read);
1811
1812        let stats = memtable.stats();
1813        assert!(stats.bytes_allocated() > 0);
1814        assert_eq!(
1815            Some((
1816                Timestamp::new_millisecond(0),
1817                Timestamp::new_millisecond(99)
1818            )),
1819            stats.time_range()
1820        );
1821    }
1822
1823    #[test]
1824    fn test_memtable_projection() {
1825        common_telemetry::init_default_ut_logging();
1826        let schema = schema_for_test();
1827        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1828        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
1829        memtable.write(&kvs).unwrap();
1830
1831        let iter = memtable
1832            .ranges(Some(&[3]), RangesOptions::default())
1833            .unwrap()
1834            .build(None)
1835            .unwrap();
1836
1837        let mut v0_all = vec![];
1838
1839        for res in iter {
1840            let batch = res.unwrap();
1841            assert_eq!(1, batch.fields().len());
1842            let v0 = batch
1843                .fields()
1844                .first()
1845                .unwrap()
1846                .data
1847                .as_any()
1848                .downcast_ref::<Int64Vector>()
1849                .unwrap();
1850            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
1851        }
1852        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
1853    }
1854
1855    #[test]
1856    fn test_memtable_concurrent_write_read() {
1857        common_telemetry::init_default_ut_logging();
1858        let schema = schema_for_test();
1859        let memtable = Arc::new(TimeSeriesMemtable::new(
1860            schema.clone(),
1861            42,
1862            None,
1863            true,
1864            MergeMode::LastRow,
1865        ));
1866
1867        // Number of writer threads
1868        let num_writers = 10;
1869        // Number of reader threads
1870        let num_readers = 5;
1871        // Number of series per writer
1872        let series_per_writer = 100;
1873        // Number of rows per series
1874        let rows_per_series = 10;
1875        // Total number of series
1876        let total_series = num_writers * series_per_writer;
1877
1878        // Create a barrier to synchronize the start of all threads
1879        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));
1880
1881        // Spawn writer threads
1882        let mut writer_handles = Vec::with_capacity(num_writers);
1883        for writer_id in 0..num_writers {
1884            let memtable = memtable.clone();
1885            let schema = schema.clone();
1886            let barrier = barrier.clone();
1887
1888            let handle = std::thread::spawn(move || {
1889                // Wait for all threads to be ready
1890                barrier.wait();
1891
1892                // Create and write series
1893                for series_id in 0..series_per_writer {
1894                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
1895                    let kvs =
1896                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
1897                    memtable.write(&kvs).unwrap();
1898                }
1899            });
1900
1901            writer_handles.push(handle);
1902        }
1903
1904        // Spawn reader threads
1905        let mut reader_handles = Vec::with_capacity(num_readers);
1906        for _ in 0..num_readers {
1907            let memtable = memtable.clone();
1908            let barrier = barrier.clone();
1909
1910            let handle = std::thread::spawn(move || {
1911                barrier.wait();
1912
1913                for _ in 0..10 {
1914                    let iter = memtable
1915                        .ranges(None, RangesOptions::default())
1916                        .unwrap()
1917                        .build(None)
1918                        .unwrap();
1919                    for batch_result in iter {
1920                        let _ = batch_result.unwrap();
1921                    }
1922                }
1923            });
1924
1925            reader_handles.push(handle);
1926        }
1927
1928        barrier.wait();
1929
1930        for handle in writer_handles {
1931            handle.join().unwrap();
1932        }
1933        for handle in reader_handles {
1934            handle.join().unwrap();
1935        }
1936
1937        let iter = memtable
1938            .ranges(None, RangesOptions::default())
1939            .unwrap()
1940            .build(None)
1941            .unwrap();
1942        let mut series_count = 0;
1943        let mut row_count = 0;
1944
1945        for batch_result in iter {
1946            let batch = batch_result.unwrap();
1947            series_count += 1;
1948            row_count += batch.num_rows();
1949        }
1950        assert_eq!(total_series, series_count);
1951        assert_eq!(total_series * rows_per_series, row_count);
1952    }
1953
1954    #[test]
1955    fn test_build_record_batch_iter_from_memtable() {
1956        let schema = schema_for_test();
1957        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
1958
1959        let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
1960        memtable.write(&kvs).unwrap();
1961
1962        let read_column_ids: Vec<ColumnId> = schema
1963            .column_metadatas
1964            .iter()
1965            .map(|c| c.column_id)
1966            .collect();
1967        let ranges = memtable
1968            .ranges(Some(&read_column_ids), RangesOptions::default())
1969            .unwrap();
1970        assert_eq!(1, ranges.ranges.len());
1971
1972        let range = ranges.ranges.into_values().next().unwrap();
1973        let mut iter = range.build_record_batch_iter(None, None).unwrap();
1974        let rb = iter.next().transpose().unwrap().unwrap();
1975        assert_eq!(10, rb.num_rows());
1976        // k0, k1 (pk columns), v0, v1 (field columns), ts, __primary_key, __sequence, __op_type
1977        let schema = rb.schema();
1978        let column_names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1979        assert_eq!(
1980            column_names,
1981            vec![
1982                "k0",
1983                "k1",
1984                "v0",
1985                "v1",
1986                "ts",
1987                "__primary_key",
1988                "__sequence",
1989                "__op_type",
1990            ]
1991        );
1992        assert!(iter.next().is_none());
1993    }
1994
1995    #[test]
1996    fn test_build_record_batch_iter_with_time_range() {
1997        let schema = schema_for_test();
1998        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
1999
2000        let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
2001        memtable.write(&kvs).unwrap();
2002
2003        let read_column_ids: Vec<ColumnId> = schema
2004            .column_metadatas
2005            .iter()
2006            .map(|c| c.column_id)
2007            .collect();
2008        let ranges = memtable
2009            .ranges(Some(&read_column_ids), RangesOptions::default())
2010            .unwrap();
2011        assert_eq!(1, ranges.ranges.len());
2012
2013        let time_range = (Timestamp::new_millisecond(3), Timestamp::new_millisecond(7));
2014
2015        let range = ranges.ranges.into_values().next().unwrap();
2016        let mut iter = range
2017            .build_record_batch_iter(Some(time_range), None)
2018            .unwrap();
2019
2020        let mut total_rows = 0;
2021        let mut all_timestamps = Vec::new();
2022        while let Some(rb) = iter.next().transpose().unwrap() {
2023            total_rows += rb.num_rows();
2024            let ts_col = rb
2025                .column_by_name("ts")
2026                .unwrap()
2027                .as_any()
2028                .downcast_ref::<datatypes::arrow::array::TimestampMillisecondArray>()
2029                .unwrap();
2030            for i in 0..ts_col.len() {
2031                all_timestamps.push(ts_col.value(i));
2032            }
2033        }
2034        assert_eq!(5, total_rows);
2035        all_timestamps.sort();
2036        assert_eq!(vec![3, 4, 5, 6, 7], all_timestamps);
2037    }
2038
2039    /// Helper to create a TimeSeriesIterBuilder from a memtable and schema.
2040    fn build_iter_builder(
2041        schema: &RegionMetadataRef,
2042        memtable: &TimeSeriesMemtable,
2043        projection: Option<&[ColumnId]>,
2044        dedup: bool,
2045        merge_mode: MergeMode,
2046        sequence: Option<SequenceRange>,
2047    ) -> TimeSeriesIterBuilder {
2048        let read_column_ids = read_column_ids_from_projection(schema, projection);
2049        let field_projection = if let Some(projection) = projection {
2050            projection.iter().copied().collect()
2051        } else {
2052            schema.field_columns().map(|c| c.column_id).collect()
2053        };
2054        let adapter_context = Arc::new(BatchToRecordBatchContext::new(
2055            schema.clone(),
2056            read_column_ids,
2057        ));
2058        TimeSeriesIterBuilder {
2059            series_set: memtable.series_set.clone(),
2060            projection: field_projection,
2061            predicate: PredicateGroup::default(),
2062            dedup,
2063            merge_mode,
2064            sequence,
2065            batch_to_record_batch: adapter_context,
2066        }
2067    }
2068
2069    #[test]
2070    fn test_iter_builder_build_record_batch_basic() {
2071        let schema = schema_for_test();
2072        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2073
2074        let kvs = build_key_values(&schema, "hello".to_string(), 42, 10);
2075        memtable.write(&kvs).unwrap();
2076
2077        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2078
2079        let mut iter = builder.build_record_batch(None, None).unwrap();
2080        let rb = iter.next().transpose().unwrap().unwrap();
2081        assert_eq!(10, rb.num_rows());
2082
2083        let rb_schema = rb.schema();
2084        let col_names: Vec<_> = rb_schema
2085            .fields()
2086            .iter()
2087            .map(|f| f.name().as_str())
2088            .collect();
2089        assert_eq!(
2090            col_names,
2091            vec![
2092                "k0",
2093                "k1",
2094                "v0",
2095                "v1",
2096                "ts",
2097                "__primary_key",
2098                "__sequence",
2099                "__op_type",
2100            ]
2101        );
2102
2103        assert!(iter.next().is_none());
2104    }
2105
2106    #[test]
2107    fn test_iter_builder_build_record_batch_with_projection() {
2108        let schema = schema_for_test();
2109        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2110
2111        let kvs = build_key_values(&schema, "test".to_string(), 1, 5);
2112        memtable.write(&kvs).unwrap();
2113
2114        // Project only field v0 (column_id=3) and ts (column_id=2).
2115        let projection = vec![2, 3];
2116        let builder = build_iter_builder(
2117            &schema,
2118            &memtable,
2119            Some(&projection),
2120            true,
2121            MergeMode::LastRow,
2122            None,
2123        );
2124
2125        let mut iter = builder.build_record_batch(None, None).unwrap();
2126        let rb = iter.next().transpose().unwrap().unwrap();
2127        assert_eq!(5, rb.num_rows());
2128
2129        let rb_schema = rb.schema();
2130        let col_names: Vec<_> = rb_schema
2131            .fields()
2132            .iter()
2133            .map(|f| f.name().as_str())
2134            .collect();
2135        // Only projected columns + internal columns.
2136        assert_eq!(
2137            col_names,
2138            vec!["v0", "ts", "__primary_key", "__sequence", "__op_type",]
2139        );
2140
2141        assert!(iter.next().is_none());
2142    }
2143
2144    #[test]
2145    fn test_iter_builder_build_record_batch_multiple_series() {
2146        let schema = schema_for_test();
2147        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2148
2149        let kvs_a = build_key_values(&schema, "aaa".to_string(), 1, 3);
2150        let kvs_b = build_key_values(&schema, "bbb".to_string(), 2, 4);
2151        memtable.write(&kvs_a).unwrap();
2152        memtable.write(&kvs_b).unwrap();
2153
2154        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2155
2156        let iter = builder.build_record_batch(None, None).unwrap();
2157        let mut total_rows = 0;
2158        for rb in iter {
2159            let rb = rb.unwrap();
2160            total_rows += rb.num_rows();
2161            assert_eq!(8, rb.num_columns());
2162        }
2163        assert_eq!(7, total_rows);
2164    }
2165
2166    #[test]
2167    fn test_iter_builder_build_record_batch_dedup() {
2168        let schema = schema_for_test();
2169        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2170
2171        // Write same data twice — dedup should keep only one copy per timestamp.
2172        let kvs = build_key_values(&schema, "dup".to_string(), 10, 5);
2173        memtable.write(&kvs).unwrap();
2174        memtable.write(&kvs).unwrap();
2175
2176        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2177
2178        let iter = builder.build_record_batch(None, None).unwrap();
2179        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2180        assert_eq!(5, total_rows);
2181    }
2182
2183    #[test]
2184    fn test_iter_builder_build_record_batch_no_dedup() {
2185        let schema = schema_for_test();
2186        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, false, MergeMode::LastRow);
2187
2188        let kvs = build_key_values(&schema, "dup".to_string(), 10, 5);
2189        memtable.write(&kvs).unwrap();
2190        memtable.write(&kvs).unwrap();
2191
2192        let builder = build_iter_builder(&schema, &memtable, None, false, MergeMode::LastRow, None);
2193
2194        let iter = builder.build_record_batch(None, None).unwrap();
2195        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2196        assert_eq!(10, total_rows);
2197    }
2198
2199    #[test]
2200    fn test_iter_builder_build_record_batch_with_sequence_filter() {
2201        let schema = schema_for_test();
2202        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2203
2204        // build_key_values creates a mutation with base sequence=0.
2205        // Each row gets sequence = base + row_index, so 5 rows get sequences 0,1,2,3,4.
2206        let kvs = build_key_values(&schema, "seq".to_string(), 1, 5);
2207        memtable.write(&kvs).unwrap();
2208
2209        // Filter to sequence > 4 — should yield no rows.
2210        let builder = build_iter_builder(
2211            &schema,
2212            &memtable,
2213            None,
2214            true,
2215            MergeMode::LastRow,
2216            Some(SequenceRange::Gt { min: 4 }),
2217        );
2218
2219        let iter = builder.build_record_batch(None, None).unwrap();
2220        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2221        assert_eq!(0, total_rows);
2222
2223        // Filter to sequence <= 2 — should yield 3 rows (sequences 0, 1, 2).
2224        let builder = build_iter_builder(
2225            &schema,
2226            &memtable,
2227            None,
2228            true,
2229            MergeMode::LastRow,
2230            Some(SequenceRange::LtEq { max: 2 }),
2231        );
2232
2233        let iter = builder.build_record_batch(None, None).unwrap();
2234        let total_rows: usize = iter.map(|rb| rb.unwrap().num_rows()).sum();
2235        assert_eq!(3, total_rows);
2236    }
2237
2238    #[test]
2239    fn test_iter_builder_build_record_batch_data_correctness() {
2240        use datatypes::arrow::array::{
2241            Float64Array, Int64Array, TimestampMillisecondArray, UInt8Array,
2242        };
2243
2244        let schema = schema_for_test();
2245        let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
2246
2247        let kvs = build_key_values(&schema, "check".to_string(), 7, 3);
2248        memtable.write(&kvs).unwrap();
2249
2250        let builder = build_iter_builder(&schema, &memtable, None, true, MergeMode::LastRow, None);
2251
2252        let mut iter = builder.build_record_batch(None, None).unwrap();
2253        let rb = iter.next().transpose().unwrap().unwrap();
2254        assert_eq!(3, rb.num_rows());
2255
2256        // Verify timestamp values.
2257        let ts_col = rb
2258            .column_by_name("ts")
2259            .unwrap()
2260            .as_any()
2261            .downcast_ref::<TimestampMillisecondArray>()
2262            .unwrap();
2263        let timestamps: Vec<_> = (0..ts_col.len()).map(|i| ts_col.value(i)).collect();
2264        assert_eq!(vec![0, 1, 2], timestamps);
2265
2266        // Verify field v0 values.
2267        let v0_col = rb
2268            .column_by_name("v0")
2269            .unwrap()
2270            .as_any()
2271            .downcast_ref::<Int64Array>()
2272            .unwrap();
2273        let v0_values: Vec<_> = (0..v0_col.len()).map(|i| v0_col.value(i)).collect();
2274        assert_eq!(vec![0, 1, 2], v0_values);
2275
2276        // Verify field v1 values.
2277        let v1_col = rb
2278            .column_by_name("v1")
2279            .unwrap()
2280            .as_any()
2281            .downcast_ref::<Float64Array>()
2282            .unwrap();
2283        let v1_values: Vec<_> = (0..v1_col.len()).map(|i| v1_col.value(i)).collect();
2284        assert_eq!(vec![0.0, 1.0, 2.0], v1_values);
2285
2286        // Verify op_type is all Put (1).
2287        let op_col = rb
2288            .column_by_name("__op_type")
2289            .unwrap()
2290            .as_any()
2291            .downcast_ref::<UInt8Array>()
2292            .unwrap();
2293        for i in 0..op_col.len() {
2294            assert_eq!(OpType::Put as u8, op_col.value(i));
2295        }
2296
2297        assert!(iter.next().is_none());
2298    }
2299}