1#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::ColumnId;
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
38 MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
39};
40use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
41use crate::read::Batch;
42use crate::read::dedup::LastNonNullIter;
43use crate::region::options::MergeMode;
44use crate::{error, metrics};
45
/// A memtable that stores every row it receives in one shared [`Series`].
///
/// Rows are appended to the series under a write lock. Aggregate statistics
/// (row count, timestamp bounds, max sequence) are tracked in atomics so that
/// they can be read without taking the series lock.
pub struct SimpleBulkMemtable {
    /// Identifier returned by [`Memtable::id`].
    id: MemtableId,
    /// Region metadata used to resolve the time index and field columns.
    region_metadata: RegionMetadataRef,
    /// Receives the number of bytes reported by each write.
    alloc_tracker: AllocTracker,
    /// Largest timestamp value written so far; starts at `i64::MIN`.
    max_timestamp: AtomicI64,
    /// Smallest timestamp value written so far; starts at `i64::MAX`.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far; starts at `0`.
    max_sequence: AtomicU64,
    /// Dedup flag forwarded to batch construction when reading.
    dedup: bool,
    /// Merge mode forwarded to batch construction; `LastNonNull` additionally
    /// wraps the produced iterators in [`LastNonNullIter`].
    merge_mode: MergeMode,
    /// Number of rows written so far.
    num_rows: AtomicUsize,
    /// The single series holding all rows of this memtable.
    series: RwLock<Series>,
}
58
impl Drop for SimpleBulkMemtable {
    /// Decrements the `MEMTABLE_ACTIVE_SERIES_COUNT` metric when the memtable is dropped.
    // NOTE(review): the matching increment is not visible in this file; presumably it
    // happens where the underlying `Series` is created — confirm, otherwise the gauge
    // drifts negative.
    fn drop(&mut self) {
        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
    }
}
64
65impl SimpleBulkMemtable {
66 pub fn new(
67 id: MemtableId,
68 region_metadata: RegionMetadataRef,
69 write_buffer_manager: Option<WriteBufferManagerRef>,
70 dedup: bool,
71 merge_mode: MergeMode,
72 ) -> Self {
73 let series = RwLock::new(Series::with_capacity(®ion_metadata, 1024, 8192));
74
75 Self {
76 id,
77 region_metadata,
78 alloc_tracker: AllocTracker::new(write_buffer_manager),
79 max_timestamp: AtomicI64::new(i64::MIN),
80 min_timestamp: AtomicI64::new(i64::MAX),
81 max_sequence: AtomicU64::new(0),
82 dedup,
83 merge_mode,
84 num_rows: AtomicUsize::new(0),
85 series,
86 }
87 }
88
89 fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
90 if let Some(projection) = projection {
91 projection.iter().copied().collect()
92 } else {
93 self.region_metadata
94 .field_columns()
95 .map(|c| c.column_id)
96 .collect()
97 }
98 }
99
100 fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
101 let ts = kv.timestamp();
102 let sequence = kv.sequence();
103 let op_type = kv.op_type();
104 let mut series = self.series.write().unwrap();
105 let size = series.push(ts, sequence, op_type, kv.fields());
106 stats.value_bytes += size;
107 let ts = kv
109 .timestamp()
110 .try_into_timestamp()
111 .unwrap()
112 .unwrap()
113 .value();
114 stats.min_ts = stats.min_ts.min(ts);
115 stats.max_ts = stats.max_ts.max(ts);
116 }
117
118 fn update_stats(&self, stats: WriteMetrics) {
120 self.alloc_tracker
121 .on_allocation(stats.key_bytes + stats.value_bytes);
122 self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
123 self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
124 self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
125 self.max_sequence
126 .fetch_max(stats.max_sequence, Ordering::SeqCst);
127 }
128
    /// Returns the region metadata this memtable was created with (test builds only).
    #[cfg(test)]
    fn schema(&self) -> &RegionMetadataRef {
        &self.region_metadata
    }
