1#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::ColumnId;
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
38 MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
39};
40use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
41use crate::read::Batch;
42use crate::read::dedup::LastNonNullIter;
43use crate::region::options::MergeMode;
44use crate::{error, metrics};
45
/// A memtable that keeps every row in a single [`Series`] behind a `RwLock`.
///
/// Row count, timestamp bounds and the max sequence are tracked in atomics next
/// to the series; `stats()` reads them without taking the series lock.
pub struct SimpleBulkMemtable {
    /// Identifier returned by `Memtable::id`.
    id: MemtableId,
    /// Region metadata; supplies the time index column and the field columns.
    region_metadata: RegionMetadataRef,
    /// Receives the byte size (key + value) of every recorded write.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far; initialised to `i64::MIN`.
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far; initialised to `i64::MAX`.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far; initialised to 0.
    max_sequence: AtomicU64,
    /// Passed to `to_batch` when ranges are built; always `false` when
    /// `merge_mode` is `MergeMode::LastNonNull`.
    dedup: bool,
    /// Iterators are wrapped in `LastNonNullIter` when this is `LastNonNull`.
    merge_mode: MergeMode,
    /// Number of rows written so far.
    num_rows: AtomicUsize,
    /// The one series that stores all rows of this memtable.
    series: RwLock<Series>,
}
58
impl Drop for SimpleBulkMemtable {
    /// Decrements the `MEMTABLE_ACTIVE_SERIES_COUNT` metric when the memtable is dropped.
    fn drop(&mut self) {
        // NOTE(review): the matching increment is not in this file — presumably it
        // happens where the `Series` is created; confirm the metric stays balanced.
        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
    }
}
64
65impl SimpleBulkMemtable {
66 pub fn new(
67 id: MemtableId,
68 region_metadata: RegionMetadataRef,
69 write_buffer_manager: Option<WriteBufferManagerRef>,
70 dedup: bool,
71 merge_mode: MergeMode,
72 ) -> Self {
73 let dedup = if merge_mode == MergeMode::LastNonNull {
74 false
75 } else {
76 dedup
77 };
78 let series = RwLock::new(Series::with_capacity(®ion_metadata, 1024, 8192));
79
80 Self {
81 id,
82 region_metadata,
83 alloc_tracker: AllocTracker::new(write_buffer_manager),
84 max_timestamp: AtomicI64::new(i64::MIN),
85 min_timestamp: AtomicI64::new(i64::MAX),
86 max_sequence: AtomicU64::new(0),
87 dedup,
88 merge_mode,
89 num_rows: AtomicUsize::new(0),
90 series,
91 }
92 }
93
94 fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
95 if let Some(projection) = projection {
96 projection.iter().copied().collect()
97 } else {
98 self.region_metadata
99 .field_columns()
100 .map(|c| c.column_id)
101 .collect()
102 }
103 }
104
105 fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
106 let ts = kv.timestamp();
107 let sequence = kv.sequence();
108 let op_type = kv.op_type();
109 let mut series = self.series.write().unwrap();
110 let size = series.push(ts, sequence, op_type, kv.fields());
111 stats.value_bytes += size;
112 let ts = kv
114 .timestamp()
115 .try_into_timestamp()
116 .unwrap()
117 .unwrap()
118 .value();
119 stats.min_ts = stats.min_ts.min(ts);
120 stats.max_ts = stats.max_ts.max(ts);
121 }
122
123 fn update_stats(&self, stats: WriteMetrics) {
125 self.alloc_tracker
126 .on_allocation(stats.key_bytes + stats.value_bytes);
127 self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
128 self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
129 self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
130 self.max_sequence
131 .fetch_max(stats.max_sequence, Ordering::SeqCst);
132 }
133
134 #[cfg(test)]
135 fn schema(&self) -> &RegionMetadataRef {
136 &self.region_metadata
137 }
138}
139
140impl Debug for SimpleBulkMemtable {
141 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
142 f.debug_struct("SimpleBulkMemtable").finish()
143 }
144}
145
146impl Memtable for SimpleBulkMemtable {
147 fn id(&self) -> MemtableId {
148 self.id
149 }
150
151 fn write(&self, kvs: &KeyValues) -> error::Result<()> {
152 let mut stats = WriteMetrics::default();
153 let max_sequence = kvs.max_sequence();
154 for kv in kvs.iter() {
155 self.write_key_value(kv, &mut stats);
156 }
157 stats.max_sequence = max_sequence;
158 stats.num_rows = kvs.num_rows();
159 self.update_stats(stats);
160 Ok(())
161 }
162
163 fn write_one(&self, kv: KeyValue) -> error::Result<()> {
164 debug_assert_eq!(0, kv.num_primary_keys());
165 let mut stats = WriteMetrics::default();
166 self.write_key_value(kv, &mut stats);
167 stats.num_rows = 1;
168 stats.max_sequence = kv.sequence();
169 self.update_stats(stats);
170 Ok(())
171 }
172
173 fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
174 let rb = &part.batch;
175
176 let ts = Helper::try_into_vector(
177 rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
178 .with_context(|| error::InvalidRequestSnafu {
179 region_id: self.region_metadata.region_id,
180 reason: "Timestamp not found",
181 })?,
182 )
183 .context(error::ConvertVectorSnafu)?;
184
185 let sequence = part.sequence;
186
187 let fields: Vec<_> = self
188 .region_metadata
189 .field_columns()
190 .map(|f| {
191 let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
192 error::InvalidRequestSnafu {
193 region_id: self.region_metadata.region_id,
194 reason: format!("Column {} not found", f.column_schema.name),
195 }
196 .build()
197 })?;
198 Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
199 })
200 .collect::<error::Result<Vec<_>>>()?;
201
202 let mut series = self.series.write().unwrap();
203 let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
204 .with_label_values(&["bulk_extend"])
205 .start_timer();
206 series.extend(ts, OpType::Put as u8, sequence, fields)?;
207 extend_timer.observe_duration();
208
209 self.update_stats(WriteMetrics {
210 key_bytes: 0,
211 value_bytes: part.estimated_size(),
212 min_ts: part.min_timestamp,
213 max_ts: part.max_timestamp,
214 num_rows: part.num_rows(),
215 max_sequence: sequence,
216 });
217 Ok(())
218 }
219
220 #[cfg(any(test, feature = "test"))]
221 fn iter(
222 &self,
223 projection: Option<&[ColumnId]>,
224 _predicate: Option<table::predicate::Predicate>,
225 sequence: Option<store_api::storage::SequenceRange>,
226 ) -> error::Result<BoxedBatchIterator> {
227 let iter = self.create_iter(projection, sequence)?.build(None)?;
228
229 if self.merge_mode == MergeMode::LastNonNull {
230 let iter = LastNonNullIter::new(iter);
231 Ok(Box::new(iter))
232 } else {
233 Ok(Box::new(iter))
234 }
235 }
236
237 fn ranges(
238 &self,
239 projection: Option<&[ColumnId]>,
240 options: RangesOptions,
241 ) -> error::Result<MemtableRanges> {
242 let predicate = options.predicate;
243 let sequence = options.sequence;
244 let start_time = Instant::now();
245 let projection = Arc::new(self.build_projection(projection));
246 let values = self.series.read().unwrap().read_to_values();
247 let contexts = values
248 .into_par_iter()
249 .filter_map(|v| {
250 let filtered = match v
251 .to_batch(&[], &self.region_metadata, &projection, self.dedup)
252 .and_then(|mut b| {
253 b.filter_by_sequence(sequence)?;
254 Ok(b)
255 }) {
256 Ok(filtered) => filtered,
257 Err(e) => {
258 return Some(Err(e));
259 }
260 };
261 if filtered.is_empty() {
262 None
263 } else {
264 Some(Ok(filtered))
265 }
266 })
267 .map(|result| {
268 result.map(|batch| {
269 let num_rows = batch.num_rows();
270 let builder = BatchRangeBuilder {
271 batch,
272 merge_mode: self.merge_mode,
273 scan_cost: start_time.elapsed(),
274 };
275 (
276 num_rows,
277 Arc::new(MemtableRangeContext::new(
278 self.id,
279 Box::new(builder),
280 predicate.clone(),
281 )),
282 )
283 })
284 })
285 .collect::<error::Result<Vec<_>>>()?;
286
287 let ranges = contexts
288 .into_iter()
289 .enumerate()
290 .map(|(idx, (num_rows, context))| (idx, MemtableRange::new(context, num_rows)))
291 .collect();
292
293 Ok(MemtableRanges {
294 ranges,
295 stats: self.stats(),
296 })
297 }
298
299 fn is_empty(&self) -> bool {
300 self.series.read().unwrap().is_empty()
301 }
302
303 fn freeze(&self) -> error::Result<()> {
304 self.series.write().unwrap().freeze(&self.region_metadata);
305 Ok(())
306 }
307
308 fn stats(&self) -> MemtableStats {
309 let estimated_bytes = self.alloc_tracker.bytes_allocated();
310 let num_rows = self.num_rows.load(Ordering::Relaxed);
311 if num_rows == 0 {
312 return MemtableStats {
314 estimated_bytes,
315 time_range: None,
316 num_rows: 0,
317 num_ranges: 0,
318 max_sequence: 0,
319 series_count: 0,
320 };
321 }
322 let ts_type = self
323 .region_metadata
324 .time_index_column()
325 .column_schema
326 .data_type
327 .clone()
328 .as_timestamp()
329 .expect("Timestamp column must have timestamp type");
330 let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
331 let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
332 MemtableStats {
333 estimated_bytes,
334 time_range: Some((min_timestamp, max_timestamp)),
335 num_rows,
336 num_ranges: 1,
337 max_sequence: self.max_sequence.load(Ordering::Relaxed),
338 series_count: 1,
339 }
340 }
341
342 fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
343 Arc::new(Self::new(
344 id,
345 metadata.clone(),
346 self.alloc_tracker.write_buffer_manager(),
347 self.dedup,
348 self.merge_mode,
349 ))
350 }
351}
352
/// Iterator builder backed by one already materialised [`Batch`].
#[derive(Clone)]
pub struct BatchRangeBuilder {
    /// The batch handed out (as a clone) by every built iterator.
    pub batch: Batch,
    /// Built iterators are wrapped in `LastNonNullIter` when this is `LastNonNull`.
    pub merge_mode: MergeMode,
    /// Scan cost reported to the scan metrics when an iterator is built.
    scan_cost: Duration,
}
359
360impl IterBuilder for BatchRangeBuilder {
361 fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
362 let batch = self.batch.clone();
363 if let Some(metrics) = metrics {
364 let inner = crate::memtable::MemScanMetricsData {
365 total_series: 1,
366 num_rows: batch.num_rows(),
367 num_batches: 1,
368 scan_cost: self.scan_cost,
369 };
370 metrics.merge_inner(&inner);
371 }
372
373 let iter = Iter {
374 batch: Some(Ok(batch)),
375 };
376
377 if self.merge_mode == MergeMode::LastNonNull {
378 Ok(Box::new(LastNonNullIter::new(iter)))
379 } else {
380 Ok(Box::new(iter))
381 }
382 }
383}
384
/// Iterator that hands out its stored item once and then ends.
struct Iter {
    /// The pending item; `None` once it has been yielded.
    batch: Option<error::Result<Batch>>,
}
388
389impl Iterator for Iter {
390 type Item = error::Result<Batch>;
391
392 fn next(&mut self) -> Option<Self::Item> {
393 self.batch.take()
394 }
395}
396
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber, SequenceRange};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Builds metadata with a millisecond time index `ts` (id 1) and two nullable
    /// field columns: `f1` (float64, id 2) and `f2` (string, id 3).
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Creates a memtable with id 1 over the test metadata and no write buffer manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds `KeyValues` from `(ts, f1, f2)` tuples, all sharing one `sequence`
    /// and one `op_type`.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| {
                row(vec![
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*f1),
                    ValueData::StringValue(f2.clone()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Two writes with distinct timestamps come back as one batch of two rows.
    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    /// Without a projection both fields are returned; projecting column id 2
    /// returns only `f1`.
    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Project only column id 2 (`f1`).
        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len());
        assert_eq!(2, batch.fields()[0].column_id);
    }

    /// With dedup enabled, two puts at the same timestamp collapse to the later one.
    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        // Only one row survives and it carries the value of the second write.
        assert_eq!(1, batch.num_rows());
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// A single row written through `write_one` is readable.
    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    /// A put written before `freeze` and a delete of the same timestamp written
    /// after it merge down to a single row.
    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        // NOTE(review): `LastRow::new(false)` presumably keeps deleted rows in the
        // output, which is why one row is still counted — confirm against `LastRow`.
        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    /// A memtable that only contains a delete still yields that delete row.
    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    /// Two puts at the same timestamp followed by `freeze` produce one range whose
    /// only row carries the later value.
    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    /// A bulk part is readable, updates the stats, and can be followed by a
    /// regular write.
    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        // A row-wise write after the bulk write lands in the same memtable.
        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| t.unwrap().0.value())
                .collect::<Vec<_>>()
        );
    }

    /// `is_empty` flips from `true` to `false` after the first write.
    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    /// Stats report no rows and no time range before the first write, and one
    /// row with a time range afterwards.
    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    /// A forked memtable starts empty even when its parent has data.
    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    /// Reading with `LtEq { max: 0 }` returns only the row written at sequence 0.
    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        // Only the first write has a sequence within the range.
        let mut iter = memtable
            .iter(None, None, Some(SequenceRange::LtEq { max: 0 }))
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// Builds a one-row record batch whose string column holds `string_len`
    /// repetitions of `"a"` and whose timestamp column holds `ts`.
    ///
    /// The column order (string, then timestamp) must match `region_meta`'s schema.
    /// Panics if `string_len` is negative.
    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        let string_len = usize::try_from(string_len).expect("string_len must not be negative");
        // The repeated string is moved into the array builder directly; cloning
        // it first would duplicate a buffer that can be close to 2 GiB.
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from_iter_values(["a".repeat(string_len)])) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values([ts])) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Writes one row with an `i32::MAX`-byte string, freezes, writes a small row,
    /// and checks that both rows can be read back through the ranges.
    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } =
            memtable.ranges(None, RangesOptions::default()).unwrap();
        // A single range is read directly; several ranges go through a merge reader.
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
}