1#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::{ColumnId, SequenceNumber};
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
38 MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
39};
40use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
41use crate::read::dedup::LastNonNullIter;
42use crate::read::scan_region::PredicateGroup;
43use crate::read::Batch;
44use crate::region::options::MergeMode;
45use crate::{error, metrics};
46
/// A memtable that keeps every row of a region in a single [`Series`].
///
/// NOTE(review): `write_one` debug-asserts that rows carry no primary key, so this presumably
/// targets regions without primary keys — confirm against the memtable builder.
pub struct SimpleBulkMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Metadata of the region this memtable belongs to.
    region_metadata: RegionMetadataRef,
    /// Tracks bytes written into this memtable; built from the optional write buffer manager.
    alloc_tracker: AllocTracker,
    /// Largest timestamp value written so far; `i64::MIN` until the first write.
    max_timestamp: AtomicI64,
    /// Smallest timestamp value written so far; `i64::MAX` until the first write.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Whether rows with the same timestamp are deduplicated when batches are built.
    /// Always `false` when `merge_mode` is `MergeMode::LastNonNull`.
    dedup: bool,
    /// How rows sharing a key are merged at read time.
    merge_mode: MergeMode,
    /// Total number of rows written.
    num_rows: AtomicUsize,
    /// All rows. Writers take the write lock; scans and emptiness checks take the read lock.
    series: RwLock<Series>,
}
59
/// Releases this memtable's contribution to the active-series gauge.
impl Drop for SimpleBulkMemtable {
    fn drop(&mut self) {
        // The memtable owns exactly one series, so the gauge is decremented by one.
        // NOTE(review): `new` does not call `inc()` itself; presumably `Series::with_capacity`
        // performs the matching increment — confirm the gauge stays balanced.
        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
    }
}
65
66impl SimpleBulkMemtable {
67 pub fn new(
68 id: MemtableId,
69 region_metadata: RegionMetadataRef,
70 write_buffer_manager: Option<WriteBufferManagerRef>,
71 dedup: bool,
72 merge_mode: MergeMode,
73 ) -> Self {
74 let dedup = if merge_mode == MergeMode::LastNonNull {
75 false
76 } else {
77 dedup
78 };
79 let series = RwLock::new(Series::with_capacity(®ion_metadata, 1024, 8192));
80
81 Self {
82 id,
83 region_metadata,
84 alloc_tracker: AllocTracker::new(write_buffer_manager),
85 max_timestamp: AtomicI64::new(i64::MIN),
86 min_timestamp: AtomicI64::new(i64::MAX),
87 max_sequence: AtomicU64::new(0),
88 dedup,
89 merge_mode,
90 num_rows: AtomicUsize::new(0),
91 series,
92 }
93 }
94
95 fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
96 if let Some(projection) = projection {
97 projection.iter().copied().collect()
98 } else {
99 self.region_metadata
100 .field_columns()
101 .map(|c| c.column_id)
102 .collect()
103 }
104 }
105
106 fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
107 let ts = kv.timestamp();
108 let sequence = kv.sequence();
109 let op_type = kv.op_type();
110 let mut series = self.series.write().unwrap();
111 let size = series.push(ts, sequence, op_type, kv.fields());
112 stats.value_bytes += size;
113 let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
115 stats.min_ts = stats.min_ts.min(ts);
116 stats.max_ts = stats.max_ts.max(ts);
117 }
118
119 fn update_stats(&self, stats: WriteMetrics) {
121 self.alloc_tracker
122 .on_allocation(stats.key_bytes + stats.value_bytes);
123 self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
124 self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
125 self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
126 self.max_sequence
127 .fetch_max(stats.max_sequence, Ordering::SeqCst);
128 }
129
130 #[cfg(test)]
131 fn schema(&self) -> &RegionMetadataRef {
132 &self.region_metadata
133 }
134}
135
136impl Debug for SimpleBulkMemtable {
137 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct("SimpleBulkMemtable").finish()
139 }
140}
141
142impl Memtable for SimpleBulkMemtable {
143 fn id(&self) -> MemtableId {
144 self.id
145 }
146
147 fn write(&self, kvs: &KeyValues) -> error::Result<()> {
148 let mut stats = WriteMetrics::default();
149 let max_sequence = kvs.max_sequence();
150 for kv in kvs.iter() {
151 self.write_key_value(kv, &mut stats);
152 }
153 stats.max_sequence = max_sequence;
154 stats.num_rows = kvs.num_rows();
155 self.update_stats(stats);
156 Ok(())
157 }
158
159 fn write_one(&self, kv: KeyValue) -> error::Result<()> {
160 debug_assert_eq!(0, kv.num_primary_keys());
161 let mut stats = WriteMetrics::default();
162 self.write_key_value(kv, &mut stats);
163 stats.num_rows = 1;
164 stats.max_sequence = kv.sequence();
165 self.update_stats(stats);
166 Ok(())
167 }
168
169 fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
170 let rb = &part.batch;
171
172 let ts = Helper::try_into_vector(
173 rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
174 .with_context(|| error::InvalidRequestSnafu {
175 region_id: self.region_metadata.region_id,
176 reason: "Timestamp not found",
177 })?,
178 )
179 .context(error::ConvertVectorSnafu)?;
180
181 let sequence = part.sequence;
182
183 let fields: Vec<_> = self
184 .region_metadata
185 .field_columns()
186 .map(|f| {
187 let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
188 error::InvalidRequestSnafu {
189 region_id: self.region_metadata.region_id,
190 reason: format!("Column {} not found", f.column_schema.name),
191 }
192 .build()
193 })?;
194 Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
195 })
196 .collect::<error::Result<Vec<_>>>()?;
197
198 let mut series = self.series.write().unwrap();
199 let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
200 .with_label_values(&["bulk_extend"])
201 .start_timer();
202 series.extend(ts, OpType::Put as u8, sequence, fields)?;
203 extend_timer.observe_duration();
204
205 self.update_stats(WriteMetrics {
206 key_bytes: 0,
207 value_bytes: part.estimated_size(),
208 min_ts: part.min_ts,
209 max_ts: part.max_ts,
210 num_rows: part.num_rows(),
211 max_sequence: sequence,
212 });
213 Ok(())
214 }
215
216 #[cfg(any(test, feature = "test"))]
217 fn iter(
218 &self,
219 projection: Option<&[ColumnId]>,
220 _predicate: Option<table::predicate::Predicate>,
221 sequence: Option<SequenceNumber>,
222 ) -> error::Result<BoxedBatchIterator> {
223 let iter = self.create_iter(projection, sequence)?.build(None)?;
224
225 if self.merge_mode == MergeMode::LastNonNull {
226 let iter = LastNonNullIter::new(iter);
227 Ok(Box::new(iter))
228 } else {
229 Ok(Box::new(iter))
230 }
231 }
232
233 fn ranges(
234 &self,
235 projection: Option<&[ColumnId]>,
236 predicate: PredicateGroup,
237 sequence: Option<SequenceNumber>,
238 ) -> error::Result<MemtableRanges> {
239 let start_time = Instant::now();
240 let projection = Arc::new(self.build_projection(projection));
241 let values = self.series.read().unwrap().read_to_values();
242 let contexts = values
243 .into_par_iter()
244 .filter_map(|v| {
245 let filtered = match v
246 .to_batch(&[], &self.region_metadata, &projection, self.dedup)
247 .and_then(|mut b| {
248 b.filter_by_sequence(sequence)?;
249 Ok(b)
250 }) {
251 Ok(filtered) => filtered,
252 Err(e) => {
253 return Some(Err(e));
254 }
255 };
256 if filtered.is_empty() {
257 None
258 } else {
259 Some(Ok(filtered))
260 }
261 })
262 .map(|result| {
263 result.map(|batch| {
264 let num_rows = batch.num_rows();
265 let builder = BatchRangeBuilder {
266 batch,
267 merge_mode: self.merge_mode,
268 scan_cost: start_time.elapsed(),
269 };
270 (
271 num_rows,
272 Arc::new(MemtableRangeContext::new(
273 self.id,
274 Box::new(builder),
275 predicate.clone(),
276 )),
277 )
278 })
279 })
280 .collect::<error::Result<Vec<_>>>()?;
281
282 let ranges = contexts
283 .into_iter()
284 .enumerate()
285 .map(|(idx, (num_rows, context))| (idx, MemtableRange::new(context, num_rows)))
286 .collect();
287
288 Ok(MemtableRanges {
289 ranges,
290 stats: self.stats(),
291 })
292 }
293
294 fn is_empty(&self) -> bool {
295 self.series.read().unwrap().is_empty()
296 }
297
298 fn freeze(&self) -> error::Result<()> {
299 self.series.write().unwrap().freeze(&self.region_metadata);
300 Ok(())
301 }
302
303 fn stats(&self) -> MemtableStats {
304 let estimated_bytes = self.alloc_tracker.bytes_allocated();
305 let num_rows = self.num_rows.load(Ordering::Relaxed);
306 if num_rows == 0 {
307 return MemtableStats {
309 estimated_bytes,
310 time_range: None,
311 num_rows: 0,
312 num_ranges: 0,
313 max_sequence: 0,
314 series_count: 0,
315 };
316 }
317 let ts_type = self
318 .region_metadata
319 .time_index_column()
320 .column_schema
321 .data_type
322 .clone()
323 .as_timestamp()
324 .expect("Timestamp column must have timestamp type");
325 let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
326 let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
327 MemtableStats {
328 estimated_bytes,
329 time_range: Some((min_timestamp, max_timestamp)),
330 num_rows,
331 num_ranges: 1,
332 max_sequence: self.max_sequence.load(Ordering::Relaxed),
333 series_count: 1,
334 }
335 }
336
337 fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
338 Arc::new(Self::new(
339 id,
340 metadata.clone(),
341 self.alloc_tracker.write_buffer_manager(),
342 self.dedup,
343 self.merge_mode,
344 ))
345 }
346}
347
/// [`IterBuilder`] for one memtable range: holds a pre-built [`Batch`] and replays it.
#[derive(Clone)]
pub struct BatchRangeBuilder {
    /// Rows of this range, already projected and filtered by sequence in
    /// `SimpleBulkMemtable::ranges`.
    pub batch: Batch,
    /// Merge mode of the memtable; `LastNonNull` wraps the built iterator in `LastNonNullIter`.
    pub merge_mode: MergeMode,
    /// Elapsed time recorded when this range was built; reported through scan metrics.
    scan_cost: Duration,
}
354
355impl IterBuilder for BatchRangeBuilder {
356 fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
357 let batch = self.batch.clone();
358 if let Some(metrics) = metrics {
359 let inner = crate::memtable::MemScanMetricsData {
360 total_series: 1,
361 num_rows: batch.num_rows(),
362 num_batches: 1,
363 scan_cost: self.scan_cost,
364 };
365 metrics.merge_inner(&inner);
366 }
367
368 let iter = Iter {
369 batch: Some(Ok(batch)),
370 };
371
372 if self.merge_mode == MergeMode::LastNonNull {
373 Ok(Box::new(LastNonNullIter::new(iter)))
374 } else {
375 Ok(Box::new(iter))
376 }
377 }
378}
379
380struct Iter {
381 batch: Option<error::Result<Batch>>,
382}
383
384impl Iterator for Iter {
385 type Item = error::Result<Batch>;
386
387 fn next(&mut self) -> Option<Self::Item> {
388 self.batch.take()
389 }
390}
391
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Region metadata with a millisecond time index `ts` (id 1) and two nullable fields:
    /// `f1` float64 (id 2) and `f2` string (id 3). No tag columns.
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Creates memtable #1 over [`new_test_metadata`] without a write buffer manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds a single mutation of `op_type` with base `sequence` whose rows are
    /// `(ts, f1, f2)` triples.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*f1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(f2.clone())),
                    },
                ],
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Two separate writes come back as one batch ordered by timestamp.
    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    /// Without a projection all fields are read; with one, only the requested field is.
    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Project only field `f1` (column id 2).
        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len());
        assert_eq!(2, batch.fields()[0].column_id);
    }

    /// With dedup on, the newer write for the same timestamp wins.
    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// `write_one` stores a single row.
    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    /// A put followed by a delete across a freeze leaves one row after merge + dedup.
    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    /// A lone delete is still returned, as a delete row.
    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    /// Two writes followed by a freeze produce a single deduplicated range.
    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    /// Bulk writes update stats and interleave correctly with row writes.
    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_ts: 1,
            max_ts: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| t.unwrap().0.value())
                .collect::<Vec<_>>()
        );
    }

    /// A mutation carrying several rows is fully stored and reflected in the stats.
    #[test]
    fn test_write_multiple_rows_in_one_mutation() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[
                    (1, 1.0, "a".to_string()),
                    (2, 2.0, "b".to_string()),
                    (3, 3.0, "c".to_string()),
                ],
                OpType::Put,
            ))
            .unwrap();

        let stats = memtable.stats();
        assert_eq!(3, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(3))),
            stats.time_range
        );

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
    }

    /// `is_empty` flips once a row is written.
    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    /// Stats report no time range before the first write and one afterwards.
    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    /// A forked memtable starts empty.
    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    /// Reading at sequence 0 hides the row written with sequence 1.
    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, Some(0)).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// One-row record batch whose string column holds `string_len` copies of `'a'`.
    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from_iter_values(
                    ["a".repeat(string_len as usize)].into_iter(),
                )) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    [ts].into_iter(),
                )) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Reads back a near-`i32::MAX`-byte string alongside a small one written after a freeze.
    // NOTE(review): presumably guards against i32 string-offset overflow when values are
    // combined — confirm. This test allocates ~2 GiB.
    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_ts: 0,
                min_ts: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_ts: 1,
                min_ts: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
}