133}
134
135impl Debug for SimpleBulkMemtable {
136 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137 f.debug_struct("SimpleBulkMemtable").finish()
138 }
139}
140
141impl Memtable for SimpleBulkMemtable {
    /// Returns the id this memtable was created with.
    fn id(&self) -> MemtableId {
        self.id
    }
145
146 fn write(&self, kvs: &KeyValues) -> error::Result<()> {
147 let mut stats = WriteMetrics::default();
148 let max_sequence = kvs.max_sequence();
149 for kv in kvs.iter() {
150 self.write_key_value(kv, &mut stats);
151 }
152 stats.max_sequence = max_sequence;
153 stats.num_rows = kvs.num_rows();
154 self.update_stats(stats);
155 Ok(())
156 }
157
158 fn write_one(&self, kv: KeyValue) -> error::Result<()> {
159 debug_assert_eq!(0, kv.num_primary_keys());
160 let mut stats = WriteMetrics::default();
161 self.write_key_value(kv, &mut stats);
162 stats.num_rows = 1;
163 stats.max_sequence = kv.sequence();
164 self.update_stats(stats);
165 Ok(())
166 }
167
168 fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
169 let rb = &part.batch;
170
171 let ts = Helper::try_into_vector(
172 rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
173 .with_context(|| error::InvalidRequestSnafu {
174 region_id: self.region_metadata.region_id,
175 reason: "Timestamp not found",
176 })?,
177 )
178 .context(error::ConvertVectorSnafu)?;
179
180 let sequence = part.sequence;
181
182 let fields: Vec<_> = self
183 .region_metadata
184 .field_columns()
185 .map(|f| {
186 let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
187 error::InvalidRequestSnafu {
188 region_id: self.region_metadata.region_id,
189 reason: format!("Column {} not found", f.column_schema.name),
190 }
191 .build()
192 })?;
193 Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
194 })
195 .collect::<error::Result<Vec<_>>>()?;
196
197 let mut series = self.series.write().unwrap();
198 let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
199 .with_label_values(&["bulk_extend"])
200 .start_timer();
201 series.extend(ts, OpType::Put as u8, sequence, fields)?;
202 extend_timer.observe_duration();
203
204 self.update_stats(WriteMetrics {
205 key_bytes: 0,
206 value_bytes: part.estimated_size(),
207 min_ts: part.min_timestamp,
208 max_ts: part.max_timestamp,
209 num_rows: part.num_rows(),
210 max_sequence: sequence,
211 });
212 Ok(())
213 }
214
215 #[cfg(any(test, feature = "test"))]
216 fn iter(
217 &self,
218 projection: Option<&[ColumnId]>,
219 _predicate: Option<table::predicate::Predicate>,
220 sequence: Option<store_api::storage::SequenceRange>,
221 ) -> error::Result<BoxedBatchIterator> {
222 let iter = self.create_iter(projection, sequence)?.build(None)?;
223 if self.merge_mode == MergeMode::LastNonNull {
224 let iter = LastNonNullIter::new(iter);
225 Ok(Box::new(iter))
226 } else {
227 Ok(Box::new(iter))
228 }
229 }
230
231 fn ranges(
232 &self,
233 projection: Option<&[ColumnId]>,
234 options: RangesOptions,
235 ) -> error::Result<MemtableRanges> {
236 let predicate = options.predicate;
237 let sequence = options.sequence;
238 let start_time = Instant::now();
239 let projection = Arc::new(self.build_projection(projection));
240
241 let max_sequence = self.max_sequence.load(Ordering::Relaxed);
243 let time_range = {
244 let num_rows = self.num_rows.load(Ordering::Relaxed);
245 if num_rows > 0 {
246 let ts_type = self.region_metadata.time_index_type();
247 let max_timestamp =
248 ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
249 let min_timestamp =
250 ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
251 Some((min_timestamp, max_timestamp))
252 } else {
253 None
254 }
255 };
256
257 let values = self.series.read().unwrap().read_to_values();
258 let contexts = values
259 .into_par_iter()
260 .filter_map(|v| {
261 let filtered = match v.to_batch(
262 &[],
263 &self.region_metadata,
264 &projection,
265 sequence,
266 self.dedup,
267 self.merge_mode,
268 ) {
269 Ok(filtered) => filtered,
270 Err(e) => {
271 return Some(Err(e));
272 }
273 };
274 if filtered.is_empty() {
275 None
276 } else {
277 Some(Ok(filtered))
278 }
279 })
280 .map(|result| {
281 result.map(|batch| {
282 let num_rows = batch.num_rows();
283 let estimated_bytes = batch.memory_size();
284
285 let range_stats = MemtableStats {
286 estimated_bytes,
287 time_range,
288 num_rows,
289 num_ranges: 1,
290 max_sequence,
291 series_count: 1,
292 };
293
294 let builder = BatchRangeBuilder {
295 batch,
296 merge_mode: self.merge_mode,
297 scan_cost: start_time.elapsed(),
298 };
299 (
300 range_stats,
301 Arc::new(MemtableRangeContext::new(
302 self.id,
303 Box::new(builder),
304 predicate.clone(),
305 )),
306 )
307 })
308 })
309 .collect::<error::Result<Vec<_>>>()?;
310
311 let ranges = contexts
312 .into_iter()
313 .enumerate()
314 .map(|(idx, (range_stats, context))| (idx, MemtableRange::new(context, range_stats)))
315 .collect();
316
317 Ok(MemtableRanges { ranges })
318 }
319
320 fn is_empty(&self) -> bool {
321 self.series.read().unwrap().is_empty()
322 }
323
324 fn freeze(&self) -> error::Result<()> {
325 self.series.write().unwrap().freeze(&self.region_metadata);
326 Ok(())
327 }
328
329 fn stats(&self) -> MemtableStats {
330 let estimated_bytes = self.alloc_tracker.bytes_allocated();
331 let num_rows = self.num_rows.load(Ordering::Relaxed);
332 if num_rows == 0 {
333 return MemtableStats {
335 estimated_bytes,
336 time_range: None,
337 num_rows: 0,
338 num_ranges: 0,
339 max_sequence: 0,
340 series_count: 0,
341 };
342 }
343 let ts_type = self.region_metadata.time_index_type();
344 let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
345 let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
346 MemtableStats {
347 estimated_bytes,
348 time_range: Some((min_timestamp, max_timestamp)),
349 num_rows,
350 num_ranges: 1,
351 max_sequence: self.max_sequence.load(Ordering::Relaxed),
352 series_count: 1,
353 }
354 }
355
356 fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
357 Arc::new(Self::new(
358 id,
359 metadata.clone(),
360 self.alloc_tracker.write_buffer_manager(),
361 self.dedup,
362 self.merge_mode,
363 ))
364 }
365}
366
/// An [`IterBuilder`] that replays one already-materialized [`Batch`].
#[derive(Clone)]
pub struct BatchRangeBuilder {
    /// The batch handed out (as a clone) by every built iterator.
    pub batch: Batch,
    /// Merge mode; `LastNonNull` wraps the built iterator in [`LastNonNullIter`].
    pub merge_mode: MergeMode,
    /// Scan cost reported to the scan metrics on each build.
    scan_cost: Duration,
}
373
374impl IterBuilder for BatchRangeBuilder {
375 fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
376 let batch = self.batch.clone();
377 if let Some(metrics) = metrics {
378 let inner = crate::memtable::MemScanMetricsData {
379 total_series: 1,
380 num_rows: batch.num_rows(),
381 num_batches: 1,
382 scan_cost: self.scan_cost,
383 };
384 metrics.merge_inner(&inner);
385 }
386
387 let iter = Iter {
388 batch: Some(Ok(batch)),
389 };
390
391 if self.merge_mode == MergeMode::LastNonNull {
392 Ok(Box::new(LastNonNullIter::new(iter)))
393 } else {
394 Ok(Box::new(iter))
395 }
396 }
397}
398
399struct Iter {
400 batch: Option<error::Result<Batch>>,
401}
402
403impl Iterator for Iter {
404 type Item = error::Result<Batch>;
405
406 fn next(&mut self) -> Option<Self::Item> {
407 self.batch.take()
408 }
409}
410
411#[cfg(test)]
412mod tests {
413 use std::sync::Arc;
414
415 use api::v1::helper::row;
416 use api::v1::value::ValueData;
417 use api::v1::{Mutation, OpType, Rows, SemanticType};
418 use common_recordbatch::DfRecordBatch;
419 use common_time::Timestamp;
420 use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
421 use datatypes::arrow_array::StringArray;
422 use datatypes::data_type::ConcreteDataType;
423 use datatypes::prelude::{ScalarVector, Vector};
424 use datatypes::schema::ColumnSchema;
425 use datatypes::value::Value;
426 use datatypes::vectors::TimestampMillisecondVector;
427 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
428 use store_api::storage::{RegionId, SequenceNumber, SequenceRange};
429
430 use super::*;
431 use crate::read;
432 use crate::read::dedup::DedupReader;
433 use crate::read::merge::MergeReaderBuilder;
434 use crate::read::{BatchReader, Source};
435 use crate::region::options::MergeMode;
436 use crate::test_util::column_metadata_to_column_schema;
437
438 fn new_test_metadata() -> RegionMetadataRef {
439 let mut builder = RegionMetadataBuilder::new(1.into());
440 builder
441 .push_column_metadata(ColumnMetadata {
442 column_schema: ColumnSchema::new(
443 "ts",
444 ConcreteDataType::timestamp_millisecond_datatype(),
445 false,
446 ),
447 semantic_type: SemanticType::Timestamp,
448 column_id: 1,
449 })
450 .push_column_metadata(ColumnMetadata {
451 column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
452 semantic_type: SemanticType::Field,
453 column_id: 2,
454 })
455 .push_column_metadata(ColumnMetadata {
456 column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
457 semantic_type: SemanticType::Field,
458 column_id: 3,
459 });
460 Arc::new(builder.build().unwrap())
461 }
462
    /// Creates a memtable with id 1 over the test metadata and no write buffer manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }
466
467 fn build_key_values(
468 metadata: &RegionMetadataRef,
469 sequence: SequenceNumber,
470 row_values: &[(i64, f64, String)],
471 op_type: OpType,
472 ) -> KeyValues {
473 let column_schemas: Vec<_> = metadata
474 .column_metadatas
475 .iter()
476 .map(column_metadata_to_column_schema)
477 .collect();
478
479 let rows: Vec<_> = row_values
480 .iter()
481 .map(|(ts, f1, f2)| {
482 row(vec![
483 ValueData::TimestampMillisecondValue(*ts),
484 ValueData::F64Value(*f1),
485 ValueData::StringValue(f2.clone()),
486 ])
487 })
488 .collect();
489 let mutation = Mutation {
490 op_type: op_type as i32,
491 sequence,
492 rows: Some(Rows {
493 schema: column_schemas,
494 rows,
495 }),
496 write_hint: None,
497 };
498 KeyValues::new(metadata, mutation).unwrap()
499 }
500
501 #[test]
502 fn test_write_and_iter() {
503 let memtable = new_test_memtable(false, MergeMode::LastRow);
504 memtable
505 .write(&build_key_values(
506 &memtable.region_metadata,
507 0,
508 &[(1, 1.0, "a".to_string())],
509 OpType::Put,
510 ))
511 .unwrap();
512 memtable
513 .write(&build_key_values(
514 &memtable.region_metadata,
515 1,
516 &[(2, 2.0, "b".to_string())],
517 OpType::Put,
518 ))
519 .unwrap();
520
521 let mut iter = memtable.iter(None, None, None).unwrap();
522 let batch = iter.next().unwrap().unwrap();
523 assert_eq!(2, batch.num_rows());
524 assert_eq!(2, batch.fields().len());
525 let ts_v = batch
526 .timestamps()
527 .as_any()
528 .downcast_ref::<TimestampMillisecondVector>()
529 .unwrap();
530 assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
531 assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
532 }
533
534 #[test]
535 fn test_projection() {
536 let memtable = new_test_memtable(false, MergeMode::LastRow);
537 memtable
538 .write(&build_key_values(
539 &memtable.region_metadata,
540 0,
541 &[(1, 1.0, "a".to_string())],
542 OpType::Put,
543 ))
544 .unwrap();
545
546 let mut iter = memtable.iter(None, None, None).unwrap();
547 let batch = iter.next().unwrap().unwrap();
548 assert_eq!(1, batch.num_rows());
549 assert_eq!(2, batch.fields().len());
550
551 let ts_v = batch
552 .timestamps()
553 .as_any()
554 .downcast_ref::<TimestampMillisecondVector>()
555 .unwrap();
556 assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
557
558 let projection = vec![2];
560 let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
561 let batch = iter.next().unwrap().unwrap();
562
563 assert_eq!(1, batch.num_rows());
564 assert_eq!(1, batch.fields().len()); assert_eq!(2, batch.fields()[0].column_id);
566 }
567
568 #[test]
569 fn test_dedup() {
570 let memtable = new_test_memtable(true, MergeMode::LastRow);
571 memtable
572 .write(&build_key_values(
573 &memtable.region_metadata,
574 0,
575 &[(1, 1.0, "a".to_string())],
576 OpType::Put,
577 ))
578 .unwrap();
579 memtable
580 .write(&build_key_values(
581 &memtable.region_metadata,
582 1,
583 &[(1, 2.0, "b".to_string())],
584 OpType::Put,
585 ))
586 .unwrap();
587 let mut iter = memtable.iter(None, None, None).unwrap();
588 let batch = iter.next().unwrap().unwrap();
589
590 assert_eq!(1, batch.num_rows()); assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); }
593
594 #[test]
595 fn test_write_one() {
596 let memtable = new_test_memtable(false, MergeMode::LastRow);
597 let kvs = build_key_values(
598 &memtable.region_metadata,
599 0,
600 &[(1, 1.0, "a".to_string())],
601 OpType::Put,
602 );
603 let kv = kvs.iter().next().unwrap();
604 memtable.write_one(kv).unwrap();
605
606 let mut iter = memtable.iter(None, None, None).unwrap();
607 let batch = iter.next().unwrap().unwrap();
608 assert_eq!(1, batch.num_rows());
609 }
610
611 #[tokio::test]
612 async fn test_write_dedup() {
613 let memtable = new_test_memtable(true, MergeMode::LastRow);
614 let kvs = build_key_values(
615 &memtable.region_metadata,
616 0,
617 &[(1, 1.0, "a".to_string())],
618 OpType::Put,
619 );
620 let kv = kvs.iter().next().unwrap();
621 memtable.write_one(kv).unwrap();
622 memtable.freeze().unwrap();
623
624 let kvs = build_key_values(
625 &memtable.region_metadata,
626 1,
627 &[(1, 1.0, "a".to_string())],
628 OpType::Delete,
629 );
630 let kv = kvs.iter().next().unwrap();
631 memtable.write_one(kv).unwrap();
632
633 let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
634 let mut source = vec![];
635 for r in ranges.ranges.values() {
636 source.push(Source::Iter(r.build_iter().unwrap()));
637 }
638
639 let reader = MergeReaderBuilder::from_sources(source)
640 .build()
641 .await
642 .unwrap();
643
644 let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
645 let mut num_rows = 0;
646 while let Some(b) = reader.next_batch().await.unwrap() {
647 num_rows += b.num_rows();
648 }
649 assert_eq!(num_rows, 1);
650 }
651
652 #[tokio::test]
653 async fn test_delete_only() {
654 let memtable = new_test_memtable(true, MergeMode::LastRow);
655 let kvs = build_key_values(
656 &memtable.region_metadata,
657 0,
658 &[(1, 1.0, "a".to_string())],
659 OpType::Delete,
660 );
661 let kv = kvs.iter().next().unwrap();
662 memtable.write_one(kv).unwrap();
663 memtable.freeze().unwrap();
664
665 let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
666 let mut source = vec![];
667 for r in ranges.ranges.values() {
668 source.push(Source::Iter(r.build_iter().unwrap()));
669 }
670
671 let reader = MergeReaderBuilder::from_sources(source)
672 .build()
673 .await
674 .unwrap();
675
676 let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
677 let mut num_rows = 0;
678 while let Some(b) = reader.next_batch().await.unwrap() {
679 num_rows += b.num_rows();
680 assert_eq!(b.num_rows(), 1);
681 assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
682 }
683 assert_eq!(num_rows, 1);
684 }
685
    /// Two puts to the same timestamp followed by a freeze produce exactly one
    /// range, whose single row carries the value of the later write.
    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        // Same timestamp, higher sequence: this row should win.
        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        // Build the iterator straight from the range's builder, without scan metrics.
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            // Field index 1 is the string column `f2`.
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }
718
    /// A bulk part is readable, updates the statistics, and can be followed by a
    /// regular write that lands in the same batch.
    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        // Column order follows the test metadata: ts, f1, f2.
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        // Statistics are taken from the bulk part itself.
        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        // A row-wise write after the bulk write is appended to the same series.
        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }
774
775 #[test]
776 fn test_is_empty() {
777 let memtable = new_test_memtable(false, MergeMode::LastRow);
778 assert!(memtable.is_empty());
779
780 memtable
781 .write(&build_key_values(
782 &memtable.region_metadata,
783 0,
784 &[(1, 1.0, "a".to_string())],
785 OpType::Put,
786 ))
787 .unwrap();
788 assert!(!memtable.is_empty());
789 }
790
791 #[test]
792 fn test_stats() {
793 let memtable = new_test_memtable(false, MergeMode::LastRow);
794 let stats = memtable.stats();
795 assert_eq!(0, stats.num_rows);
796 assert!(stats.time_range.is_none());
797
798 memtable
799 .write(&build_key_values(
800 &memtable.region_metadata,
801 0,
802 &[(1, 1.0, "a".to_string())],
803 OpType::Put,
804 ))
805 .unwrap();
806 let stats = memtable.stats();
807 assert_eq!(1, stats.num_rows);
808 assert!(stats.time_range.is_some());
809 }
810
811 #[test]
812 fn test_fork() {
813 let memtable = new_test_memtable(false, MergeMode::LastRow);
814 memtable
815 .write(&build_key_values(
816 &memtable.region_metadata,
817 0,
818 &[(1, 1.0, "a".to_string())],
819 OpType::Put,
820 ))
821 .unwrap();
822
823 let forked = memtable.fork(2, &memtable.region_metadata);
824 assert!(forked.is_empty());
825 }
826
827 #[test]
828 fn test_sequence_filter() {
829 let memtable = new_test_memtable(false, MergeMode::LastRow);
830 memtable
831 .write(&build_key_values(
832 &memtable.region_metadata,
833 0,
834 &[(1, 1.0, "a".to_string())],
835 OpType::Put,
836 ))
837 .unwrap();
838 memtable
839 .write(&build_key_values(
840 &memtable.region_metadata,
841 1,
842 &[(2, 2.0, "b".to_string())],
843 OpType::Put,
844 ))
845 .unwrap();
846
847 let mut iter = memtable
849 .iter(None, None, Some(SequenceRange::LtEq { max: 0 }))
850 .unwrap();
851 let batch = iter.next().unwrap().unwrap();
852 assert_eq!(1, batch.num_rows());
853 assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
854 }
855
856 fn rb_with_large_string(
857 ts: i64,
858 string_len: i32,
859 region_meta: &RegionMetadataRef,
860 ) -> RecordBatch {
861 let schema = region_meta.schema.arrow_schema().clone();
862 RecordBatch::try_new(
863 schema,
864 vec![
865 Arc::new(StringArray::from_iter_values(
866 ["a".repeat(string_len as usize).clone()].into_iter(),
867 )) as ArrayRef,
868 Arc::new(TimestampMillisecondArray::from_iter_values(
869 [ts].into_iter(),
870 )) as ArrayRef,
871 ],
872 )
873 .unwrap()
874 }
875
    /// Writes a bulk part with an `i32::MAX`-byte string, freezes, writes a small
    /// part, and checks that both rows can still be read back.
    #[tokio::test]
    async fn test_write_read_large_string() {
        // Schema: string field `k0` (id 0) followed by the time index `ts` (id 1).
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        // First part: one row with the very large string.
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        // Second part, written after the freeze: one row with a short string.
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } =
            memtable.ranges(None, RangesOptions::default()).unwrap();
        // A single range is read directly; several ranges go through a merge reader.
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }
944}