1#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::{ColumnId, SequenceNumber};
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
38 MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
39};
40use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
41use crate::read::dedup::LastNonNullIter;
42use crate::read::scan_region::PredicateGroup;
43use crate::read::Batch;
44use crate::region::options::MergeMode;
45use crate::{error, metrics};
46
/// A memtable that keeps every row in a single [`Series`] guarded by an `RwLock`,
/// accepting both row-based writes (`write`/`write_one`) and bulk writes (`write_bulk`).
///
/// NOTE(review): `write_one` debug-asserts that rows carry no primary keys, so this
/// memtable appears intended for regions without primary-key columns — confirm.
pub struct SimpleBulkMemtable {
    // Identifier returned by `Memtable::id`.
    id: MemtableId,
    // Region schema; used to locate the time-index and field columns.
    region_metadata: RegionMetadataRef,
    // Accumulates written bytes; surfaced as `MemtableStats::estimated_bytes`.
    alloc_tracker: AllocTracker,
    // Largest timestamp written so far (initialized to `i64::MIN`).
    max_timestamp: AtomicI64,
    // Smallest timestamp written so far (initialized to `i64::MAX`).
    min_timestamp: AtomicI64,
    // Largest sequence number written so far.
    max_sequence: AtomicU64,
    // Whether `to_batch` deduplicates rows; forced to `false` under `MergeMode::LastNonNull`.
    dedup: bool,
    // Merge mode applied when reading (`LastNonNull` wraps iterators in `LastNonNullIter`).
    merge_mode: MergeMode,
    // Total number of rows written.
    num_rows: AtomicUsize,
    // All written rows; writers take the write lock, readers the read lock.
    series: RwLock<Series>,
}
59
impl Drop for SimpleBulkMemtable {
    /// Decrements the active-series gauge when this memtable is dropped.
    fn drop(&mut self) {
        // NOTE(review): the matching increment is not visible in this file; presumably it
        // happens when the single `Series` is created (`Series::with_capacity`) — confirm.
        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
    }
}
65
66impl SimpleBulkMemtable {
67 pub fn new(
68 id: MemtableId,
69 region_metadata: RegionMetadataRef,
70 write_buffer_manager: Option<WriteBufferManagerRef>,
71 dedup: bool,
72 merge_mode: MergeMode,
73 ) -> Self {
74 let dedup = if merge_mode == MergeMode::LastNonNull {
75 false
76 } else {
77 dedup
78 };
79 let series = RwLock::new(Series::with_capacity(®ion_metadata, 1024, 8192));
80
81 Self {
82 id,
83 region_metadata,
84 alloc_tracker: AllocTracker::new(write_buffer_manager),
85 max_timestamp: AtomicI64::new(i64::MIN),
86 min_timestamp: AtomicI64::new(i64::MAX),
87 max_sequence: AtomicU64::new(0),
88 dedup,
89 merge_mode,
90 num_rows: AtomicUsize::new(0),
91 series,
92 }
93 }
94
95 fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
96 if let Some(projection) = projection {
97 projection.iter().copied().collect()
98 } else {
99 self.region_metadata
100 .field_columns()
101 .map(|c| c.column_id)
102 .collect()
103 }
104 }
105
106 fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
107 let ts = kv.timestamp();
108 let sequence = kv.sequence();
109 let op_type = kv.op_type();
110 let mut series = self.series.write().unwrap();
111 let size = series.push(ts, sequence, op_type, kv.fields());
112 stats.value_bytes += size;
113 let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
115 stats.min_ts = stats.min_ts.min(ts);
116 stats.max_ts = stats.max_ts.max(ts);
117 }
118
119 fn update_stats(&self, stats: WriteMetrics) {
121 self.alloc_tracker
122 .on_allocation(stats.key_bytes + stats.value_bytes);
123 self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
124 self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
125 self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
126 self.max_sequence
127 .fetch_max(stats.max_sequence, Ordering::SeqCst);
128 }
129
130 #[cfg(test)]
131 fn schema(&self) -> &RegionMetadataRef {
132 &self.region_metadata
133 }
134}
135
136impl Debug for SimpleBulkMemtable {
137 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct("SimpleBulkMemtable").finish()
139 }
140}
141
142impl Memtable for SimpleBulkMemtable {
143 fn id(&self) -> MemtableId {
144 self.id
145 }
146
147 fn write(&self, kvs: &KeyValues) -> error::Result<()> {
148 let mut stats = WriteMetrics::default();
149 let max_sequence = kvs.max_sequence();
150 for kv in kvs.iter() {
151 self.write_key_value(kv, &mut stats);
152 }
153 stats.max_sequence = max_sequence;
154 stats.num_rows = kvs.num_rows();
155 self.update_stats(stats);
156 Ok(())
157 }
158
159 fn write_one(&self, kv: KeyValue) -> error::Result<()> {
160 debug_assert_eq!(0, kv.num_primary_keys());
161 let mut stats = WriteMetrics::default();
162 self.write_key_value(kv, &mut stats);
163 stats.num_rows = 1;
164 stats.max_sequence = kv.sequence();
165 self.update_stats(stats);
166 Ok(())
167 }
168
169 fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
170 let rb = &part.batch;
171
172 let ts = Helper::try_into_vector(
173 rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
174 .with_context(|| error::InvalidRequestSnafu {
175 region_id: self.region_metadata.region_id,
176 reason: "Timestamp not found",
177 })?,
178 )
179 .context(error::ConvertVectorSnafu)?;
180
181 let sequence = part.sequence;
182
183 let fields: Vec<_> = self
184 .region_metadata
185 .field_columns()
186 .map(|f| {
187 let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
188 error::InvalidRequestSnafu {
189 region_id: self.region_metadata.region_id,
190 reason: format!("Column {} not found", f.column_schema.name),
191 }
192 .build()
193 })?;
194 Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
195 })
196 .collect::<error::Result<Vec<_>>>()?;
197
198 let mut series = self.series.write().unwrap();
199 let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
200 .with_label_values(&["bulk_extend"])
201 .start_timer();
202 series.extend(ts, OpType::Put as u8, sequence, fields)?;
203 extend_timer.observe_duration();
204
205 self.update_stats(WriteMetrics {
206 key_bytes: 0,
207 value_bytes: part.estimated_size(),
208 min_ts: part.min_ts,
209 max_ts: part.max_ts,
210 num_rows: part.num_rows(),
211 max_sequence: sequence,
212 });
213 Ok(())
214 }
215
216 #[cfg(any(test, feature = "test"))]
217 fn iter(
218 &self,
219 projection: Option<&[ColumnId]>,
220 _predicate: Option<table::predicate::Predicate>,
221 sequence: Option<SequenceNumber>,
222 ) -> error::Result<BoxedBatchIterator> {
223 let iter = self.create_iter(projection, sequence)?.build(None)?;
224
225 if self.merge_mode == MergeMode::LastNonNull {
226 let iter = LastNonNullIter::new(iter);
227 Ok(Box::new(iter))
228 } else {
229 Ok(Box::new(iter))
230 }
231 }
232
233 fn ranges(
234 &self,
235 projection: Option<&[ColumnId]>,
236 predicate: PredicateGroup,
237 sequence: Option<SequenceNumber>,
238 ) -> error::Result<MemtableRanges> {
239 let start_time = Instant::now();
240 let projection = Arc::new(self.build_projection(projection));
241 let values = self.series.read().unwrap().read_to_values();
242 let contexts = values
243 .into_par_iter()
244 .filter_map(|v| {
245 let filtered = match v
246 .to_batch(&[], &self.region_metadata, &projection, self.dedup)
247 .and_then(|mut b| {
248 b.filter_by_sequence(sequence)?;
249 Ok(b)
250 }) {
251 Ok(filtered) => filtered,
252 Err(e) => {
253 return Some(Err(e));
254 }
255 };
256 if filtered.is_empty() {
257 None
258 } else {
259 Some(Ok(filtered))
260 }
261 })
262 .map(|result| {
263 result.map(|batch| {
264 let num_rows = batch.num_rows();
265 let builder = BatchRangeBuilder {
266 batch,
267 merge_mode: self.merge_mode,
268 scan_cost: start_time.elapsed(),
269 };
270 (
271 num_rows,
272 Arc::new(MemtableRangeContext::new(
273 self.id,
274 Box::new(builder),
275 predicate.clone(),
276 )),
277 )
278 })
279 })
280 .collect::<error::Result<Vec<_>>>()?;
281
282 let ranges = contexts
283 .into_iter()
284 .enumerate()
285 .map(|(idx, (num_rows, context))| (idx, MemtableRange::new(context, num_rows)))
286 .collect();
287
288 Ok(MemtableRanges {
289 ranges,
290 stats: self.stats(),
291 })
292 }
293
294 fn is_empty(&self) -> bool {
295 self.series.read().unwrap().is_empty()
296 }
297
298 fn freeze(&self) -> error::Result<()> {
299 self.series.write().unwrap().freeze(&self.region_metadata);
300 Ok(())
301 }
302
303 fn stats(&self) -> MemtableStats {
304 let estimated_bytes = self.alloc_tracker.bytes_allocated();
305 let num_rows = self.num_rows.load(Ordering::Relaxed);
306 if num_rows == 0 {
307 return MemtableStats {
309 estimated_bytes,
310 time_range: None,
311 num_rows: 0,
312 num_ranges: 0,
313 max_sequence: 0,
314 series_count: 0,
315 };
316 }
317 let ts_type = self
318 .region_metadata
319 .time_index_column()
320 .column_schema
321 .data_type
322 .clone()
323 .as_timestamp()
324 .expect("Timestamp column must have timestamp type");
325 let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
326 let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
327 MemtableStats {
328 estimated_bytes,
329 time_range: Some((min_timestamp, max_timestamp)),
330 num_rows,
331 num_ranges: 1,
332 max_sequence: self.max_sequence.load(Ordering::Relaxed),
333 series_count: 1,
334 }
335 }
336
337 fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
338 Arc::new(Self::new(
339 id,
340 metadata.clone(),
341 self.alloc_tracker.write_buffer_manager(),
342 self.dedup,
343 self.merge_mode,
344 ))
345 }
346}
347
/// Builds the iterator for one memtable range backed by an already-materialized [`Batch`].
#[derive(Clone)]
pub struct BatchRangeBuilder {
    /// Rows served by this range. `SimpleBulkMemtable::ranges` fills it with a projected,
    /// sequence-filtered, non-empty batch.
    pub batch: Batch,
    /// Merge mode; `LastNonNull` makes `build` wrap the iterator in `LastNonNullIter`.
    pub merge_mode: MergeMode,
    // Elapsed time recorded when the builder was created; reported as `scan_cost` in scan metrics.
    scan_cost: Duration,
}
354
355impl IterBuilder for BatchRangeBuilder {
356 fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
357 let batch = self.batch.clone();
358 if let Some(metrics) = metrics {
359 let inner = crate::memtable::MemScanMetricsData {
360 total_series: 1,
361 num_rows: batch.num_rows(),
362 num_batches: 1,
363 scan_cost: self.scan_cost,
364 };
365 metrics.merge_inner(&inner);
366 }
367
368 let iter = Iter {
369 batch: Some(Ok(batch)),
370 };
371
372 if self.merge_mode == MergeMode::LastNonNull {
373 Ok(Box::new(LastNonNullIter::new(iter)))
374 } else {
375 Ok(Box::new(iter))
376 }
377 }
378}
379
/// Iterator that yields a single pre-built item (batch or error) and then ends.
struct Iter {
    // The pending item; `None` once it has been yielded.
    batch: Option<error::Result<Batch>>,
}
383
384impl Iterator for Iter {
385 type Item = error::Result<Batch>;
386
387 fn next(&mut self) -> Option<Self::Item> {
388 self.batch.take()
389 }
390}
391
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Region metadata with a millisecond time index `ts` (id 1) and two nullable fields:
    /// `f1: Float64` (id 2) and `f2: String` (id 3). No tag columns are declared.
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Memtable with id 1 over [`new_test_metadata`] and no write buffer manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds a single mutation of `op_type` at `sequence`, one row per `(ts, f1, f2)` tuple.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| {
                row(vec![
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*f1),
                    ValueData::StringValue(f2.clone()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Two writes with distinct timestamps come back as one batch ordered by timestamp.
    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    /// Without a projection all fields are read; with one only the projected field is.
    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Project only `f1` (column id 2).
        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len());
        assert_eq!(2, batch.fields()[0].column_id);
    }

    /// With dedup enabled, two rows at the same timestamp collapse to the newer one.
    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// `write_one` stores a single row that is readable back.
    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    /// A put followed (after a freeze) by a delete of the same key reduces to one row
    /// once ranges are merged and deduplicated.
    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    /// A delete-only memtable still yields its delete row through merge + dedup.
    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    /// Two writes to the same key followed by a freeze produce one range holding only
    /// the newer row.
    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    /// A bulk part updates rows and stats, and later row writes append after it.
    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_ts: 1,
            max_ts: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    /// A fresh memtable is empty; one write makes it non-empty.
    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    /// Stats report no time range when empty and a time range after a write.
    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    /// A forked memtable starts empty regardless of the source's contents.
    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    /// Reading at sequence 0 hides the row written at sequence 1.
    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, Some(0)).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// Builds a one-row record batch `(k0 = "a" repeated string_len times, ts)` using
    /// `region_meta`'s arrow schema.
    ///
    /// The repeated string is moved into the array instead of being copied first:
    /// callers pass lengths up to `i32::MAX`, so an extra copy would cost ~2 GiB.
    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from_iter_values(
                    ["a".repeat(string_len as usize)].into_iter(),
                )) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    [ts].into_iter(),
                )) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Writes an `i32::MAX`-byte string, freezes, writes a small string, and checks that
    /// both rows are readable through `ranges`.
    // NOTE(review): allocates several GiB; presumably this exercises string offset
    // overflow when frozen values are converted to batches — confirm before shrinking.
    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_ts: 0,
                min_ts: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_ts: 1,
                min_ts: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } = memtable
            .ranges(None, PredicateGroup::default(), None)
            .unwrap();
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
}