1use std::collections::btree_map::Entry;
16use std::collections::{BTreeMap, Bound, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::iter;
19use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::OpType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{debug, error};
26use common_time::Timestamp;
27use datatypes::arrow;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow_array::StringArray;
30use datatypes::data_type::{ConcreteDataType, DataType};
31use datatypes::prelude::{ScalarVector, Vector, VectorRef};
32use datatypes::types::TimestampType;
33use datatypes::value::{Value, ValueRef};
34use datatypes::vectors::{
35 Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
36 TimestampSecondVector, UInt8Vector, UInt64Vector,
37};
38use mito_codec::key_values::KeyValue;
39use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
40use snafu::{OptionExt, ResultExt, ensure};
41use store_api::metadata::RegionMetadataRef;
42use store_api::storage::{ColumnId, SequenceRange};
43use table::predicate::Predicate;
44
45use crate::error::{
46 self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
47};
48use crate::flush::WriteBufferManagerRef;
49use crate::memtable::builder::{FieldBuilder, StringBuilder};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
52use crate::memtable::stats::WriteMetrics;
53use crate::memtable::{
54 AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues,
55 MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext,
56 MemtableRanges, MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection,
57};
58use crate::metrics::{
59 MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
60 READ_STAGE_ELAPSED,
61};
62use crate::read::dedup::LastNonNullIter;
63use crate::read::{Batch, BatchBuilder, BatchColumn};
64use crate::region::options::MergeMode;
65
/// Initial row capacity of a newly created value builder.
const INITIAL_BUILDER_CAPACITY: usize = 4;

/// Row-count threshold at which a series' active builder is frozen
/// into an immutable block (see `Series::push`).
const BUILDER_CAPACITY: usize = 512;
71
/// Builder that creates [TimeSeriesMemtable]s, or [SimpleBulkMemtable]s for
/// regions without primary key columns.
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    /// Write buffer manager handed to each memtable's `AllocTracker`; `None` disables reporting.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Whether duplicate rows are removed when reading (see `Values::to_batch`).
    dedup: bool,
    /// How duplicate rows are merged (last row vs. last non-null field).
    merge_mode: MergeMode,
}
79
impl TimeSeriesMemtableBuilder {
    /// Creates a builder with the given write buffer manager, dedup flag and merge mode.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}
94
95impl MemtableBuilder for TimeSeriesMemtableBuilder {
96 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
97 if metadata.primary_key.is_empty() {
98 Arc::new(SimpleBulkMemtable::new(
99 id,
100 metadata.clone(),
101 self.write_buffer_manager.clone(),
102 self.dedup,
103 self.merge_mode,
104 ))
105 } else {
106 Arc::new(TimeSeriesMemtable::new(
107 metadata.clone(),
108 id,
109 self.write_buffer_manager.clone(),
110 self.dedup,
111 self.merge_mode,
112 ))
113 }
114 }
115
116 fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
117 false
120 }
121}
122
/// Memtable that groups rows into per-primary-key series.
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    /// Codec that encodes a row's primary key columns into the series map key.
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    /// Tracks allocated bytes and reports them to the write buffer manager.
    alloc_tracker: AllocTracker,
    /// Largest raw timestamp written so far (unit of the time index column).
    max_timestamp: AtomicI64,
    /// Smallest raw timestamp written so far.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total number of rows written.
    num_rows: AtomicUsize,
}
138
139impl TimeSeriesMemtable {
140 pub fn new(
141 region_metadata: RegionMetadataRef,
142 id: MemtableId,
143 write_buffer_manager: Option<WriteBufferManagerRef>,
144 dedup: bool,
145 merge_mode: MergeMode,
146 ) -> Self {
147 let row_codec = Arc::new(DensePrimaryKeyCodec::new(®ion_metadata));
148 let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
149 Self {
150 id,
151 region_metadata,
152 series_set,
153 row_codec,
154 alloc_tracker: AllocTracker::new(write_buffer_manager),
155 max_timestamp: AtomicI64::new(i64::MIN),
156 min_timestamp: AtomicI64::new(i64::MAX),
157 max_sequence: AtomicU64::new(0),
158 dedup,
159 merge_mode,
160 num_rows: Default::default(),
161 }
162 }
163
164 fn update_stats(&self, stats: WriteMetrics) {
166 self.alloc_tracker
167 .on_allocation(stats.key_bytes + stats.value_bytes);
168 self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
169 self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
170 self.max_sequence
171 .fetch_max(stats.max_sequence, Ordering::SeqCst);
172 self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
173 }
174
175 fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
176 ensure!(
177 self.row_codec.num_fields() == kv.num_primary_keys(),
178 PrimaryKeyLengthMismatchSnafu {
179 expect: self.row_codec.num_fields(),
180 actual: kv.num_primary_keys(),
181 }
182 );
183
184 let primary_key_encoded = self
185 .row_codec
186 .encode(kv.primary_keys())
187 .context(EncodeSnafu)?;
188
189 let (key_allocated, value_allocated) =
190 self.series_set.push_to_series(primary_key_encoded, &kv);
191 stats.key_bytes += key_allocated;
192 stats.value_bytes += value_allocated;
193
194 let ts = kv
196 .timestamp()
197 .try_into_timestamp()
198 .unwrap()
199 .unwrap()
200 .value();
201 stats.min_ts = stats.min_ts.min(ts);
202 stats.max_ts = stats.max_ts.max(ts);
203 Ok(())
204 }
205}
206
207impl Debug for TimeSeriesMemtable {
208 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
209 f.debug_struct("TimeSeriesMemtable").finish()
210 }
211}
212
impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes all rows of `kvs`, then applies the accumulated metrics in one step.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        // Account the per-row timestamp and op_type storage that
        // `write_key_value` does not include in `value_bytes`.
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        self.update_stats(local_stats);
        Ok(())
    }

    /// Writes a single row; stats are only merged when the write succeeds.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    /// Converts a bulk part into a mutation and writes it row by row; the
    /// part-level sequence and timestamp bounds override the per-row values.
    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    /// Returns a single range that scans the whole memtable.
    ///
    /// A `None` projection defaults to all field columns.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection);
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let adapter_context = Arc::new(BatchToRecordBatchContext::new(
            self.region_metadata.clone(),
            read_column_ids,
        ));
        let context = Arc::new(MemtableRangeContext::new_with_batch_to_record_batch(
            self.id,
            builder,
            predicate,
            Some(adapter_context),
        ));

        let range_stats = self.stats();
        let range = MemtableRange::new(context, range_stats);
        Ok(MemtableRanges {
            ranges: [(0, range)].into(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    /// Stops allocation tracking; the memtable becomes immutable afterwards.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    /// Reports memtable statistics; zero allocated bytes is treated as "empty".
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // Nothing was written successfully; the timestamp counters still
            // hold their initial sentinel values, so report an empty range.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Creates a fresh, empty memtable with the same options for `metadata`.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
367
/// Map from encoded primary key to its series; dropping the map subtracts its
/// series and field-builder counts from the global gauges (see `Drop` impl).
#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);
370
371impl Drop for SeriesMap {
372 fn drop(&mut self) {
373 let num_series = self.0.len();
374 let num_field_builders = self
375 .0
376 .values()
377 .map(|v| v.read().unwrap().active.num_field_builders())
378 .sum::<usize>();
379 MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
380 MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
381 }
382}
383
/// Shared, thread-safe collection of all series in a memtable, keyed by the
/// encoded primary key.
#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    /// Codec used to decode primary keys when pruning series by predicates.
    codec: Arc<DensePrimaryKeyCodec>,
}
390
impl SeriesSet {
    /// Creates an empty series set for the region.
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}
400
impl SeriesSet {
    /// Pushes a row into the series identified by `primary_key`, creating the
    /// series on first write.
    ///
    /// Returns `(key_bytes_allocated, value_bytes_allocated)`; the key size is
    /// only charged when a new series is inserted.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series exists, only the map's read lock is needed.
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the write lock and re-check via the entry API, since
        // another writer may have inserted the series between the two locks.
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    /// Returns the series for `primary_key`, if present (test helper).
    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Creates an iterator over all series, applying projection, predicate
    /// pruning, dedup/merge and sequence filtering.
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        merge_mode: MergeMode,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            merge_mode,
            sequence,
            mem_scan_metrics,
        )
    }
}
474
475pub(crate) fn primary_key_schema(
478 region_metadata: &RegionMetadataRef,
479) -> arrow::datatypes::SchemaRef {
480 let fields = region_metadata
481 .primary_key_columns()
482 .map(|pk| {
483 arrow::datatypes::Field::new(
484 pk.column_schema.name.clone(),
485 pk.column_schema.data_type.as_arrow_type(),
486 pk.column_schema.is_nullable(),
487 )
488 })
489 .collect::<Vec<_>>();
490 Arc::new(arrow::datatypes::Schema::new(fields))
491}
492
/// Metrics of one scan over a time-series memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total number of series visited.
    total_series: usize,
    /// Number of series pruned by primary key predicates.
    num_pruned_series: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Total wall time spent inside `next`.
    scan_cost: Duration,
}
507
/// Iterator over the series of a [SeriesSet], yielding one [Batch] per series
/// in encoded primary key order.
struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    /// Field columns to output.
    projection: HashSet<ColumnId>,
    /// Primary key of the last returned series; the scan resumes strictly after it.
    last_key: Option<Vec<u8>>,
    /// Simple filters used to prune series by their primary key values.
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    merge_mode: MergeMode,
    sequence: Option<SequenceRange>,
    metrics: Metrics,
    /// Scan metrics sink; `Some` until reported once (taken on report).
    mem_scan_metrics: Option<MemScanMetrics>,
}
523
524impl Iter {
525 #[allow(clippy::too_many_arguments)]
526 pub(crate) fn try_new(
527 metadata: RegionMetadataRef,
528 series: Arc<RwLock<SeriesMap>>,
529 projection: HashSet<ColumnId>,
530 predicate: Option<Predicate>,
531 pk_schema: arrow::datatypes::SchemaRef,
532 pk_datatypes: Vec<ConcreteDataType>,
533 codec: Arc<DensePrimaryKeyCodec>,
534 dedup: bool,
535 merge_mode: MergeMode,
536 sequence: Option<SequenceRange>,
537 mem_scan_metrics: Option<MemScanMetrics>,
538 ) -> Result<Self> {
539 let predicate = predicate
540 .map(|predicate| {
541 predicate
542 .exprs()
543 .iter()
544 .filter_map(SimpleFilterEvaluator::try_new)
545 .collect::<Vec<_>>()
546 })
547 .unwrap_or_default();
548 Ok(Self {
549 metadata,
550 series,
551 projection,
552 last_key: None,
553 predicate,
554 pk_schema,
555 pk_datatypes,
556 codec,
557 dedup,
558 merge_mode,
559 sequence,
560 metrics: Metrics::default(),
561 mem_scan_metrics,
562 })
563 }
564
565 fn report_mem_scan_metrics(&mut self) {
566 if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
567 let inner = crate::memtable::MemScanMetricsData {
568 total_series: self.metrics.total_series,
569 num_rows: self.metrics.num_rows,
570 num_batches: self.metrics.num_batches,
571 scan_cost: self.metrics.scan_cost,
572 };
573 mem_scan_metrics.merge_inner(&inner);
574 }
575 }
576}
577
impl Drop for Iter {
    /// Logs scan metrics and flushes them into the global counters, so they are
    /// reported even when the iterator is not fully consumed.
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        // No-op if `next` already reported on exhaustion.
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}
596
impl Iterator for Iter {
    type Item = Result<Batch>;

