#[cfg(any(test, feature = "test"))]
mod test_only;

use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use datatypes::vectors::Helper;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;

use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::Series;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
    MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
use crate::read::Batch;
use crate::read::dedup::LastNonNullIter;
use crate::region::options::MergeMode;
use crate::{error, metrics};

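/// Memtable for regions without primary key columns.
///
/// All rows are appended to a single [Series] behind an `RwLock`, while the row
/// count, min/max timestamps and the max sequence are tracked with atomics.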
pub struct SimpleBulkMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    num_rows: AtomicUsize,
    series: RwLock<Series>,
}

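// A `SimpleBulkMemtable` always owns exactly one series, so decrement the
// active series gauge when it is dropped.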
impl Drop for SimpleBulkMemtable {
    fn drop(&mut self) {
        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
    }
}

impl SimpleBulkMemtable {
    pub fn new(
        id: MemtableId,
        region_metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        // In `LastNonNull` mode, duplicate rows must be kept so that null
        // fields can be merged on read; disable dedup in that case.
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        let series = RwLock::new(Series::with_capacity(&region_metadata, 1024, 8192));

        Self {
            id,
            region_metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: AtomicUsize::new(0),
            series,
        }
    }

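    /// Returns the set of field column ids to read.
    ///
    /// Falls back to all field columns when no projection is given.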
    fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
        if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        }
    }

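    /// Pushes a single row into the series and records its size and timestamp
    /// in `stats`.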
    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
        let ts = kv.timestamp();
        let sequence = kv.sequence();
        let op_type = kv.op_type();
        let mut series = self.series.write().unwrap();
        let size = series.push(ts, sequence, op_type, kv.fields());
        stats.value_bytes += size;

        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
    }

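    /// Folds per-write metrics into the memtable-level counters and reports
    /// the allocated bytes to the write buffer manager.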
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
    }

    #[cfg(test)]
    fn schema(&self) -> &RegionMetadataRef {
        &self.region_metadata
    }
}

impl Debug for SimpleBulkMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SimpleBulkMemtable").finish()
    }
}

impl Memtable for SimpleBulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> error::Result<()> {
        let mut stats = WriteMetrics::default();
        let max_sequence = kvs.max_sequence();
        for kv in kvs.iter() {
            self.write_key_value(kv, &mut stats);
        }
        stats.max_sequence = max_sequence;
        stats.num_rows = kvs.num_rows();
        self.update_stats(stats);
        Ok(())
    }

    fn write_one(&self, kv: KeyValue) -> error::Result<()> {
        debug_assert_eq!(0, kv.num_primary_keys());
        let mut stats = WriteMetrics::default();
        self.write_key_value(kv, &mut stats);
        stats.num_rows = 1;
        stats.max_sequence = kv.sequence();
        self.update_stats(stats);
        Ok(())
    }

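    /// Writes an encoded [BulkPart]: converts the timestamp and field columns
    /// of the record batch into vectors and extends the series with them as
    /// `Put` rows.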
    fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
        let rb = &part.batch;

        let ts = Helper::try_into_vector(
            rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
                .with_context(|| error::InvalidRequestSnafu {
                    region_id: self.region_metadata.region_id,
                    reason: "Timestamp not found",
                })?,
        )
        .context(error::ConvertVectorSnafu)?;

        let sequence = part.sequence;

        let fields: Vec<_> = self
            .region_metadata
            .field_columns()
            .map(|f| {
                let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
                    error::InvalidRequestSnafu {
                        region_id: self.region_metadata.region_id,
                        reason: format!("Column {} not found", f.column_schema.name),
                    }
                    .build()
                })?;
                Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
            })
            .collect::<error::Result<Vec<_>>>()?;

        let mut series = self.series.write().unwrap();
        let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["bulk_extend"])
            .start_timer();
        series.extend(ts, OpType::Put as u8, sequence, fields)?;
        extend_timer.observe_duration();

        self.update_stats(WriteMetrics {
            key_bytes: 0,
            value_bytes: part.estimated_size(),
            min_ts: part.min_timestamp,
            max_ts: part.max_timestamp,
            num_rows: part.num_rows(),
            max_sequence: sequence,
        });
        Ok(())
    }

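    /// Test-only scan: builds a batch iterator over the series and wraps it
    /// in [LastNonNullIter] when the merge mode is `LastNonNull`.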
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        sequence: Option<store_api::storage::SequenceRange>,
    ) -> error::Result<BoxedBatchIterator> {
        let iter = self.create_iter(projection, sequence)?.build(None)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

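    /// Builds scan ranges: converts each buffered value batch of the series
    /// into a [Batch] (applying dedup and the sequence filter) and exposes
    /// every non-empty batch as its own range.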
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> error::Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let start_time = Instant::now();
        let projection = Arc::new(self.build_projection(projection));

        let max_sequence = self.max_sequence.load(Ordering::Relaxed);
        let time_range = {
            let num_rows = self.num_rows.load(Ordering::Relaxed);
            if num_rows > 0 {
                let ts_type = self.region_metadata.time_index_type();
                let max_timestamp =
                    ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
                let min_timestamp =
                    ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
                Some((min_timestamp, max_timestamp))
            } else {
                None
            }
        };

        let values = self.series.read().unwrap().read_to_values();
        let contexts = values
            .into_par_iter()
            .filter_map(|v| {
                let filtered = match v
                    .to_batch(&[], &self.region_metadata, &projection, self.dedup)
                    .and_then(|mut b| {
                        b.filter_by_sequence(sequence)?;
                        Ok(b)
                    }) {
                    Ok(filtered) => filtered,
                    Err(e) => {
                        return Some(Err(e));
                    }
                };
                if filtered.is_empty() {
                    None
                } else {
                    Some(Ok(filtered))
                }
            })
            .map(|result| {
                result.map(|batch| {
                    let num_rows = batch.num_rows();
                    let estimated_bytes = batch.memory_size();

                    let range_stats = MemtableStats {
                        estimated_bytes,
                        time_range,
                        num_rows,
                        num_ranges: 1,
                        max_sequence,
                        series_count: 1,
                    };

                    let builder = BatchRangeBuilder {
                        batch,
                        merge_mode: self.merge_mode,
                        scan_cost: start_time.elapsed(),
                    };
                    (
                        range_stats,
                        Arc::new(MemtableRangeContext::new(
                            self.id,
                            Box::new(builder),
                            predicate.clone(),
                        )),
                    )
                })
            })
            .collect::<error::Result<Vec<_>>>()?;

        let ranges = contexts
            .into_iter()
            .enumerate()
            .map(|(idx, (range_stats, context))| (idx, MemtableRange::new(context, range_stats)))
            .collect();

        Ok(MemtableRanges { ranges })
    }

    fn is_empty(&self) -> bool {
        self.series.read().unwrap().is_empty()
    }

    fn freeze(&self) -> error::Result<()> {
        self.series.write().unwrap().freeze(&self.region_metadata);
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();
        let num_rows = self.num_rows.load(Ordering::Relaxed);
        if num_rows == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self.region_metadata.time_index_type();
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows,
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: 1,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(Self::new(
            id,
            metadata.clone(),
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

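/// [IterBuilder] that serves a pre-computed [Batch] for one memtable range.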
#[derive(Clone)]
pub struct BatchRangeBuilder {
    pub batch: Batch,
    pub merge_mode: MergeMode,
    scan_cost: Duration,
}

impl IterBuilder for BatchRangeBuilder {
    fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
        let batch = self.batch.clone();
        if let Some(metrics) = metrics {
            let inner = crate::memtable::MemScanMetricsData {
                total_series: 1,
                num_rows: batch.num_rows(),
                num_batches: 1,
                scan_cost: self.scan_cost,
            };
            metrics.merge_inner(&inner);
        }

        let iter = Iter {
            batch: Some(Ok(batch)),
        };

        if self.merge_mode == MergeMode::LastNonNull {
            Ok(Box::new(LastNonNullIter::new(iter)))
        } else {
            Ok(Box::new(iter))
        }
    }
}

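/// Iterator that yields its single batch once and then returns `None`.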
struct Iter {
    batch: Option<error::Result<Batch>>,
}

impl Iterator for Iter {
    type Item = error::Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.batch.take()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber, SequenceRange};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| {
                row(vec![
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*f1),
                    ValueData::StringValue(f2.clone()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        let projection = vec![2];

        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len());
        assert_eq!(2, batch.fields()[0].column_id);
    }

    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable
            .iter(None, None, Some(SequenceRange::LtEq { max: 0 }))
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from_iter_values(
                    ["a".repeat(string_len as usize).clone()].into_iter(),
                )) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    [ts].into_iter(),
                )) as ArrayRef,
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } =
            memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
}