1#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::ColumnId;
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37 AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues,
38 MemScanMetrics, Memtable, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges,
39 MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection,
40};
41use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
42use crate::read::Batch;
43use crate::read::dedup::LastNonNullIter;
44use crate::region::options::MergeMode;
45use crate::{error, metrics};
46
/// A memtable that stores every written row in a single [`Series`].
///
/// Rows are appended to the series behind a lock, while the row count, the
/// timestamp bounds and the maximum sequence are tracked in atomics that
/// `stats` reads without taking the series lock.
pub struct SimpleBulkMemtable {
    /// Identifier returned by `Memtable::id`.
    id: MemtableId,
    /// Metadata of the region; supplies the field columns and the time index.
    region_metadata: RegionMetadataRef,
    /// Receives the byte size of every write and reports the total in `stats`.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far; starts at `i64::MIN`.
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far; starts at `i64::MAX`.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far; starts at `0`.
    max_sequence: AtomicU64,
    /// Dedup flag forwarded when batches are built in `ranges`.
    dedup: bool,
    /// Merge mode forwarded to batch building and to the range builders.
    merge_mode: MergeMode,
    /// Number of rows written so far.
    num_rows: AtomicUsize,
    /// The one series holding all rows of this memtable.
    series: RwLock<Series>,
}
59
impl Drop for SimpleBulkMemtable {
    /// Decrements the `MEMTABLE_ACTIVE_SERIES_COUNT` metric when the memtable is dropped.
    // NOTE(review): the matching increment is not visible in this file; presumably it
    // happens where the underlying `Series` is created — confirm.
    fn drop(&mut self) {
        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
    }
}
65
66impl SimpleBulkMemtable {
67 pub fn new(
68 id: MemtableId,
69 region_metadata: RegionMetadataRef,
70 write_buffer_manager: Option<WriteBufferManagerRef>,
71 dedup: bool,
72 merge_mode: MergeMode,
73 ) -> Self {
74 let series = RwLock::new(Series::with_capacity(®ion_metadata, 1024, 8192));
75
76 Self {
77 id,
78 region_metadata,
79 alloc_tracker: AllocTracker::new(write_buffer_manager),
80 max_timestamp: AtomicI64::new(i64::MIN),
81 min_timestamp: AtomicI64::new(i64::MAX),
82 max_sequence: AtomicU64::new(0),
83 dedup,
84 merge_mode,
85 num_rows: AtomicUsize::new(0),
86 series,
87 }
88 }
89
90 fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
91 if let Some(projection) = projection {
92 projection.iter().copied().collect()
93 } else {
94 self.region_metadata
95 .field_columns()
96 .map(|c| c.column_id)
97 .collect()
98 }
99 }
100
101 fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
102 let ts = kv.timestamp();
103 let sequence = kv.sequence();
104 let op_type = kv.op_type();
105 let mut series = self.series.write().unwrap();
106 let size = series.push(ts, sequence, op_type, kv.fields());
107 stats.value_bytes += size;
108 let ts = kv
110 .timestamp()
111 .try_into_timestamp()
112 .unwrap()
113 .unwrap()
114 .value();
115 stats.min_ts = stats.min_ts.min(ts);
116 stats.max_ts = stats.max_ts.max(ts);
117 }
118
119 fn update_stats(&self, stats: WriteMetrics) {
121 self.alloc_tracker
122 .on_allocation(stats.key_bytes + stats.value_bytes);
123 self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
124 self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
125 self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
126 self.max_sequence
127 .fetch_max(stats.max_sequence, Ordering::SeqCst);
128 }
129
130 #[cfg(test)]
131 fn schema(&self) -> &RegionMetadataRef {
132 &self.region_metadata
133 }
134}
135
136impl Debug for SimpleBulkMemtable {
137 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct("SimpleBulkMemtable").finish()
139 }
140}
141
142impl Memtable for SimpleBulkMemtable {
143 fn id(&self) -> MemtableId {
144 self.id
145 }
146
147 fn write(&self, kvs: &KeyValues) -> error::Result<()> {
148 let mut stats = WriteMetrics::default();
149 let max_sequence = kvs.max_sequence();
150 for kv in kvs.iter() {
151 self.write_key_value(kv, &mut stats);
152 }
153 stats.max_sequence = max_sequence;
154 stats.num_rows = kvs.num_rows();
155 self.update_stats(stats);
156 Ok(())
157 }
158
159 fn write_one(&self, kv: KeyValue) -> error::Result<()> {
160 debug_assert_eq!(0, kv.num_primary_keys());
161 let mut stats = WriteMetrics::default();
162 self.write_key_value(kv, &mut stats);
163 stats.num_rows = 1;
164 stats.max_sequence = kv.sequence();
165 self.update_stats(stats);
166 Ok(())
167 }
168
169 fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
170 let rb = &part.batch;
171
172 let ts = Helper::try_into_vector(
173 rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
174 .with_context(|| error::InvalidRequestSnafu {
175 region_id: self.region_metadata.region_id,
176 reason: "Timestamp not found",
177 })?,
178 )
179 .context(error::ConvertVectorSnafu)?;
180
181 let sequence = part.sequence;
182
183 let fields: Vec<_> = self
184 .region_metadata
185 .field_columns()
186 .map(|f| {
187 let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
188 error::InvalidRequestSnafu {
189 region_id: self.region_metadata.region_id,
190 reason: format!("Column {} not found", f.column_schema.name),
191 }
192 .build()
193 })?;
194 Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
195 })
196 .collect::<error::Result<Vec<_>>>()?;
197
198 let mut series = self.series.write().unwrap();
199 let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
200 .with_label_values(&["bulk_extend"])
201 .start_timer();
202 series.extend(ts, OpType::Put as u8, sequence, fields)?;
203 extend_timer.observe_duration();
204
205 self.update_stats(WriteMetrics {
206 key_bytes: 0,
207 value_bytes: part.estimated_size(),
208 min_ts: part.min_timestamp,
209 max_ts: part.max_timestamp,
210 num_rows: part.num_rows(),
211 max_sequence: sequence,
212 });
213 Ok(())
214 }
215
216 fn ranges(
217 &self,
218 projection: Option<&[ColumnId]>,
219 options: RangesOptions,
220 ) -> error::Result<MemtableRanges> {
221 let predicate = options.predicate;
222 let sequence = options.sequence;
223 let start_time = Instant::now();
224 let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection);
225 let projection = Arc::new(self.build_projection(projection));
226
227 let max_sequence = self.max_sequence.load(Ordering::Relaxed);
229 let time_range = {
230 let num_rows = self.num_rows.load(Ordering::Relaxed);
231 if num_rows > 0 {
232 let ts_type = self.region_metadata.time_index_type();
233 let max_timestamp =
234 ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
235 let min_timestamp =
236 ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
237 Some((min_timestamp, max_timestamp))
238 } else {
239 None
240 }
241 };
242
243 let values = self.series.read().unwrap().read_to_values();
244 let batch_to_record_batch = Arc::new(BatchToRecordBatchContext::new(
245 self.region_metadata.clone(),
246 read_column_ids.clone(),
247 ));
248
249 let contexts = values
250 .into_par_iter()
251 .filter_map(|v| {
252 let filtered = match v.to_batch(
253 &[],
254 &self.region_metadata,
255 &projection,
256 sequence,
257 self.dedup,
258 self.merge_mode,
259 ) {
260 Ok(filtered) => filtered,
261 Err(e) => {
262 return Some(Err(e));
263 }
264 };
265 if filtered.is_empty() {
266 None
267 } else {
268 Some(Ok(filtered))
269 }
270 })
271 .map(|result| {
272 result.map(|batch| {
273 let num_rows = batch.num_rows();
274 let estimated_bytes = batch.memory_size();
275
276 let range_stats = MemtableStats {
277 estimated_bytes,
278 time_range,
279 num_rows,
280 num_ranges: 1,
281 max_sequence,
282 series_count: 1,
283 };
284
285 let builder = BatchRangeBuilder {
286 batch,
287 merge_mode: self.merge_mode,
288 scan_cost: start_time.elapsed(),
289 };
290 (
291 range_stats,
292 Arc::new(MemtableRangeContext::new_with_batch_to_record_batch(
293 self.id,
294 Box::new(builder),
295 predicate.clone(),
296 Some(batch_to_record_batch.clone()),
297 )),
298 )
299 })
300 })
301 .collect::<error::Result<Vec<_>>>()?;
302
303 let ranges = contexts
304 .into_iter()
305 .enumerate()
306 .map(|(idx, (range_stats, context))| (idx, MemtableRange::new(context, range_stats)))
307 .collect();
308
309 Ok(MemtableRanges { ranges })
310 }
311
312 fn is_empty(&self) -> bool {
313 self.series.read().unwrap().is_empty()
314 }
315
316 fn freeze(&self) -> error::Result<()> {
317 self.series.write().unwrap().freeze(&self.region_metadata);
318 Ok(())
319 }
320
321 fn stats(&self) -> MemtableStats {
322 let estimated_bytes = self.alloc_tracker.bytes_allocated();
323 let num_rows = self.num_rows.load(Ordering::Relaxed);
324 if num_rows == 0 {
325 return MemtableStats {
327 estimated_bytes,
328 time_range: None,
329 num_rows: 0,
330 num_ranges: 0,
331 max_sequence: 0,
332 series_count: 0,
333 };
334 }
335 let ts_type = self.region_metadata.time_index_type();
336 let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
337 let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
338 MemtableStats {
339 estimated_bytes,
340 time_range: Some((min_timestamp, max_timestamp)),
341 num_rows,
342 num_ranges: 1,
343 max_sequence: self.max_sequence.load(Ordering::Relaxed),
344 series_count: 1,
345 }
346 }
347
348 fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
349 Arc::new(Self::new(
350 id,
351 metadata.clone(),
352 self.alloc_tracker.write_buffer_manager(),
353 self.dedup,
354 self.merge_mode,
355 ))
356 }
357}
358
/// Builds an iterator over one already materialized [`Batch`].
#[derive(Clone)]
pub struct BatchRangeBuilder {
    /// The batch handed out by the built iterator.
    pub batch: Batch,
    /// Merge mode; `MergeMode::LastNonNull` wraps the iterator in a `LastNonNullIter`.
    pub merge_mode: MergeMode,
    /// Scan cost reported to the scan metrics when an iterator is built.
    scan_cost: Duration,
}
365
366impl IterBuilder for BatchRangeBuilder {
367 fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
368 let batch = self.batch.clone();
369 if let Some(metrics) = metrics {
370 let inner = crate::memtable::MemScanMetricsData {
371 total_series: 1,
372 num_rows: batch.num_rows(),
373 num_batches: 1,
374 scan_cost: self.scan_cost,
375 };
376 metrics.merge_inner(&inner);
377 }
378
379 let iter = Iter {
380 batch: Some(Ok(batch)),
381 };
382
383 if self.merge_mode == MergeMode::LastNonNull {
384 Ok(Box::new(LastNonNullIter::new(iter)))
385 } else {
386 Ok(Box::new(iter))
387 }
388 }
389}
390
/// Iterator over at most one batch.
struct Iter {
    /// The pending item; becomes `None` once it has been yielded.
    batch: Option<error::Result<Batch>>,
}
394
395impl Iterator for Iter {
396 type Item = error::Result<Batch>;
397
398 fn next(&mut self) -> Option<Self::Item> {
399 self.batch.take()
400 }
401}
402
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber, SequenceRange};

    use super::*;
    use crate::read;
    use crate::read::dedup::DedupReader;
    use crate::read::merge::MergeReaderBuilder;
    use crate::read::{BatchReader, Source};
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Builds region metadata with a millisecond time index `ts` (id 1) and two
    /// nullable fields: `f1` (float64, id 2) and `f2` (string, id 3).
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Creates a memtable with id 1 over the test metadata and no write buffer manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds `KeyValues` from `(ts, f1, f2)` tuples as one mutation with the
    /// given sequence and operation type.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| {
                row(vec![
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*f1),
                    ValueData::StringValue(f2.clone()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Two writes with distinct timestamps come back as one batch of two rows.
    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    /// Without a projection both fields are returned; projecting column id 2
    /// returns only that field.
    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Read only column id 2 (`f1`).
        let projection = vec![2];
        let mut iter = memtable
            .ranges(Some(&projection), RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len());
        assert_eq!(2, batch.fields()[0].column_id);
    }

    /// With dedup enabled, two puts at the same timestamp collapse into one row
    /// carrying the value of the later write.
    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();

        // Deduplicated down to a single row.
        assert_eq!(1, batch.num_rows());
        // The later write (sequence 1) is the one that is kept.
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// A single key-value written through `write_one` is readable afterwards.
    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    /// A put, a freeze and a later delete of the same timestamp yield exactly one
    /// row after merging all ranges and deduplicating with `LastRow`.
    #[tokio::test]
    async fn test_write_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
        }
        assert_eq!(num_rows, 1);
    }

    /// A memtable holding only a delete still yields that delete row.
    #[tokio::test]
    async fn test_delete_only() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Delete,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut source = vec![];
        for r in ranges.ranges.values() {
            source.push(Source::Iter(r.build_iter().unwrap()));
        }

        let reader = MergeReaderBuilder::from_sources(source)
            .build()
            .await
            .unwrap();

        let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
        let mut num_rows = 0;
        while let Some(b) = reader.next_batch().await.unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.num_rows(), 1);
            assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8);
        }
        assert_eq!(num_rows, 1);
    }

    /// Two puts at the same timestamp followed by a freeze produce one range
    /// whose single row carries the later value.
    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    /// A bulk part is readable and reflected in the stats, and a later regular
    /// write is appended after it.
    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    /// A new memtable is empty and stops being empty after the first write.
    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    /// Stats report no rows and no time range before the first write, and both
    /// afterwards.
    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    /// Forking a non-empty memtable yields an empty one.
    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    /// Restricting the read to sequences `<= 0` returns only the first write.
    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        // Only the write with sequence 0 passes the filter.
        let mut iter = memtable
            .ranges(
                None,
                RangesOptions {
                    sequence: Some(SequenceRange::LtEq { max: 0 }),
                    ..Default::default()
                },
            )
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// Builds a one-row record batch for `region_meta`: a string of `string_len`
    /// `a` characters followed by the timestamp `ts`.
    ///
    /// # Panics
    ///
    /// Panics if `string_len` is negative or the batch does not match the schema.
    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        // Reject negative lengths instead of letting an `as` cast wrap them
        // into an enormous allocation size.
        let string_len = usize::try_from(string_len).expect("string_len must be non-negative");
        let schema = region_meta.schema.arrow_schema().clone();
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from_iter_values(["a".repeat(string_len)])) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values([ts])) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Writes a string of `i32::MAX` bytes, freezes, writes a short string and
    /// checks that both rows can be read back.
    ///
    /// Note that this allocates a string of `i32::MAX` bytes.
    #[tokio::test]
    async fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } =
            memtable.ranges(None, RangesOptions::default()).unwrap();
        // A single range is read directly; several ranges go through a merge reader.
        let mut source = if ranges.len() == 1 {
            let only_range = ranges.into_values().next().unwrap();
            Source::Iter(only_range.build_iter().unwrap())
        } else {
            let sources = ranges
                .into_values()
                .map(|r| r.build_iter().map(Source::Iter))
                .collect::<error::Result<Vec<_>>>()
                .unwrap();
            let merge_reader = MergeReaderBuilder::from_sources(sources)
                .build()
                .await
                .unwrap();
            Source::Reader(Box::new(merge_reader))
        };

        let mut rows = 0;
        while let Some(b) = source.next_batch().await.unwrap() {
            rows += b.num_rows();
        }
        assert_eq!(rows, 2);
    }

    /// Record batches built from the memtable ranges expose the expected column
    /// order and contain every written row.
    #[test]
    fn test_build_record_batch_iter_from_memtable() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);

        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string()), (2, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();

        let read_column_ids: Vec<ColumnId> = memtable
            .region_metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let ranges = memtable
            .ranges(Some(&read_column_ids), RangesOptions::default())
            .unwrap();
        assert!(!ranges.ranges.is_empty());

        let mut total_rows = 0;
        for range in ranges.ranges.into_values() {
            let mut iter = range.build_record_batch_iter(None, None).unwrap();
            while let Some(rb) = iter.next().transpose().unwrap() {
                total_rows += rb.num_rows();
                let schema = rb.schema();
                let column_names: Vec<_> =
                    schema.fields().iter().map(|f| f.name().as_str()).collect();
                assert_eq!(
                    column_names,
                    vec!["f1", "f2", "ts", "__primary_key", "__sequence", "__op_type"]
                );
            }
        }
        assert_eq!(2, total_rows);
    }
}