    /// Returns the batch of the next series that survives primary key pruning.
    ///
    /// The map read lock is re-acquired on every call and the scan resumes
    /// strictly after `last_key`, so the lock is never held between calls and
    /// series inserted behind the cursor by concurrent writers are skipped.
    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            // Skip series whose primary key values cannot match the predicates.
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            // Compact all frozen parts of the series, then convert to a Batch.
            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(
                    primary_key,
                    &self.metadata,
                    &self.projection,
                    self.sequence,
                    self.dedup,
                    self.merge_mode,
                )
            });

            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            return Some(batch);
        }
        // Exhausted: release the lock, account the time and report metrics once.
        drop(map); self.metrics.scan_cost += start.elapsed();

        self.report_mem_scan_metrics();

        None
    }
}
659
660fn prune_primary_key(
661 codec: &Arc<DensePrimaryKeyCodec>,
662 pk: &[u8],
663 series: &mut Series,
664 datatypes: &[ConcreteDataType],
665 pk_schema: arrow::datatypes::SchemaRef,
666 predicates: &[SimpleFilterEvaluator],
667) -> bool {
668 if pk_schema.fields().is_empty() {
670 return true;
671 }
672
673 let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
675 pk_values
676 } else {
677 let pk_values = codec.decode_dense_without_column_id(pk);
678 if let Err(e) = pk_values {
679 error!(e; "Failed to decode primary key");
680 return true;
681 }
682 series.update_pk_cache(pk_values.unwrap());
683 series.pk_cache.as_ref().unwrap()
684 };
685
686 let mut result = true;
688 for predicate in predicates {
689 let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
691 continue;
692 };
693 let scalar_value = pk_values[index]
695 .try_to_scalar_value(&datatypes[index])
696 .unwrap();
697 result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
698 }
699
700 result
701}
702
/// All rows of one primary key: an append-only active builder plus a list of
/// frozen, immutable blocks.
pub struct Series {
    /// Decoded primary key values, cached for predicate pruning.
    pk_cache: Option<Vec<Value>>,
    /// Builder receiving new rows.
    active: ValueBuilder,
    /// Blocks frozen from previous active builders.
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    /// Row-count threshold at which `active` is frozen.
    capacity: usize,
}
711
712impl Series {
713 pub(crate) fn with_capacity(
714 region_metadata: &RegionMetadataRef,
715 init_capacity: usize,
716 capacity: usize,
717 ) -> Self {
718 MEMTABLE_ACTIVE_SERIES_COUNT.inc();
719 Self {
720 pk_cache: None,
721 active: ValueBuilder::new(region_metadata, init_capacity),
722 frozen: vec![],
723 region_metadata: region_metadata.clone(),
724 capacity,
725 }
726 }
727
728 pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
729 Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
730 }
731
732 pub fn is_empty(&self) -> bool {
733 self.active.len() == 0 && self.frozen.is_empty()
734 }
735
736 pub(crate) fn push<'a>(
738 &mut self,
739 ts: ValueRef<'a>,
740 sequence: u64,
741 op_type: OpType,
742 values: impl Iterator<Item = ValueRef<'a>>,
743 ) -> usize {
744 if self.active.len() + 10 > self.capacity {
746 let region_metadata = self.region_metadata.clone();
747 self.freeze(®ion_metadata);
748 }
749 self.active.push(ts, sequence, op_type as u8, values)
750 }
751
752 fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
753 self.pk_cache = Some(pk_values);
754 }
755
756 pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
758 if self.active.len() != 0 {
759 let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
760 std::mem::swap(&mut self.active, &mut builder);
761 self.frozen.push(Values::from(builder));
762 }
763 }
764
765 pub(crate) fn extend(
766 &mut self,
767 ts_v: VectorRef,
768 op_type_v: u8,
769 sequence_v: u64,
770 fields: Vec<VectorRef>,
771 ) -> Result<()> {
772 if !self.active.can_accommodate(&fields)? {
773 let region_metadata = self.region_metadata.clone();
774 self.freeze(®ion_metadata);
775 }
776 self.active.extend(ts_v, op_type_v, sequence_v, fields)
777 }
778
779 pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
782 self.freeze(region_metadata);
783
784 let frozen = &self.frozen;
785
786 debug_assert!(!frozen.is_empty());
788
789 if frozen.len() > 1 {
790 let column_size = frozen[0].fields.len() + 3;
794
795 if cfg!(debug_assertions) {
796 debug_assert!(
797 frozen
798 .iter()
799 .zip(frozen.iter().skip(1))
800 .all(|(prev, next)| { prev.fields.len() == next.fields.len() })
801 );
802 }
803
804 let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
805 let concatenated = (0..column_size)
806 .map(|i| {
807 let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
808 arrow::compute::concat(&to_concat)
809 })
810 .collect::<std::result::Result<Vec<_>, _>>()
811 .context(ComputeArrowSnafu)?;
812
813 debug_assert_eq!(concatenated.len(), column_size);
814 let values = Values::from_columns(&concatenated)?;
815 self.frozen = vec![values];
816 };
817 Ok(&self.frozen[0])
818 }
819
820 pub fn read_to_values(&self) -> Vec<Values> {
821 let mut res = Vec::with_capacity(self.frozen.len() + 1);
822 res.extend(self.frozen.iter().cloned());
823 res.push(self.active.finish_cloned());
824 res
825 }
826}
827
/// Column-oriented builder holding the rows of one series.
///
/// `timestamp`, `sequence` and `op_type` are always present; each field column
/// builder stays `None` until the column receives its first non-null value.
pub(crate) struct ValueBuilder {
    /// Raw timestamp values, in the unit of `timestamp_type`.
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    /// One optional builder per field column, lazily materialized.
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}
837
impl ValueBuilder {
    /// Creates a builder for the region's time index and field columns,
    /// reserving `capacity` rows for the always-present columns.
    ///
    /// Field builders start as `None` and are only materialized on the first
    /// non-null value (see `push`), keeping sparse series cheap.
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Returns the number of materialized (non-`None`) field builders.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes one row; `fields` must yield exactly one value per field column.
    ///
    /// Returns the estimated data size of the pushed field values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Debug builds collect first so the field count can be validated.
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        // Assumes `ts` is a non-null timestamp value; the unwraps panic otherwise.
        self.timestamp
            .push(ts.try_into_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            // Nulls are skipped until the column sees its first non-null value.
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    // Lazily create the builder and backfill nulls for all
                    // previously pushed rows so column lengths stay aligned.
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    mutable_vector.push_nulls(num_rows - 1);
                    mutable_vector
                        .push(field_value)
                        .unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }

    /// Returns whether every string builder can take the incoming string data
    /// without overflowing its i32 offsets.
    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
            let Some(builder) = field_dest else {
                continue;
            };
            // Only string builders have bounded offset space to check.
            let FieldBuilder::String(builder) = builder else {
                continue;
            };
            let array = field_src.to_arrow_array();
            let string_array = array
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| error::InvalidBatchSnafu {
                    reason: format!(
                        "Field type mismatch, expecting String, given: {}",
                        field_src.data_type()
                    ),
                })?;
            let space_needed = string_array.value_data().len() as i32;
            // `checked_add` returning None means the offset would overflow i32.
            if builder.next_offset().checked_add(space_needed).is_none() {
                return Ok(false);
            }
        }
        Ok(true)
    }

    /// Appends whole vectors: timestamps from `ts_v`, a repeated `op_type` and
    /// `sequence`, and one vector per field column.
    ///
    /// Panics if `ts_v` does not match the region's timestamp type or contains
    /// null timestamps.
    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        // Downcast to the concrete vector matching the time index unit.
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            // The time index column always has a timestamp type.
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in
            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
        {
            // Materialize the builder on demand, backfilling nulls for prior rows.
            // NOTE(review): unlike `push`, this path does not increment
            // MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT — verify the gauge stays balanced.
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    /// Returns the number of buffered rows.
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }

    /// Builds a [Values] snapshot WITHOUT consuming the builder; missing field
    /// builders materialize as all-null vectors.
    ///
    /// NOTE(review): this decrements MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT even
    /// though the builders stay alive, while `SeriesMap::drop` subtracts them
    /// again — looks like a possible double-decrement of the gauge; verify
    /// against the callers of `read_to_values`.
    fn finish_cloned(&self) -> Values {
        let num_rows = self.sequence.len();
        let fields = self
            .fields
            .iter()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish_cloned()
                } else {
                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
        let timestamp: VectorRef = match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
            }
            _ => unreachable!(),
        };

        // All output columns must have the same row count.
        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Values {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1102
/// Immutable, column-oriented block of rows frozen from a [ValueBuilder].
#[derive(Clone)]
pub struct Values {
    pub(crate) timestamp: VectorRef,
    pub(crate) sequence: Arc<UInt64Vector>,
    pub(crate) op_type: Arc<UInt8Vector>,
    /// One vector per field column, in field column order.
    pub(crate) fields: Vec<VectorRef>,
}
1111
impl Values {
    /// Converts the values to a [Batch] for `primary_key`, applying projection,
    /// sequence filtering, sorting and dedup/merge.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        sequence: Option<SequenceRange>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        // Keep only the projected field columns.
        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.filter_by_sequence(sequence)?;

        // Dedup implies sorting with duplicate removal; last-non-null merge
        // additionally folds null fields across duplicate rows.
        match (dedup, merge_mode) {
            (false, _) => batch.sort(false)?,
            (true, MergeMode::LastRow) => batch.sort(true)?,
            (true, MergeMode::LastNonNull) => {
                batch.sort(false)?;
                batch.merge_last_non_null()?;
            }
        }
        Ok(batch)
    }

    /// Returns all columns as arrow arrays, in the fixed order
    /// `[timestamp, sequence, op_type, fields...]` expected by `from_columns`.
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Rebuilds `Values` from arrays in the order produced by `columns`.
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}
1190
impl From<ValueBuilder> for Values {
    /// Consuming counterpart of `ValueBuilder::finish_cloned`: builds the final
    /// [Values] block; missing field builders materialize as all-null vectors.
    /// The field-builder gauge is decremented here because the builders are
    /// consumed by `finish`.
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        // All output columns must have the same row count.
        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}
