#[cfg(any(test, feature = "test"))]
mod test_only;

use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use datatypes::vectors::Helper;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::Series;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
    MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
};
use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
use crate::read::Batch;
use crate::read::dedup::LastNonNullIter;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::MergeMode;
use crate::{error, metrics};

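/// Memtable for regions without primary keys.
///
/// All rows are appended to a single [Series] guarded by an `RwLock`; there is no
/// per-key partitioning, and `write_one` asserts that rows carry no primary key
/// columns.
///
/// A minimal usage sketch (not compiled; assumes a region metadata without primary
/// key columns and pre-built `key_values`):
///
/// ```rust,ignore
/// let memtable = SimpleBulkMemtable::new(0, region_metadata, None, true, MergeMode::LastRow);
/// memtable.write(&key_values).unwrap();
/// let ranges = memtable.ranges(None, PredicateGroup::default(), None, false).unwrap();
/// ```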
pub struct SimpleBulkMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    num_rows: AtomicUsize,
    series: RwLock<Series>,
}

impl Drop for SimpleBulkMemtable {
    fn drop(&mut self) {
        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
    }
}

impl SimpleBulkMemtable {
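    /// Creates a new memtable.
    ///
    /// When `merge_mode` is [MergeMode::LastNonNull], `dedup` is forced to `false`;
    /// duplicate rows are kept and merged at read time by [LastNonNullIter].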
    pub fn new(
        id: MemtableId,
        region_metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        let series = RwLock::new(Series::with_capacity(&region_metadata, 1024, 8192));

        Self {
            id,
            region_metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: AtomicUsize::new(0),
            series,
        }
    }

    fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
        if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        }
    }

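    /// Appends a single [KeyValue] to the series and accumulates the written
    /// value bytes and timestamp range into `stats`.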
    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
        let ts = kv.timestamp();
        let sequence = kv.sequence();
        let op_type = kv.op_type();
        let mut series = self.series.write().unwrap();
        let size = series.push(ts, sequence, op_type, kv.fields());
        stats.value_bytes += size;
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
    }

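    /// Reports the written bytes to the [AllocTracker] and folds the per-write
    /// metrics into the memtable's atomic counters (row count, timestamp range,
    /// max sequence).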
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
    }

    #[cfg(test)]
    fn schema(&self) -> &RegionMetadataRef {
        &self.region_metadata
    }
}

impl Debug for SimpleBulkMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SimpleBulkMemtable").finish()
    }
}

impl Memtable for SimpleBulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> error::Result<()> {
        let mut stats = WriteMetrics::default();
        let max_sequence = kvs.max_sequence();
        for kv in kvs.iter() {
            self.write_key_value(kv, &mut stats);
        }
        stats.max_sequence = max_sequence;
        stats.num_rows = kvs.num_rows();
        self.update_stats(stats);
        Ok(())
    }

    fn write_one(&self, kv: KeyValue) -> error::Result<()> {
        debug_assert_eq!(0, kv.num_primary_keys());
        let mut stats = WriteMetrics::default();
        self.write_key_value(kv, &mut stats);
        stats.num_rows = 1;
        stats.max_sequence = kv.sequence();
        self.update_stats(stats);
        Ok(())
    }

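    /// Writes a [BulkPart]: converts its timestamp and field columns into vectors
    /// and extends the series in one call; all rows are applied as `Put`s.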
    fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
        let rb = &part.batch;

        let ts = Helper::try_into_vector(
            rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
                .with_context(|| error::InvalidRequestSnafu {
                    region_id: self.region_metadata.region_id,
                    reason: "Timestamp not found",
                })?,
        )
        .context(error::ConvertVectorSnafu)?;

        let sequence = part.sequence;

        let fields: Vec<_> = self
            .region_metadata
            .field_columns()
            .map(|f| {
                let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
                    error::InvalidRequestSnafu {
                        region_id: self.region_metadata.region_id,
                        reason: format!("Column {} not found", f.column_schema.name),
                    }
                    .build()
                })?;
                Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
            })
            .collect::<error::Result<Vec<_>>>()?;

        let mut series = self.series.write().unwrap();
        let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["bulk_extend"])
            .start_timer();
        series.extend(ts, OpType::Put as u8, sequence, fields)?;
        extend_timer.observe_duration();

        self.update_stats(WriteMetrics {
            key_bytes: 0,
            value_bytes: part.estimated_size(),
            min_ts: part.min_timestamp,
            max_ts: part.max_timestamp,
            num_rows: part.num_rows(),
            max_sequence: sequence,
        });
        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        _predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> error::Result<BoxedBatchIterator> {
        let iter = self.create_iter(projection, sequence)?.build(None)?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

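    /// Builds memtable ranges: reads the current series values, converts each
    /// buffer into a [Batch] in parallel (via rayon), applies the projection,
    /// optional sequence filter and dedup, and drops empty batches. Each surviving
    /// batch becomes one range backed by a [BatchRangeBuilder].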
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
        _for_flush: bool,
    ) -> error::Result<MemtableRanges> {
        let start_time = Instant::now();
        let projection = Arc::new(self.build_projection(projection));
        let values = self.series.read().unwrap().read_to_values();
        let contexts = values
            .into_par_iter()
            .filter_map(|v| {
                let filtered = match v
                    .to_batch(&[], &self.region_metadata, &projection, self.dedup)
                    .and_then(|mut b| {
                        b.filter_by_sequence(sequence)?;
                        Ok(b)
                    }) {
                    Ok(filtered) => filtered,
                    Err(e) => {
                        return Some(Err(e));
                    }
                };
                if filtered.is_empty() {
                    None
                } else {
                    Some(Ok(filtered))
                }
            })
            .map(|result| {
                result.map(|batch| {
                    let num_rows = batch.num_rows();
                    let builder = BatchRangeBuilder {
                        batch,
                        merge_mode: self.merge_mode,
                        scan_cost: start_time.elapsed(),
                    };
                    (
                        num_rows,
                        Arc::new(MemtableRangeContext::new(
                            self.id,
                            Box::new(builder),
                            predicate.clone(),
                        )),
                    )
                })
            })
            .collect::<error::Result<Vec<_>>>()?;

        let ranges = contexts
            .into_iter()
            .enumerate()
            .map(|(idx, (num_rows, context))| (idx, MemtableRange::new(context, num_rows)))
            .collect();

        Ok(MemtableRanges {
            ranges,
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series.read().unwrap().is_empty()
    }

    fn freeze(&self) -> error::Result<()> {
        self.series.write().unwrap().freeze(&self.region_metadata);
        Ok(())
    }

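    /// Returns memtable statistics. The time range is `None` while the memtable is
    /// empty; otherwise the atomically tracked min/max timestamps are converted to
    /// the region's timestamp type. This memtable always reports a single series.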
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();
        let num_rows = self.num_rows.load(Ordering::Relaxed);
        if num_rows == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows,
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: 1,
        }
    }

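    /// Creates an empty memtable with the same options (dedup, merge mode, write
    /// buffer manager) for the given id and region metadata.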
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(Self::new(
            id,
            metadata.clone(),
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

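/// [IterBuilder] that yields a single pre-computed [Batch] for one memtable range,
/// wrapping it in a [LastNonNullIter] when the merge mode is `LastNonNull`.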
#[derive(Clone)]
pub struct BatchRangeBuilder {
    pub batch: Batch,
    pub merge_mode: MergeMode,
    scan_cost: Duration,
}

impl IterBuilder for BatchRangeBuilder {
    fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
        let batch = self.batch.clone();
        if let Some(metrics) = metrics {
            let inner = crate::memtable::MemScanMetricsData {
                total_series: 1,
                num_rows: batch.num_rows(),
                num_batches: 1,
                scan_cost: self.scan_cost,
            };
            metrics.merge_inner(&inner);
        }

        let iter = Iter {
            batch: Some(Ok(batch)),
        };

        if self.merge_mode == MergeMode::LastNonNull {
            Ok(Box::new(LastNonNullIter::new(iter)))
        } else {
            Ok(Box::new(iter))
        }
    }
}

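/// Iterator that yields its single batch once and then `None`.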
struct Iter {
    batch: Option<error::Result<Batch>>,
}

impl Iterator for Iter {
    type Item = error::Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.batch.take()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

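    /// Builds `KeyValues` for the test schema `(ts, f1: f64, f2: string)` with the
    /// given sequence number and op type.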
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| {
                row(vec![
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*f1),
                    ValueData::StringValue(f2.clone()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len());
        assert_eq!(2, batch.fields()[0].column_id);
    }

    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None, false)
            .unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None, false)
            .unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None, false)
            .unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, Some(0)).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from_iter_values(
                    ["a".repeat(string_len as usize).clone()].into_iter(),
                )) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    [ts].into_iter(),
                )) as ArrayRef,
            ],
        )
        .unwrap()
    }

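    /// Writes a row whose string value is `i32::MAX` bytes long, freezes the
    /// memtable, writes a second small row, then reads both back through
    /// `ranges()` so large values are exercised across the freeze/read path.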
    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } = memtable
            .ranges(None, PredicateGroup::default(), None, false)
            .unwrap();
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
}