use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow_array::StringArray;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ScalarVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
    MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
    MemtableStats, PredicateGroup,
};
use crate::metrics::{
    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
    READ_STAGE_ELAPSED,
};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;

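/// Initial capacity for the vector builders of a series.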
const INITIAL_BUILDER_CAPACITY: usize = 4;

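/// Maximum capacity of a series' active value builder before it gets frozen.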
const BUILDER_CAPACITY: usize = 512;

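/// Builder to build a [TimeSeriesMemtable].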
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with the given write buffer manager.
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        if metadata.primary_key.is_empty() {
            // Regions without a primary key can use the simpler bulk memtable.
            Arc::new(SimpleBulkMemtable::new(
                id,
                metadata.clone(),
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        } else {
            Arc::new(TimeSeriesMemtable::new(
                metadata.clone(),
                id,
                self.write_buffer_manager.clone(),
                self.dedup,
                self.merge_mode,
            ))
        }
    }

    fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
        false
    }
}

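/// Memtable that organizes rows by primary key: rows that share a primary key are
/// written to the same [Series].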
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    /// Total number of rows written.
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        // In LastNonNull mode, rows with the same timestamp must be kept so that
        // their fields can be merged later, so dedup is disabled here.
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

    /// Updates memtable stats after a successful write.
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
    }

    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self
            .row_codec
            .encode(kv.primary_keys())
            .context(EncodeSnafu)?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // Safety: the time index column of a key value is always a valid, non-null timestamp.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();
        local_stats.max_sequence = kvs.max_sequence();
        local_stats.num_rows = kvs.num_rows();
        self.update_stats(local_stats);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
        metrics.max_sequence = key_value.sequence();
        metrics.num_rows = 1;

        if res.is_ok() {
            self.update_stats(metrics);
        }
        res
    }

    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Convert the bulk part to a mutation and write it row by row.
        let mutation = part.to_mutation(&self.region_metadata)?;
        let mut metrics = WriteMetrics::default();
        if let Some(key_values) = KeyValues::new(&self.region_metadata, mutation) {
            for kv in key_values.iter() {
                self.write_key_value(kv, &mut metrics)?
            }
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_ts;
        metrics.min_ts = part.min_ts;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            // No projection given: read all field columns.
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence, None)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        let stats = self.stats();
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context, stats.num_rows))].into(),
            stats,
        })
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().0.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // No rows have ever been written.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.series_set.series.read().unwrap().0.len();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

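/// Mapping from encoded primary keys to their [Series].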
#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);

impl Drop for SeriesMap {
    fn drop(&mut self) {
        // Update the gauges for active series and field builders when the map is dropped.
        let num_series = self.0.len();
        let num_field_builders = self
            .0
            .values()
            .map(|v| v.read().unwrap().active.num_field_builders())
            .sum::<usize>();
        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
    }
}

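/// A set of [Series] under one region, sharing the region metadata and primary key codec.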
#[derive(Clone)]
pub(crate) struct SeriesSet {
    region_metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
    /// Pushes a key value to the series it belongs to and returns the number of
    /// key and value bytes allocated.
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, so only the read lock on the map is needed.
        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the write lock and insert a new series, unless another
        // writer created it in the meantime.
        let mut indices = self.series.write().unwrap();
        match indices.0.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().0.get(primary_key).cloned()
    }

    /// Iterates series that match the projection, predicate and sequence filter.
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
            mem_scan_metrics,
        )
    }
}

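/// Builds an arrow schema that contains only the primary key columns of the given region.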
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}

/// Metrics for reading the time-series memtable.
#[derive(Debug, Default)]
struct Metrics {
    /// Total number of series visited.
    total_series: usize,
    /// Number of series pruned by the predicate.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration of the scan.
    scan_cost: Duration,
}

/// Iterator over the series of a [SeriesSet].
struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<RwLock<SeriesMap>>,
    projection: HashSet<ColumnId>,
    /// Primary key of the last visited series; iteration resumes after this key.
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    metrics: Metrics,
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<RwLock<SeriesMap>>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Self> {
        // Compile the predicate into simple filters; exprs that cannot be converted
        // are skipped.
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            sequence,
            metrics: Metrics::default(),
            mem_scan_metrics,
        })
    }

    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            let inner = crate::memtable::MemScanMetricsData {
                total_series: self.metrics.total_series,
                num_rows: self.metrics.num_rows,
                num_batches: self.metrics.num_batches,
                scan_cost: self.metrics.scan_cost,
            };
            mem_scan_metrics.merge_inner(&inner);
        }
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        // Resume iteration after the last visited primary key.
        let range = match &self.last_key {
            None => map.0.range::<Vec<u8>, _>(..),
            Some(last_key) => map
                .0
                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
        };

        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // The primary key of this series does not match the predicate; skip it.
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            let batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        drop(map);
        self.metrics.scan_cost += start.elapsed();

        self.report_mem_scan_metrics();

        None
    }
}

fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // No primary key, so there is no series to prune.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // Decode the primary key lazily and cache the decoded values in the series.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    // The series is kept only if all applicable predicates evaluate to true.
    let mut result = true;
    for predicate in predicates {
        // Ignore predicates that do not reference a primary key column.
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}

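/// A `Series` holds the rows of one time series, i.e. of one primary key.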
pub struct Series {
    /// Cached decoded primary key values, used when evaluating predicates.
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
    capacity: usize,
}

impl Series {
    pub(crate) fn with_capacity(
        region_metadata: &RegionMetadataRef,
        init_capacity: usize,
        capacity: usize,
    ) -> Self {
        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, init_capacity),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
            capacity,
        }
    }

    pub(crate) fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self::with_capacity(region_metadata, INITIAL_BUILDER_CAPACITY, BUILDER_CAPACITY)
    }

    pub fn is_empty(&self) -> bool {
        self.active.len() == 0 && self.frozen.is_empty()
    }

    /// Pushes a row of values into the series and returns the size of the pushed values.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Freeze the active builder before it exceeds its capacity.
        if self.active.len() + 10 > self.capacity {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

    /// Freezes the active builder and appends its values to the frozen parts.
    pub(crate) fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type_v: u8,
        sequence_v: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        if !self.active.can_accommodate(&fields)? {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.extend(ts_v, op_type_v, sequence_v, fields)
    }

    /// Freezes the active part and compacts all frozen parts into a single
    /// [Values] that is cheap to read.
    pub(crate) fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // A series always contains at least one row.
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // 3 extra columns besides the fields: timestamp, sequence and op_type.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(frozen
                    .iter()
                    .zip(frozen.iter().skip(1))
                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }

    /// Returns all values of the series, including the active part, without compacting.
    pub fn read_to_values(&self) -> Vec<Values> {
        let mut res = Vec::with_capacity(self.frozen.len() + 1);
        res.extend(self.frozen.iter().cloned());
        res.push(self.active.finish_cloned());
        res
    }
}

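/// `ValueBuilder` buffers the timestamps, sequences, op types and field values of one series.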
pub(crate) struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<FieldBuilder>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    pub(crate) fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        // Field builders are created lazily on the first non-null value.
        let fields = (0..field_types.len()).map(|_| None).collect();
        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

    /// Returns the number of field builders that have been created.
    pub fn num_field_builders(&self) -> usize {
        self.fields.iter().flatten().count()
    }

    /// Pushes a new row to the builder and returns the size of the field values.
    /// Primary keys are not stored here since they are already encoded as the series key.
    pub(crate) fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.push(field_value);
                } else {
                    // Create the builder lazily and backfill previous rows with nulls.
                    let mut mutable_vector =
                        if let ConcreteDataType::String(_) = &self.field_types[idx] {
                            FieldBuilder::String(StringBuilder::with_capacity(4, 8))
                        } else {
                            FieldBuilder::Other(
                                self.field_types[idx]
                                    .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)),
                            )
                        };
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.push(field_value);
                    self.fields[idx] = Some(mutable_vector);
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
                }
            }
        }

        size
    }

    /// Returns whether the builder can accommodate the given fields without
    /// overflowing the offsets of any string builder.
    pub(crate) fn can_accommodate(&self, fields: &[VectorRef]) -> Result<bool> {
        for (field_src, field_dest) in fields.iter().zip(self.fields.iter()) {
            let Some(builder) = field_dest else {
                continue;
            };
            let FieldBuilder::String(builder) = builder else {
                continue;
            };
            let array = field_src.to_arrow_array();
            let string_array = array
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| error::InvalidBatchSnafu {
                    reason: format!(
                        "Field type mismatch, expecting String, given: {}",
                        field_src.data_type()
                    ),
                })?;
            let space_needed = string_array.value_data().len() as i32;
            // An i32 offset overflow means the string builder cannot take these values.
            if builder.next_offset().checked_add(space_needed).is_none() {
                return Ok(false);
            }
        }
        Ok(true)
    }

    pub(crate) fn extend(
        &mut self,
        ts_v: VectorRef,
        op_type: u8,
        sequence: u64,
        fields: Vec<VectorRef>,
    ) -> Result<()> {
        let num_rows_before = self.timestamp.len();
        let num_rows_to_write = ts_v.len();
        self.timestamp.reserve(num_rows_to_write);
        match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampSecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMillisecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampMicrosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                self.timestamp.extend(
                    ts_v.as_any()
                        .downcast_ref::<TimestampNanosecondVector>()
                        .unwrap()
                        .iter_data()
                        .map(|v| v.unwrap().0.value()),
                );
            }
            _ => unreachable!(),
        };

        self.op_type.reserve(num_rows_to_write);
        self.op_type
            .extend(iter::repeat_n(op_type, num_rows_to_write));
        self.sequence.reserve(num_rows_to_write);
        self.sequence
            .extend(iter::repeat_n(sequence, num_rows_to_write));

        for (field_idx, (field_src, field_dest)) in
            fields.into_iter().zip(self.fields.iter_mut()).enumerate()
        {
            let builder = field_dest.get_or_insert_with(|| {
                let mut field_builder =
                    FieldBuilder::create(&self.field_types[field_idx], INITIAL_BUILDER_CAPACITY);
                field_builder.push_nulls(num_rows_before);
                field_builder
            });
            match builder {
                FieldBuilder::String(builder) => {
                    let array = field_src.to_arrow_array();
                    let string_array =
                        array
                            .as_any()
                            .downcast_ref::<StringArray>()
                            .with_context(|| error::InvalidBatchSnafu {
                                reason: format!(
                                    "Field type mismatch, expecting String, given: {}",
                                    field_src.data_type()
                                ),
                            })?;
                    builder.append_array(string_array);
                }
                FieldBuilder::Other(builder) => {
                    let len = field_src.len();
                    builder
                        .extend_slice_of(&*field_src, 0, len)
                        .context(error::ComputeVectorSnafu)?;
                }
            }
        }
        Ok(())
    }

    /// Returns the number of rows in the builder.
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }

    /// Builds a [Values] from the current content without consuming the builder.
    fn finish_cloned(&self) -> Values {
        let num_rows = self.sequence.len();
        let fields = self
            .fields
            .iter()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish_cloned()
                } else {
                    // The field was never written: fill the whole column with nulls.
                    let mut single_null = self.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(self.sequence.clone()));
        let op_type = Arc::new(UInt8Vector::from_vec(self.op_type.clone()));
        let timestamp: VectorRef = match self.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(self.timestamp.clone()))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(self.timestamp.clone()))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Values {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

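/// An immutable, vector-based snapshot of a series, including the `sequence` and
/// `op_type` columns.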
#[derive(Clone)]
pub struct Values {
    pub(crate) timestamp: VectorRef,
    pub(crate) sequence: Arc<UInt64Vector>,
    pub(crate) op_type: Arc<UInt8Vector>,
    pub(crate) fields: Vec<VectorRef>,
}

impl Values {
    /// Converts the values to a [Batch] with the given primary key, keeping only
    /// the projected field columns. The batch is sorted and optionally deduplicated.
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        dedup: bool,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.sort(dedup)?;
        Ok(batch)
    }

    /// Returns all columns as arrow arrays, ordered as timestamp, sequence,
    /// op_type and then the field columns.
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

    /// Builds a [Values] from columns produced by [Values::columns].
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                    v.finish()
                } else {
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();

        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

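/// [IterBuilder] implementation that scans a [SeriesSet].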
struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
        let iter = self.series_set.iter_series(
            self.projection.clone(),
            self.predicate.clone(),
            self.dedup,
            self.sequence,
            metrics,
        )?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
    use mito_codec::row_converter::SortField;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::column_metadata_to_column_schema;

    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn ts_value_ref(val: i64) -> ValueRef<'static> {
        ValueRef::Timestamp(Timestamp::new_millisecond(val))
    }

    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
    }

    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }

    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        // Fields that are null for the whole series do not create a builder;
        // the column is padded with nulls on read.
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
        let ts_len = batch.timestamps().len();
        assert_eq!(batch.sequences().len(), ts_len);
        assert_eq!(batch.op_types().len(), ts_len);
        for f in batch.fields() {
            assert_eq!(f.data.len(), ts_len);
        }

        let mut rows = vec![];
        for idx in 0..ts_len {
            let mut row = Vec::with_capacity(batch.fields().len() + 3);
            row.push(batch.timestamps().get(idx));
            row.push(batch.sequences().get(idx));
            row.push(batch.op_types().get(idx));
            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
            rows.push(row);
        }

        assert_eq!(expect.len(), rows.len());
        for (idx, row) in rows.iter().enumerate() {
            assert_eq!(&expect[idx], row);
        }
    }

    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }

    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(i as f64)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![Row {
                                    values: vec![
                                        api::v1::Value {
                                            value_data: Some(ValueData::StringValue(format!(
                                                "{}",
                                                j
                                            ))),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::TimestampMillisecondValue(
                                                j as i64,
                                            )),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::F64Value(j as f64)),
                                        },
                                    ],
                                }],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
            timestamps.extend(
                values
                    .timestamp
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value()),
            );
            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
            v0.extend(
                values
                    .fields
                    .first()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<Int64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
        }

        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        // Each row was written with ts == sequence == v0 == j.
        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
        {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }

    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        let num_writers = 10;
        let num_readers = 5;
        let series_per_writer = 100;
        let rows_per_series = 10;
        let total_series = num_writers * series_per_writer;

        // Synchronize writers and readers so they start at the same time.
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                // Each writer writes a disjoint set of series.
                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                // Readers repeatedly scan the memtable while writers are writing.
                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
}