1244
/// Deferred construction of a per-range series iterator: captures everything
/// `iter_series` needs so the scan can be (re)built lazily on each `build` call.
struct TimeSeriesIterBuilder {
    /// The set of series (keyed by primary key) to scan.
    series_set: SeriesSet,
    /// Column ids to include in output batches.
    projection: HashSet<ColumnId>,
    /// Optional pushed-down predicate; `None` scans everything.
    predicate: Option<Predicate>,
    /// Whether duplicate rows should be deduplicated during the scan.
    dedup: bool,
    /// Optional sequence range used to filter rows by write sequence.
    sequence: Option<SequenceRange>,
    /// Merge strategy; `LastNonNull` additionally wraps the iterator.
    merge_mode: MergeMode,
}
1253
1254impl IterBuilder for TimeSeriesIterBuilder {
1255 fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
1256 let iter = self.series_set.iter_series(
1257 self.projection.clone(),
1258 self.predicate.clone(),
1259 self.dedup,
1260 self.merge_mode,
1261 self.sequence,
1262 metrics,
1263 )?;
1264 if self.merge_mode == MergeMode::LastNonNull {
1265 let iter = LastNonNullIter::new(iter);
1266 Ok(Box::new(iter))
1267 } else {
1268 Ok(Box::new(iter))
1269 }
1270 }
1271}
1272
1273#[cfg(test)]
1274mod tests {
1275 use std::collections::{HashMap, HashSet};
1276
1277 use api::helper::ColumnDataTypeWrapper;
1278 use api::v1::helper::row;
1279 use api::v1::value::ValueData;
1280 use api::v1::{Mutation, Rows, SemanticType};
1281 use common_time::Timestamp;
1282 use datatypes::prelude::{ConcreteDataType, ScalarVector};
1283 use datatypes::schema::ColumnSchema;
1284 use datatypes::value::{OrderedFloat, Value};
1285 use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
1286 use mito_codec::row_converter::SortField;
1287 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1288 use store_api::storage::RegionId;
1289
1290 use super::*;
1291 use crate::test_util::column_metadata_to_column_schema;
1292
1293 fn schema_for_test() -> RegionMetadataRef {
1294 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1295 builder
1296 .push_column_metadata(ColumnMetadata {
1297 column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1298 semantic_type: SemanticType::Tag,
1299 column_id: 0,
1300 })
1301 .push_column_metadata(ColumnMetadata {
1302 column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
1303 semantic_type: SemanticType::Tag,
1304 column_id: 1,
1305 })
1306 .push_column_metadata(ColumnMetadata {
1307 column_schema: ColumnSchema::new(
1308 "ts",
1309 ConcreteDataType::timestamp_millisecond_datatype(),
1310 false,
1311 ),
1312 semantic_type: SemanticType::Timestamp,
1313 column_id: 2,
1314 })
1315 .push_column_metadata(ColumnMetadata {
1316 column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1317 semantic_type: SemanticType::Field,
1318 column_id: 3,
1319 })
1320 .push_column_metadata(ColumnMetadata {
1321 column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1322 semantic_type: SemanticType::Field,
1323 column_id: 4,
1324 })
1325 .primary_key(vec![0, 1]);
1326 let region_metadata = builder.build().unwrap();
1327 Arc::new(region_metadata)
1328 }
1329
1330 fn ts_value_ref(val: i64) -> ValueRef<'static> {
1331 ValueRef::Timestamp(Timestamp::new_millisecond(val))
1332 }
1333
1334 fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
1335 vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
1336 }
1337
1338 fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
1339 let ts = values
1340 .timestamp
1341 .as_any()
1342 .downcast_ref::<TimestampMillisecondVector>()
1343 .unwrap();
1344
1345 let v0 = values.fields[0]
1346 .as_any()
1347 .downcast_ref::<Int64Vector>()
1348 .unwrap();
1349 let v1 = values.fields[1]
1350 .as_any()
1351 .downcast_ref::<Float64Vector>()
1352 .unwrap();
1353 let read = ts
1354 .iter_data()
1355 .zip(values.sequence.iter_data())
1356 .zip(values.op_type.iter_data())
1357 .zip(v0.iter_data())
1358 .zip(v1.iter_data())
1359 .map(|((((ts, sequence), op_type), v0), v1)| {
1360 (
1361 ts.unwrap().0.value(),
1362 sequence.unwrap(),
1363 op_type.unwrap(),
1364 v0.unwrap(),
1365 v1.unwrap(),
1366 )
1367 })
1368 .collect::<Vec<_>>();
1369 assert_eq!(expect, &read);
1370 }
1371
1372 #[test]
1373 fn test_series() {
1374 let region_metadata = schema_for_test();
1375 let mut series = Series::new(®ion_metadata);
1376 series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
1377 series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
1378 assert_eq!(2, series.active.timestamp.len());
1379 assert_eq!(0, series.frozen.len());
1380
1381 let values = series.compact(®ion_metadata).unwrap();
1382 check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
1383 assert_eq!(0, series.active.timestamp.len());
1384 assert_eq!(1, series.frozen.len());
1385 }
1386
1387 #[test]
1388 fn test_series_with_nulls() {
1389 let region_metadata = schema_for_test();
1390 let mut series = Series::new(®ion_metadata);
1391 series.push(
1394 ts_value_ref(1),
1395 0,
1396 OpType::Put,
1397 vec![ValueRef::Null, ValueRef::Null].into_iter(),
1398 );
1399 series.push(
1400 ts_value_ref(1),
1401 0,
1402 OpType::Put,
1403 vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
1404 );
1405 series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
1406 series.push(
1407 ts_value_ref(1),
1408 3,
1409 OpType::Put,
1410 vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
1411 );
1412 assert_eq!(4, series.active.timestamp.len());
1413 assert_eq!(0, series.frozen.len());
1414
1415 let values = series.compact(®ion_metadata).unwrap();
1416 assert_eq!(values.fields[0].null_count(), 1);
1417 assert_eq!(values.fields[1].null_count(), 3);
1418 assert_eq!(0, series.active.timestamp.len());
1419 assert_eq!(1, series.frozen.len());
1420 }
1421
1422 fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
1423 let ts_len = batch.timestamps().len();
1424 assert_eq!(batch.sequences().len(), ts_len);
1425 assert_eq!(batch.op_types().len(), ts_len);
1426 for f in batch.fields() {
1427 assert_eq!(f.data.len(), ts_len);
1428 }
1429
1430 let mut rows = vec![];
1431 for idx in 0..ts_len {
1432 let mut row = Vec::with_capacity(batch.fields().len() + 3);
1433 row.push(batch.timestamps().get(idx));
1434 row.push(batch.sequences().get(idx));
1435 row.push(batch.op_types().get(idx));
1436 row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
1437 rows.push(row);
1438 }
1439
1440 assert_eq!(expect.len(), rows.len());
1441 for (idx, row) in rows.iter().enumerate() {
1442 assert_eq!(&expect[idx], row);
1443 }
1444 }
1445
1446 #[test]
1447 fn test_values_sort() {
1448 let schema = schema_for_test();
1449 let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
1450 let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
1451 let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));
1452
1453 let fields = vec![
1454 Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
1455 Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
1456 ];
1457 let values = Values {
1458 timestamp: timestamp as Arc<_>,
1459 sequence,
1460 op_type,
1461 fields,
1462 };
1463
1464 let batch = values
1465 .to_batch(
1466 b"test",
1467 &schema,
1468 &[0, 1, 2, 3, 4].into_iter().collect(),
1469 None,
1470 true,
1471 MergeMode::LastRow,
1472 )
1473 .unwrap();
1474 check_value(
1475 &batch,
1476 vec![
1477 vec![
1478 Value::Timestamp(Timestamp::new_millisecond(1)),
1479 Value::UInt64(1),
1480 Value::UInt8(1),
1481 Value::Int64(4),
1482 Value::Float64(OrderedFloat(1.1)),
1483 ],
1484 vec![
1485 Value::Timestamp(Timestamp::new_millisecond(2)),
1486 Value::UInt64(1),
1487 Value::UInt8(1),
1488 Value::Int64(3),
1489 Value::Float64(OrderedFloat(2.1)),
1490 ],
1491 vec![
1492 Value::Timestamp(Timestamp::new_millisecond(3)),
1493 Value::UInt64(2),
1494 Value::UInt8(0),
1495 Value::Int64(2),
1496 Value::Float64(OrderedFloat(4.2)),
1497 ],
1498 vec![
1499 Value::Timestamp(Timestamp::new_millisecond(4)),
1500 Value::UInt64(1),
1501 Value::UInt8(1),
1502 Value::Int64(1),
1503 Value::Float64(OrderedFloat(3.3)),
1504 ],
1505 ],
1506 )
1507 }
1508
1509 #[test]
1510 fn test_last_non_null_should_filter_by_sequence_before_merge_drop_ts() {
1511 let schema = schema_for_test();
1512 let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();
1513
1514 let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1, 1]));
1521 let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2, 3]));
1522 let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 3]));
1523 let fields = vec![
1524 Arc::new(Int64Vector::from(vec![None, Some(10), None])) as Arc<_>,
1525 Arc::new(Float64Vector::from(vec![Some(1.5), None, None])) as Arc<_>,
1526 ];
1527 let values = Values {
1528 timestamp: timestamp as Arc<_>,
1529 sequence,
1530 op_type,
1531 fields,
1532 };
1533
1534 let batch = values
1535 .to_batch(
1536 b"test",
1537 &schema,
1538 &projection,
1539 Some(SequenceRange::LtEq { max: 2 }),
1540 true,
1541 MergeMode::LastNonNull,
1542 )
1543 .unwrap();
1544
1545 check_value(
1546 &batch,
1547 vec![vec![
1548 Value::Timestamp(Timestamp::new_millisecond(1)),
1549 Value::UInt64(2),
1550 Value::UInt8(OpType::Put as u8),
1551 Value::Int64(10),
1552 Value::Float64(OrderedFloat(1.5)),
1553 ]],
1554 );
1555 }
1556
1557 #[test]
1558 fn test_last_non_null_should_filter_by_sequence_before_merge_no_fill_from_out_of_range_row() {
1559 let schema = schema_for_test();
1560 let projection: HashSet<_> = [0, 1, 2, 3, 4].into_iter().collect();
1561
1562 let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 1]));
1568 let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 2]));
1569 let op_type = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; 2]));
1570 let fields = vec![
1571 Arc::new(Int64Vector::from(vec![Some(10), None])) as Arc<_>,
1572 Arc::new(Float64Vector::from(vec![Some(1.0), Some(1.0)])) as Arc<_>,
1573 ];
1574 let values = Values {
1575 timestamp: timestamp as Arc<_>,
1576 sequence,
1577 op_type,
1578 fields,
1579 };
1580
1581 let batch = values
1582 .to_batch(
1583 b"test",
1584 &schema,
1585 &projection,
1586 Some(SequenceRange::Gt { min: 1 }),
1587 true,
1588 MergeMode::LastNonNull,
1589 )
1590 .unwrap();
1591
1592 check_value(
1593 &batch,
1594 vec![vec![
1595 Value::Timestamp(Timestamp::new_millisecond(1)),
1596 Value::UInt64(2),
1597 Value::UInt8(OpType::Put as u8),
1598 Value::Null,
1599 Value::Float64(OrderedFloat(1.0)),
1600 ]],
1601 );
1602 }
1603
1604 fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
1605 let column_schema = schema
1606 .column_metadatas
1607 .iter()
1608 .map(|c| api::v1::ColumnSchema {
1609 column_name: c.column_schema.name.clone(),
1610 datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
1611 .unwrap()
1612 .datatype() as i32,
1613 semantic_type: c.semantic_type as i32,
1614 ..Default::default()
1615 })
1616 .collect();
1617
1618 let rows = (0..len)
1619 .map(|i| {
1620 row(vec![
1621 ValueData::StringValue(k0.clone()),
1622 ValueData::I64Value(k1),
1623 ValueData::TimestampMillisecondValue(i as i64),
1624 ValueData::I64Value(i as i64),
1625 ValueData::F64Value(i as f64),
1626 ])
1627 })
1628 .collect();
1629 let mutation = api::v1::Mutation {
1630 op_type: 1,
1631 sequence: 0,
1632 rows: Some(Rows {
1633 schema: column_schema,
1634 rows,
1635 }),
1636 write_hint: None,
1637 };
1638 KeyValues::new(schema.as_ref(), mutation).unwrap()
1639 }
1640
1641 #[test]
1642 fn test_series_set_concurrency() {
1643 let schema = schema_for_test();
1644 let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
1645 schema
1646 .primary_key_columns()
1647 .map(|c| {
1648 (
1649 c.column_id,
1650 SortField::new(c.column_schema.data_type.clone()),
1651 )
1652 })
1653 .collect(),
1654 ));
1655 let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
1656
1657 let concurrency = 32;
1658 let pk_num = concurrency * 2;
1659 let mut handles = Vec::with_capacity(concurrency);
1660 for i in 0..concurrency {
1661 let set = set.clone();
1662 let schema = schema.clone();
1663 let column_schemas = schema
1664 .column_metadatas
1665 .iter()
1666 .map(column_metadata_to_column_schema)
1667 .collect::<Vec<_>>();
1668 let handle = std::thread::spawn(move || {
1669 for j in i * 100..(i + 1) * 100 {
1670 let pk = j % pk_num;
1671 let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
1672
1673 let kvs = KeyValues::new(
1674 &schema,
1675 Mutation {
1676 op_type: OpType::Put as i32,
1677 sequence: j as u64,
1678 rows: Some(Rows {
1679 schema: column_schemas.clone(),
1680 rows: vec![row(vec![
1681 ValueData::StringValue(format!("{}", j)),
1682 ValueData::I64Value(j as i64),
1683 ValueData::TimestampMillisecondValue(j as i64),
1684 ValueData::I64Value(j as i64),
1685 ValueData::F64Value(j as f64),
1686 ])],
1687 }),
1688 write_hint: None,
1689 },
1690 )
1691 .unwrap();
1692 set.push_to_series(primary_key, &kvs.iter().next().unwrap());
1693 }
1694 });
1695 handles.push(handle);
1696 }
1697 for h in handles {
1698 h.join().unwrap();
1699 }
1700
1701 let mut timestamps = Vec::with_capacity(concurrency * 100);
1702 let mut sequences = Vec::with_capacity(concurrency * 100);
1703 let mut op_types = Vec::with_capacity(concurrency * 100);
1704 let mut v0 = Vec::with_capacity(concurrency * 100);
1705
1706 for i in 0..pk_num {
1707 let pk = format!("pk-{}", i).as_bytes().to_vec();
1708 let series = set.get_series(&pk).unwrap();
1709 let mut guard = series.write().unwrap();
1710 let values = guard.compact(&schema).unwrap();
1711 timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1712 sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
1713 op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
1714 v0.extend(
1715 values
1716 .fields
1717 .first()
1718 .unwrap()
1719 .as_any()
1720 .downcast_ref::<Int64Vector>()
1721 .unwrap()
1722 .iter_data()
1723 .map(|v| v.unwrap()),
1724 );
1725 }
1726
1727 let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
1728 assert_eq!(
1729 expected_sequence,
1730 sequences.iter().copied().collect::<HashSet<_>>()
1731 );
1732
1733 op_types.iter().all(|op| *op == OpType::Put as u8);
1734 assert_eq!(
1735 expected_sequence,
1736 timestamps.iter().copied().collect::<HashSet<_>>()
1737 );
1738
1739 assert_eq!(timestamps, sequences);
1740 assert_eq!(v0, timestamps);
1741 }
1742
1743 #[test]
1744 fn test_memtable() {
1745 common_telemetry::init_default_ut_logging();
1746 check_memtable_dedup(true);
1747 check_memtable_dedup(false);
1748 }
1749
1750 fn check_memtable_dedup(dedup: bool) {
1751 let schema = schema_for_test();
1752 let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1753 let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
1754 memtable.write(&kvs).unwrap();
1755 memtable.write(&kvs).unwrap();
1756
1757 let mut expected_ts: HashMap<i64, usize> = HashMap::new();
1758 for ts in kvs.iter().map(|kv| {
1759 kv.timestamp()
1760 .try_into_timestamp()
1761 .unwrap()
1762 .unwrap()
1763 .value()
1764 }) {
1765 *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
1766 }
1767
1768 let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
1769 let range = ranges.ranges.into_values().next().unwrap();
1770 let iter = range.build_iter().unwrap();
1771 let mut read = HashMap::new();
1772
1773 for ts in iter
1774 .flat_map(|batch| {
1775 batch
1776 .unwrap()
1777 .timestamps()
1778 .as_any()
1779 .downcast_ref::<TimestampMillisecondVector>()
1780 .unwrap()
1781 .iter_data()
1782 .collect::<Vec<_>>()
1783 .into_iter()
1784 })
1785 .map(|v| v.unwrap().0.value())
1786 {
1787 *read.entry(ts).or_default() += 1;
1788 }
1789 assert_eq!(expected_ts, read);
1790
1791 let stats = memtable.stats();
1792 assert!(stats.bytes_allocated() > 0);
1793 assert_eq!(
1794 Some((
1795 Timestamp::new_millisecond(0),
1796 Timestamp::new_millisecond(99)
1797 )),
1798 stats.time_range()
1799 );
1800 }
1801
1802 #[test]
1803 fn test_memtable_projection() {
1804 common_telemetry::init_default_ut_logging();
1805 let schema = schema_for_test();
1806 let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
1807 let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
1808 memtable.write(&kvs).unwrap();
1809
1810 let iter = memtable
1811 .ranges(Some(&[3]), RangesOptions::default())
1812 .unwrap()
1813 .build(None)
1814 .unwrap();
1815
1816 let mut v0_all = vec![];
1817
1818 for res in iter {
1819 let batch = res.unwrap();
1820 assert_eq!(1, batch.fields().len());
1821 let v0 = batch
1822 .fields()
1823 .first()
1824 .unwrap()
1825 .data
1826 .as_any()
1827 .downcast_ref::<Int64Vector>()
1828 .unwrap();
1829 v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
1830 }
1831 assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
1832 }
1833
    /// Smoke test: concurrent writers and readers must complete without
    /// panics/deadlocks, and afterwards every written series and row is visible.
    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        // 10 writers * 100 series each, 10 rows per series; 5 readers scan
        // concurrently while writes are in flight.
        let num_writers = 10;
        let num_readers = 5;
        let series_per_writer = 100;
        let rows_per_series = 10;
        let total_series = num_writers * series_per_writer;

        // One extra slot so the main thread releases all workers at once.
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                // Each writer uses a unique key prefix, so series are disjoint.
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                // Readers only verify that scans complete without error; the
                // visible row count is racy while writers are still running.
                for _ in 0..10 {
                    let iter = memtable
                        .ranges(None, RangesOptions::default())
                        .unwrap()
                        .build(None)
                        .unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        // Release every worker simultaneously to maximize interleaving.
        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // With all writes finished, a full scan must observe every series/row.
        let iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
1932
1933 #[test]
1934 fn test_build_record_batch_iter_from_memtable() {
1935 let schema = schema_for_test();
1936 let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
1937
1938 let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
1939 memtable.write(&kvs).unwrap();
1940
1941 let read_column_ids: Vec<ColumnId> = schema
1942 .column_metadatas
1943 .iter()
1944 .map(|c| c.column_id)
1945 .collect();
1946 let ranges = memtable
1947 .ranges(Some(&read_column_ids), RangesOptions::default())
1948 .unwrap();
1949 assert_eq!(1, ranges.ranges.len());
1950
1951 let range = ranges.ranges.into_values().next().unwrap();
1952 let mut iter = range.build_record_batch_iter(None, None).unwrap();
1953 let rb = iter.next().transpose().unwrap().unwrap();
1954 assert_eq!(10, rb.num_rows());
1955 let schema = rb.schema();
1957 let column_names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1958 assert_eq!(
1959 column_names,
1960 vec![
1961 "k0",
1962 "k1",
1963 "v0",
1964 "v1",
1965 "ts",
1966 "__primary_key",
1967 "__sequence",
1968 "__op_type",
1969 ]
1970 );
1971 assert!(iter.next().is_none());
1972 }
1973
1974 #[test]
1975 fn test_build_record_batch_iter_with_time_range() {
1976 let schema = schema_for_test();
1977 let memtable = TimeSeriesMemtable::new(schema.clone(), 1, None, true, MergeMode::LastRow);
1978
1979 let kvs = build_key_values(&schema, "test".to_string(), 1, 10);
1980 memtable.write(&kvs).unwrap();
1981
1982 let read_column_ids: Vec<ColumnId> = schema
1983 .column_metadatas
1984 .iter()
1985 .map(|c| c.column_id)
1986 .collect();
1987 let ranges = memtable
1988 .ranges(Some(&read_column_ids), RangesOptions::default())
1989 .unwrap();
1990 assert_eq!(1, ranges.ranges.len());
1991
1992 let time_range = (Timestamp::new_millisecond(3), Timestamp::new_millisecond(7));
1993
1994 let range = ranges.ranges.into_values().next().unwrap();
1995 let mut iter = range
1996 .build_record_batch_iter(Some(time_range), None)
1997 .unwrap();
1998
1999 let mut total_rows = 0;
2000 let mut all_timestamps = Vec::new();
2001 while let Some(rb) = iter.next().transpose().unwrap() {
2002 total_rows += rb.num_rows();
2003 let ts_col = rb
2004 .column_by_name("ts")
2005 .unwrap()
2006 .as_any()
2007 .downcast_ref::<datatypes::arrow::array::TimestampMillisecondArray>()
2008 .unwrap();
2009 for i in 0..ts_col.len() {
2010 all_timestamps.push(ts_col.value(i));
2011 }
2012 }
2013 assert_eq!(5, total_rows);
2014 all_timestamps.sort();
2015 assert_eq!(vec![3, 4, 5, 6, 7], all_timestamps);
2016 }
2017}