use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{MutableVector, Vector, VectorRef};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{
    ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result,
    UnsupportedOperationSnafu,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
    MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
    PredicateGroup,
};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};

/// Initial capacity of a series' value builder.
const INITIAL_BUILDER_CAPACITY: usize = 16;

/// Capacity threshold at which a series freezes its active value builder.
const BUILDER_CAPACITY: usize = 512;

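/// Builder to build a [TimeSeriesMemtable].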
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
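    /// Creates a new builder with the given `write_buffer_manager`.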
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.write_buffer_manager.clone(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

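/// Memtable that groups rows by their encoded primary key, maintaining one
/// [Series] per time series.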
pub struct TimeSeriesMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    row_codec: Arc<DensePrimaryKeyCodec>,
    series_set: SeriesSet,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
    pub fn new(
        region_metadata: RegionMetadataRef,
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        // In `LastNonNull` mode, rows with the same timestamp are merged by
        // `LastNonNullIter` during reads, so dedup is disabled here.
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
            series_set,
            row_codec,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: Default::default(),
        }
    }

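    /// Updates memtable stats and reports the allocation to the tracker.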
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
    }

    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
        ensure!(
            self.row_codec.num_fields() == kv.num_primary_keys(),
            PrimaryKeyLengthMismatchSnafu {
                expect: self.row_codec.num_fields(),
                actual: kv.num_primary_keys(),
            }
        );

        let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;

        let (key_allocated, value_allocated) =
            self.series_set.push_to_series(primary_key_encoded, &kv);
        stats.key_bytes += key_allocated;
        stats.value_bytes += value_allocated;

        // Safety: the time index column of a key value is always a valid timestamp.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
        Ok(())
    }
}

impl Debug for TimeSeriesMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeSeriesMemtable").finish()
    }
}

impl Memtable for TimeSeriesMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut local_stats = WriteMetrics::default();

        for kv in kvs.iter() {
            self.write_key_value(kv, &mut local_stats)?;
        }
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<Timestamp>();
        local_stats.value_bytes += kvs.num_rows() * std::mem::size_of::<OpType>();

        self.update_stats(local_stats);

        let sequence = kvs.max_sequence();
        self.max_sequence.fetch_max(sequence, Ordering::Relaxed);

        self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
        Ok(())
    }

    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let res = self.write_key_value(key_value, &mut metrics);
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

        self.update_stats(metrics);

        // Only advance the max sequence if the write succeeded.
        if res.is_ok() {
            self.max_sequence
                .fetch_max(key_value.sequence(), Ordering::Relaxed);
        }

        self.num_rows.fetch_add(1, Ordering::Relaxed);
        res
    }

    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "TimeSeriesMemtable does not support write_bulk",
        }
        .fail()
    }

    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        filters: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };

        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup, sequence)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> MemtableRanges {
        let projection = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        };
        let builder = Box::new(TimeSeriesIterBuilder {
            series_set: self.series_set.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            dedup: self.dedup,
            merge_mode: self.merge_mode,
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        }
    }

    fn is_empty(&self) -> bool {
        self.series_set.series.read().unwrap().is_empty()
    }

    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // No rows have been written yet.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

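/// Map from encoded primary key to the [Series] it identifies.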
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;

#[derive(Clone)]
struct SeriesSet {
    region_metadata: RegionMetadataRef,
    series: Arc<SeriesRwLockMap>,
    codec: Arc<DensePrimaryKeyCodec>,
}

impl SeriesSet {
    fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
        Self {
            region_metadata,
            series: Default::default(),
            codec,
        }
    }
}

impl SeriesSet {
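    /// Pushes a row into the series identified by `primary_key`, creating the
    /// series if it does not exist. Returns the bytes allocated for the key and
    /// for the values.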
    fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
        // Fast path: the series already exists, so a read lock on the map suffices.
        if let Some(series) = self.series.read().unwrap().get(&primary_key) {
            let value_allocated = series.write().unwrap().push(
                kv.timestamp(),
                kv.sequence(),
                kv.op_type(),
                kv.fields(),
            );
            return (0, value_allocated);
        };

        // Slow path: take the write lock and re-check the entry, since another
        // writer may have inserted the series after the read lock was released.
        let mut indices = self.series.write().unwrap();
        match indices.entry(primary_key) {
            Entry::Vacant(v) => {
                let key_len = v.key().len();
                let mut series = Series::new(&self.region_metadata);
                let value_allocated =
                    series.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
                v.insert(Arc::new(RwLock::new(series)));
                (key_len, value_allocated)
            }
            Entry::Occupied(v) => {
                let value_allocated = v.get().write().unwrap().push(
                    kv.timestamp(),
                    kv.sequence(),
                    kv.op_type(),
                    kv.fields(),
                );
                (0, value_allocated)
            }
        }
    }

    #[cfg(test)]
    fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
        self.series.read().unwrap().get(primary_key).cloned()
    }

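    /// Iterates all series in this set, filtered by the given predicate and
    /// sequence number.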
    fn iter_series(
        &self,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Iter> {
        let primary_key_schema = primary_key_schema(&self.region_metadata);
        let primary_key_datatypes = self
            .region_metadata
            .primary_key_columns()
            .map(|pk| pk.column_schema.data_type.clone())
            .collect();

        Iter::try_new(
            self.region_metadata.clone(),
            self.series.clone(),
            projection,
            predicate,
            primary_key_schema,
            primary_key_datatypes,
            self.codec.clone(),
            dedup,
            sequence,
        )
    }
}

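/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that contains only
/// the primary key columns of the given region.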
pub(crate) fn primary_key_schema(
    region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
    let fields = region_metadata
        .primary_key_columns()
        .map(|pk| {
            arrow::datatypes::Field::new(
                pk.column_schema.name.clone(),
                pk.column_schema.data_type.as_arrow_type(),
                pk.column_schema.is_nullable(),
            )
        })
        .collect::<Vec<_>>();
    Arc::new(arrow::datatypes::Schema::new(fields))
}

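/// Metrics for scanning a time-series memtable.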
#[derive(Debug, Default)]
struct Metrics {
    /// Total series scanned.
    total_series: usize,
    /// Number of series pruned by predicates on the primary key.
    num_pruned_series: usize,
    /// Number of rows read.
    num_rows: usize,
    /// Number of batches read.
    num_batches: usize,
    /// Duration spent scanning the memtable.
    scan_cost: Duration,
}

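/// Iterator over a [SeriesSet] that yields one [Batch] per series.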
struct Iter {
    metadata: RegionMetadataRef,
    series: Arc<SeriesRwLockMap>,
    projection: HashSet<ColumnId>,
    /// Primary key of the last series returned, used to resume the scan.
    last_key: Option<Vec<u8>>,
    predicate: Vec<SimpleFilterEvaluator>,
    pk_schema: arrow::datatypes::SchemaRef,
    pk_datatypes: Vec<ConcreteDataType>,
    codec: Arc<DensePrimaryKeyCodec>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    metrics: Metrics,
}

impl Iter {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn try_new(
        metadata: RegionMetadataRef,
        series: Arc<SeriesRwLockMap>,
        projection: HashSet<ColumnId>,
        predicate: Option<Predicate>,
        pk_schema: arrow::datatypes::SchemaRef,
        pk_datatypes: Vec<ConcreteDataType>,
        codec: Arc<DensePrimaryKeyCodec>,
        dedup: bool,
        sequence: Option<SequenceNumber>,
    ) -> Result<Self> {
        let predicate = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Ok(Self {
            metadata,
            series,
            projection,
            last_key: None,
            predicate,
            pk_schema,
            pk_datatypes,
            codec,
            dedup,
            sequence,
            metrics: Metrics::default(),
        })
    }
}

impl Drop for Iter {
    fn drop(&mut self) {
        debug!(
            "Iter {} time series memtable, metrics: {:?}",
            self.metadata.region_id, self.metrics
        );

        READ_ROWS_TOTAL
            .with_label_values(&["time_series_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

impl Iterator for Iter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let map = self.series.read().unwrap();
        // Resume the scan from the key after the last series returned.
        let range = match &self.last_key {
            None => map.range::<Vec<u8>, _>(..),
            Some(last_key) => {
                map.range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded))
            }
        };

        for (primary_key, series) in range {
            self.metrics.total_series += 1;

            let mut series = series.write().unwrap();
            if !self.predicate.is_empty()
                && !prune_primary_key(
                    &self.codec,
                    primary_key.as_slice(),
                    &mut series,
                    &self.pk_datatypes,
                    self.pk_schema.clone(),
                    &self.predicate,
                )
            {
                // The series does not match the predicate, skip it.
                self.metrics.num_pruned_series += 1;
                continue;
            }
            self.last_key = Some(primary_key.clone());

            let values = series.compact(&self.metadata);
            let batch = values.and_then(|v| {
                v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
            });

            self.metrics.num_batches += 1;
            self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
            self.metrics.scan_cost += start.elapsed();

            let batch = batch.and_then(|mut batch| {
                batch.filter_by_sequence(self.sequence)?;
                Ok(batch)
            });
            return Some(batch);
        }
        self.metrics.scan_cost += start.elapsed();

        None
    }
}

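/// Evaluates the filter predicates against the decoded primary key of a series.
/// Returns `true` if the series may match (or evaluation is not possible),
/// `false` if it can safely be pruned.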
fn prune_primary_key(
    codec: &Arc<DensePrimaryKeyCodec>,
    pk: &[u8],
    series: &mut Series,
    datatypes: &[ConcreteDataType],
    pk_schema: arrow::datatypes::SchemaRef,
    predicates: &[SimpleFilterEvaluator],
) -> bool {
    // There is no primary key to evaluate; retain the series.
    if pk_schema.fields().is_empty() {
        return true;
    }

    // Decode the primary key lazily and cache the result on the series.
    let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
        pk_values
    } else {
        let pk_values = codec.decode_dense_without_column_id(pk);
        if let Err(e) = pk_values {
            error!(e; "Failed to decode primary key");
            return true;
        }
        series.update_pk_cache(pk_values.unwrap());
        series.pk_cache.as_ref().unwrap()
    };

    let mut result = true;
    for predicate in predicates {
        // Ignore predicates that do not reference a primary key column.
        let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
            continue;
        };
        // Safety: the schema and the data types are built from the same region metadata.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&datatypes[index])
            .unwrap();
        result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
    }

    result
}

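/// A `Series` holds the rows of one primary key, split into a mutable active
/// part and immutable frozen parts.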
struct Series {
    pk_cache: Option<Vec<Value>>,
    active: ValueBuilder,
    frozen: Vec<Values>,
    region_metadata: RegionMetadataRef,
}

impl Series {
    fn new(region_metadata: &RegionMetadataRef) -> Self {
        Self {
            pk_cache: None,
            active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
            frozen: vec![],
            region_metadata: region_metadata.clone(),
        }
    }

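    /// Pushes a row into the series and returns the estimated size in bytes of
    /// the pushed field values.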
    fn push<'a>(
        &mut self,
        ts: ValueRef<'a>,
        sequence: u64,
        op_type: OpType,
        values: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        // Freeze the active builder when it is close to capacity.
        if self.active.len() + 10 > BUILDER_CAPACITY {
            let region_metadata = self.region_metadata.clone();
            self.freeze(&region_metadata);
        }
        self.active.push(ts, sequence, op_type as u8, values)
    }

    fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
        self.pk_cache = Some(pk_values);
    }

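    /// Freezes the active part and appends it to the frozen parts.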
    fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
        if self.active.len() != 0 {
            let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
            std::mem::swap(&mut self.active, &mut builder);
            self.frozen.push(Values::from(builder));
        }
    }

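    /// Freezes the active part and compacts all frozen parts into a single
    /// [Values] by concatenating their columns.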
    fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result<&Values> {
        self.freeze(region_metadata);

        let frozen = &self.frozen;

        // `compact` is only called on series that contain data, so freezing
        // always leaves at least one frozen part.
        debug_assert!(!frozen.is_empty());

        if frozen.len() > 1 {
            // `+ 3` accounts for the timestamp, sequence and op_type columns.
            let column_size = frozen[0].fields.len() + 3;

            if cfg!(debug_assertions) {
                debug_assert!(frozen
                    .iter()
                    .zip(frozen.iter().skip(1))
                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
            }

            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
            let concatenated = (0..column_size)
                .map(|i| {
                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
                    arrow::compute::concat(&to_concat)
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(ComputeArrowSnafu)?;

            debug_assert_eq!(concatenated.len(), column_size);
            let values = Values::from_columns(&concatenated)?;
            self.frozen = vec![values];
        };
        Ok(&self.frozen[0])
    }
}

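/// Mutable builder that accumulates timestamps, sequences, op types and field
/// values of a single series.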
struct ValueBuilder {
    timestamp: Vec<i64>,
    timestamp_type: ConcreteDataType,
    sequence: Vec<u64>,
    op_type: Vec<u8>,
    fields: Vec<Option<Box<dyn MutableVector>>>,
    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
    fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
        let timestamp_type = region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone();
        let sequence = Vec::with_capacity(capacity);
        let op_type = Vec::with_capacity(capacity);

        let field_types = region_metadata
            .field_columns()
            .map(|c| c.column_schema.data_type.clone())
            .collect::<Vec<_>>();
        // Field builders are created lazily when the first non-null value arrives.
        let fields = (0..field_types.len()).map(|_| None).collect();

        Self {
            timestamp: Vec::with_capacity(capacity),
            timestamp_type,
            sequence,
            op_type,
            fields,
            field_types,
        }
    }

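    /// Pushes a new row into the builder. Field builders are created lazily, so
    /// all-null columns cost no allocation. Returns the data size of the pushed
    /// field values.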
    fn push<'a>(
        &mut self,
        ts: ValueRef,
        sequence: u64,
        op_type: u8,
        fields: impl Iterator<Item = ValueRef<'a>>,
    ) -> usize {
        #[cfg(debug_assertions)]
        let fields = {
            let field_vec = fields.collect::<Vec<_>>();
            debug_assert_eq!(field_vec.len(), self.fields.len());
            field_vec.into_iter()
        };

        // Safety: the time index value is always a valid timestamp.
        self.timestamp
            .push(ts.as_timestamp().unwrap().unwrap().value());
        self.sequence.push(sequence);
        self.op_type.push(op_type);
        let num_rows = self.timestamp.len();
        let mut size = 0;
        for (idx, field_value) in fields.enumerate() {
            size += field_value.data_size();
            if !field_value.is_null() || self.fields[idx].is_some() {
                if let Some(field) = self.fields[idx].as_mut() {
                    let _ = field.try_push_value_ref(field_value);
                } else {
                    // First non-null value for this field: create the builder and
                    // backfill nulls for the rows pushed so far.
                    let mut mutable_vector = self.field_types[idx]
                        .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY));
                    mutable_vector.push_nulls(num_rows - 1);
                    let _ = mutable_vector.try_push_value_ref(field_value);
                    self.fields[idx] = Some(mutable_vector);
                }
            }
        }

        size
    }

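    /// Returns the number of rows in the builder.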
    fn len(&self) -> usize {
        let sequence_len = self.sequence.len();
        debug_assert_eq!(sequence_len, self.op_type.len());
        debug_assert_eq!(sequence_len, self.timestamp.len());
        sequence_len
    }
}

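/// Immutable columns of a series part: timestamp, sequence, op type and fields.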
#[derive(Clone)]
struct Values {
    timestamp: VectorRef,
    sequence: Arc<UInt64Vector>,
    op_type: Arc<UInt8Vector>,
    fields: Vec<VectorRef>,
}

impl Values {
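    /// Converts the values to a [Batch] with the given primary key, projecting
    /// field columns and sorting (and optionally deduplicating) the result.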
    pub fn to_batch(
        &self,
        primary_key: &[u8],
        metadata: &RegionMetadataRef,
        projection: &HashSet<ColumnId>,
        dedup: bool,
    ) -> Result<Batch> {
        let builder = BatchBuilder::with_required_columns(
            primary_key.to_vec(),
            self.timestamp.clone(),
            self.sequence.clone(),
            self.op_type.clone(),
        );

        let fields = metadata
            .field_columns()
            .zip(self.fields.iter())
            .filter_map(|(c, f)| {
                projection.get(&c.column_id).map(|c| BatchColumn {
                    column_id: *c,
                    data: f.clone(),
                })
            })
            .collect();

        let mut batch = builder.with_fields(fields).build()?;
        batch.sort(dedup)?;
        Ok(batch)
    }

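    /// Returns all columns as arrow arrays, in timestamp, sequence, op type,
    /// fields order.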
    fn columns(&self) -> Vec<ArrayRef> {
        let mut res = Vec::with_capacity(3 + self.fields.len());
        res.push(self.timestamp.to_arrow_array());
        res.push(self.sequence.to_arrow_array());
        res.push(self.op_type.to_arrow_array());
        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
        res
    }

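    /// Builds [Values] from arrow arrays laid out as produced by
    /// [Values::columns].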
    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
        debug_assert!(cols.len() >= 3);
        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
        let sequence =
            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
        let op_type =
            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;

        Ok(Self {
            timestamp,
            sequence,
            op_type,
            fields,
        })
    }
}

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
            .enumerate()
            .map(|(i, v)| {
                if let Some(v) = v {
                    v.to_vector()
                } else {
                    // The field builder was never created, so the column is all nulls.
                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector()
                }
            })
            .collect::<Vec<_>>();
        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
        let timestamp: VectorRef = match value.timestamp_type {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
            }
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
            }
            _ => unreachable!(),
        };

        if cfg!(debug_assertions) {
            debug_assert_eq!(timestamp.len(), sequence.len());
            debug_assert_eq!(timestamp.len(), op_type.len());
            for field in &fields {
                debug_assert_eq!(timestamp.len(), field.len());
            }
        }

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
        }
    }
}

struct TimeSeriesIterBuilder {
    series_set: SeriesSet,
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    sequence: Option<SequenceNumber>,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
    fn build(&self) -> Result<BoxedBatchIterator> {
        let iter = self.series_set.iter_series(
            self.projection.clone(),
            self.predicate.clone(),
            self.dedup,
            self.sequence,
        )?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::{OrderedFloat, Value};
    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::row_converter::SortField;
    use crate::test_util::column_metadata_to_column_schema;

    fn schema_for_test() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    fn ts_value_ref(val: i64) -> ValueRef<'static> {
        ValueRef::Timestamp(Timestamp::new_millisecond(val))
    }

    fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
        vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
    }

    fn check_values(values: &Values, expect: &[(i64, u64, u8, i64, f64)]) {
        let ts = values
            .timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();

        let v0 = values.fields[0]
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        let v1 = values.fields[1]
            .as_any()
            .downcast_ref::<Float64Vector>()
            .unwrap();
        let read = ts
            .iter_data()
            .zip(values.sequence.iter_data())
            .zip(values.op_type.iter_data())
            .zip(v0.iter_data())
            .zip(v1.iter_data())
            .map(|((((ts, sequence), op_type), v0), v1)| {
                (
                    ts.unwrap().0.value(),
                    sequence.unwrap(),
                    op_type.unwrap(),
                    v0.unwrap(),
                    v1.unwrap(),
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(expect, &read);
    }

    #[test]
    fn test_series() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1));
        series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2));
        assert_eq!(2, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    #[test]
    fn test_series_with_nulls() {
        let region_metadata = schema_for_test();
        let mut series = Series::new(&region_metadata);
        // Pushed rows, per field column:
        // v0: NULL, 1, 2, 2
        // v1: NULL, NULL, 10.2, NULL
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Null, ValueRef::Null].into_iter(),
        );
        series.push(
            ts_value_ref(1),
            0,
            OpType::Put,
            vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
        );
        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
        series.push(
            ts_value_ref(1),
            3,
            OpType::Put,
            vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
        );
        assert_eq!(4, series.active.timestamp.len());
        assert_eq!(0, series.frozen.len());

        let values = series.compact(&region_metadata).unwrap();
        assert_eq!(values.fields[0].null_count(), 1);
        assert_eq!(values.fields[1].null_count(), 3);
        assert_eq!(0, series.active.timestamp.len());
        assert_eq!(1, series.frozen.len());
    }

    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
        let ts_len = batch.timestamps().len();
        assert_eq!(batch.sequences().len(), ts_len);
        assert_eq!(batch.op_types().len(), ts_len);
        for f in batch.fields() {
            assert_eq!(f.data.len(), ts_len);
        }

        let mut rows = vec![];
        for idx in 0..ts_len {
            let mut row = Vec::with_capacity(batch.fields().len() + 3);
            row.push(batch.timestamps().get(idx));
            row.push(batch.sequences().get(idx));
            row.push(batch.op_types().get(idx));
            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
            rows.push(row);
        }

        assert_eq!(expect.len(), rows.len());
        for (idx, row) in rows.iter().enumerate() {
            assert_eq!(&expect[idx], row);
        }
    }

    #[test]
    fn test_values_sort() {
        let schema = schema_for_test();
        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));

        let fields = vec![
            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
        ];
        let values = Values {
            timestamp: timestamp as Arc<_>,
            sequence,
            op_type,
            fields,
        };

        let batch = values
            .to_batch(
                b"test",
                &schema,
                &[0, 1, 2, 3, 4].into_iter().collect(),
                true,
            )
            .unwrap();
        check_value(
            &batch,
            vec![
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(1)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(4),
                    Value::Float64(OrderedFloat(1.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(2)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(3),
                    Value::Float64(OrderedFloat(2.1)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(3)),
                    Value::UInt64(2),
                    Value::UInt8(0),
                    Value::Int64(2),
                    Value::Float64(OrderedFloat(4.2)),
                ],
                vec![
                    Value::Timestamp(Timestamp::new_millisecond(4)),
                    Value::UInt64(1),
                    Value::UInt8(1),
                    Value::Int64(1),
                    Value::Float64(OrderedFloat(3.3)),
                ],
            ],
        )
    }

    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
        let column_schema = schema
            .column_metadatas
            .iter()
            .map(|c| api::v1::ColumnSchema {
                column_name: c.column_schema.name.clone(),
                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
                    .unwrap()
                    .datatype() as i32,
                semantic_type: c.semantic_type as i32,
                ..Default::default()
            })
            .collect();

        let rows = (0..len)
            .map(|i| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(i as i64)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(i as f64)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence: 0,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(schema.as_ref(), mutation).unwrap()
    }

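    /// Writes to the same [SeriesSet] from multiple threads and verifies that
    /// every written row is visible afterwards.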
    #[test]
    fn test_series_set_concurrency() {
        let schema = schema_for_test();
        let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
            schema
                .primary_key_columns()
                .map(|c| {
                    (
                        c.column_id,
                        SortField::new(c.column_schema.data_type.clone()),
                    )
                })
                .collect(),
        ));
        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));

        let concurrency = 32;
        let pk_num = concurrency * 2;
        let mut handles = Vec::with_capacity(concurrency);
        for i in 0..concurrency {
            let set = set.clone();
            let schema = schema.clone();
            let column_schemas = schema
                .column_metadatas
                .iter()
                .map(column_metadata_to_column_schema)
                .collect::<Vec<_>>();
            let handle = std::thread::spawn(move || {
                for j in i * 100..(i + 1) * 100 {
                    let pk = j % pk_num;
                    let primary_key = format!("pk-{}", pk).as_bytes().to_vec();

                    let kvs = KeyValues::new(
                        &schema,
                        Mutation {
                            op_type: OpType::Put as i32,
                            sequence: j as u64,
                            rows: Some(Rows {
                                schema: column_schemas.clone(),
                                rows: vec![Row {
                                    values: vec![
                                        api::v1::Value {
                                            value_data: Some(ValueData::StringValue(format!(
                                                "{}",
                                                j
                                            ))),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::TimestampMillisecondValue(
                                                j as i64,
                                            )),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::I64Value(j as i64)),
                                        },
                                        api::v1::Value {
                                            value_data: Some(ValueData::F64Value(j as f64)),
                                        },
                                    ],
                                }],
                            }),
                            write_hint: None,
                        },
                    )
                    .unwrap();
                    set.push_to_series(primary_key, &kvs.iter().next().unwrap());
                }
            });
            handles.push(handle);
        }
        for h in handles {
            h.join().unwrap();
        }

        let mut timestamps = Vec::with_capacity(concurrency * 100);
        let mut sequences = Vec::with_capacity(concurrency * 100);
        let mut op_types = Vec::with_capacity(concurrency * 100);
        let mut v0 = Vec::with_capacity(concurrency * 100);

        for i in 0..pk_num {
            let pk = format!("pk-{}", i).as_bytes().to_vec();
            let series = set.get_series(&pk).unwrap();
            let mut guard = series.write().unwrap();
            let values = guard.compact(&schema).unwrap();
            timestamps.extend(
                values
                    .timestamp
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value()),
            );
            sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
            op_types.extend(values.op_type.iter_data().map(|v| v.unwrap()));
            v0.extend(
                values
                    .fields
                    .first()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<Int64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
        }

        let expected_sequence = (0..(concurrency * 100) as i64).collect::<HashSet<_>>();
        assert_eq!(
            expected_sequence,
            sequences.iter().copied().collect::<HashSet<_>>()
        );

        assert!(op_types.iter().all(|op| *op == OpType::Put as u8));
        assert_eq!(
            expected_sequence,
            timestamps.iter().copied().collect::<HashSet<_>>()
        );

        assert_eq!(timestamps, sequences);
        assert_eq!(v0, timestamps);
    }

    #[test]
    fn test_memtable() {
        common_telemetry::init_default_ut_logging();
        check_memtable_dedup(true);
        check_memtable_dedup(false);
    }

    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

        let mut expected_ts: HashMap<i64, usize> = HashMap::new();
        for ts in kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
        {
            *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
        }

        let iter = memtable.iter(None, None, None).unwrap();
        let mut read = HashMap::new();

        for ts in iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap().0.value())
        {
            *read.entry(ts).or_default() += 1;
        }
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        // Only project field column v0 (column id 3).
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];

        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
    }

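    /// Runs writers and readers concurrently against one memtable and checks
    /// the final series and row counts.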
    #[test]
    fn test_memtable_concurrent_write_read() {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let memtable = Arc::new(TimeSeriesMemtable::new(
            schema.clone(),
            42,
            None,
            true,
            MergeMode::LastRow,
        ));

        let num_writers = 10;
        let num_readers = 5;
        let series_per_writer = 100;
        let rows_per_series = 10;
        let total_series = num_writers * series_per_writer;

        // Barrier to start writers, readers and the main thread together.
        let barrier = Arc::new(std::sync::Barrier::new(num_writers + num_readers + 1));

        let mut writer_handles = Vec::with_capacity(num_writers);
        for writer_id in 0..num_writers {
            let memtable = memtable.clone();
            let schema = schema.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                for series_id in 0..series_per_writer {
                    let series_key = format!("writer-{}-series-{}", writer_id, series_id);
                    let kvs =
                        build_key_values(&schema, series_key, series_id as i64, rows_per_series);
                    memtable.write(&kvs).unwrap();
                }
            });

            writer_handles.push(handle);
        }

        let mut reader_handles = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            let memtable = memtable.clone();
            let barrier = barrier.clone();

            let handle = std::thread::spawn(move || {
                barrier.wait();

                // Read concurrently with the writers.
                for _ in 0..10 {
                    let iter = memtable.iter(None, None, None).unwrap();
                    for batch_result in iter {
                        let _ = batch_result.unwrap();
                    }
                }
            });

            reader_handles.push(handle);
        }

        barrier.wait();

        for handle in writer_handles {
            handle.join().unwrap();
        }
        for handle in reader_handles {
            handle.join().unwrap();
        }

        // After all threads finish, every series and every row must be visible.
        let iter = memtable.iter(None, None, None).unwrap();
        let mut series_count = 0;
        let mut row_count = 0;

        for batch_result in iter {
            let batch = batch_result.unwrap();
            series_count += 1;
            row_count += batch.num_rows();
        }
        assert_eq!(total_series, series_count);
        assert_eq!(total_series * rows_per_series, row_count);
    }
}