1#[cfg(any(test, feature = "test"))]
16mod test_only;
17
18use std::collections::HashSet;
19use std::fmt::{Debug, Formatter};
20use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24use api::v1::OpType;
25use datatypes::vectors::Helper;
26use mito_codec::key_values::KeyValue;
27use rayon::prelude::*;
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::ColumnId;
31
32use crate::flush::WriteBufferManagerRef;
33use crate::memtable::bulk::part::BulkPart;
34use crate::memtable::stats::WriteMetrics;
35use crate::memtable::time_series::Series;
36use crate::memtable::{
37 AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues,
38 MemScanMetrics, Memtable, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges,
39 MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection,
40};
41use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
42use crate::read::Batch;
43use crate::read::dedup::LastNonNullIter;
44use crate::region::options::MergeMode;
45use crate::{error, metrics};
46
47pub struct SimpleBulkMemtable {
48 id: MemtableId,
49 region_metadata: RegionMetadataRef,
50 alloc_tracker: AllocTracker,
51 max_timestamp: AtomicI64,
52 min_timestamp: AtomicI64,
53 max_sequence: AtomicU64,
54 dedup: bool,
55 merge_mode: MergeMode,
56 num_rows: AtomicUsize,
57 series: RwLock<Series>,
58}
59
60impl Drop for SimpleBulkMemtable {
61 fn drop(&mut self) {
62 MEMTABLE_ACTIVE_SERIES_COUNT.dec();
63 }
64}
65
66impl SimpleBulkMemtable {
67 pub fn new(
68 id: MemtableId,
69 region_metadata: RegionMetadataRef,
70 write_buffer_manager: Option<WriteBufferManagerRef>,
71 dedup: bool,
72 merge_mode: MergeMode,
73 ) -> Self {
74 let series = RwLock::new(Series::with_capacity(®ion_metadata, 1024, 8192));
75
76 Self {
77 id,
78 region_metadata,
79 alloc_tracker: AllocTracker::new(write_buffer_manager),
80 max_timestamp: AtomicI64::new(i64::MIN),
81 min_timestamp: AtomicI64::new(i64::MAX),
82 max_sequence: AtomicU64::new(0),
83 dedup,
84 merge_mode,
85 num_rows: AtomicUsize::new(0),
86 series,
87 }
88 }
89
90 fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
91 if let Some(projection) = projection {
92 projection.iter().copied().collect()
93 } else {
94 self.region_metadata
95 .field_columns()
96 .map(|c| c.column_id)
97 .collect()
98 }
99 }
100
101 fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
102 let ts = kv.timestamp();
103 let sequence = kv.sequence();
104 let op_type = kv.op_type();
105 let mut series = self.series.write().unwrap();
106 let size = series.push(ts, sequence, op_type, kv.fields());
107 stats.value_bytes += size;
108 let ts = kv
110 .timestamp()
111 .try_into_timestamp()
112 .unwrap()
113 .unwrap()
114 .value();
115 stats.min_ts = stats.min_ts.min(ts);
116 stats.max_ts = stats.max_ts.max(ts);
117 }
118
119 fn update_stats(&self, stats: WriteMetrics) {
121 self.alloc_tracker
122 .on_allocation(stats.key_bytes + stats.value_bytes);
123 self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
124 self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
125 self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
126 self.max_sequence
127 .fetch_max(stats.max_sequence, Ordering::SeqCst);
128 }
129
130 #[cfg(test)]
131 fn schema(&self) -> &RegionMetadataRef {
132 &self.region_metadata
133 }
134}
135
136impl Debug for SimpleBulkMemtable {
137 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct("SimpleBulkMemtable").finish()
139 }
140}
141
142impl Memtable for SimpleBulkMemtable {
143 fn id(&self) -> MemtableId {
144 self.id
145 }
146
147 fn write(&self, kvs: &KeyValues) -> error::Result<()> {
148 let mut stats = WriteMetrics::default();
149 let max_sequence = kvs.max_sequence();
150 for kv in kvs.iter() {
151 self.write_key_value(kv, &mut stats);
152 }
153 stats.max_sequence = max_sequence;
154 stats.num_rows = kvs.num_rows();
155 self.update_stats(stats);
156 Ok(())
157 }
158
159 fn write_one(&self, kv: KeyValue) -> error::Result<()> {
160 debug_assert_eq!(0, kv.num_primary_keys());
161 let mut stats = WriteMetrics::default();
162 self.write_key_value(kv, &mut stats);
163 stats.num_rows = 1;
164 stats.max_sequence = kv.sequence();
165 self.update_stats(stats);
166 Ok(())
167 }
168
169 fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
170 let rb = &part.batch;
171
172 let ts = Helper::try_into_vector(
173 rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
174 .with_context(|| error::InvalidRequestSnafu {
175 region_id: self.region_metadata.region_id,
176 reason: "Timestamp not found",
177 })?,
178 )
179 .context(error::ConvertVectorSnafu)?;
180
181 let sequence = part.sequence;
182
183 let fields: Vec<_> = self
184 .region_metadata
185 .field_columns()
186 .map(|f| {
187 let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
188 error::InvalidRequestSnafu {
189 region_id: self.region_metadata.region_id,
190 reason: format!("Column {} not found", f.column_schema.name),
191 }
192 .build()
193 })?;
194 Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
195 })
196 .collect::<error::Result<Vec<_>>>()?;
197
198 let mut series = self.series.write().unwrap();
199 let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
200 .with_label_values(&["bulk_extend"])
201 .start_timer();
202 series.extend(ts, OpType::Put as u8, sequence, fields)?;
203 extend_timer.observe_duration();
204
205 self.update_stats(WriteMetrics {
206 key_bytes: 0,
207 value_bytes: part.estimated_size(),
208 min_ts: part.min_timestamp,
209 max_ts: part.max_timestamp,
210 num_rows: part.num_rows(),
211 max_sequence: sequence,
212 });
213 Ok(())
214 }
215
216 fn ranges(
217 &self,
218 projection: Option<&[ColumnId]>,
219 options: RangesOptions,
220 ) -> error::Result<MemtableRanges> {
221 let predicate = options.predicate;
222 let sequence = options.sequence;
223 let start_time = Instant::now();
224 let read_column_ids = read_column_ids_from_projection(&self.region_metadata, projection);
225 let projection = Arc::new(self.build_projection(projection));
226
227 let max_sequence = self.max_sequence.load(Ordering::Relaxed);
229 let time_range = {
230 let num_rows = self.num_rows.load(Ordering::Relaxed);
231 if num_rows > 0 {
232 let ts_type = self.region_metadata.time_index_type();
233 let max_timestamp =
234 ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
235 let min_timestamp =
236 ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
237 Some((min_timestamp, max_timestamp))
238 } else {
239 None
240 }
241 };
242
243 let values = self.series.read().unwrap().read_to_values();
244 let batch_to_record_batch = Arc::new(BatchToRecordBatchContext::new(
245 self.region_metadata.clone(),
246 read_column_ids.clone(),
247 ));
248
249 let contexts = values
250 .into_par_iter()
251 .filter_map(|v| {
252 let filtered = match v.to_batch(
253 &[],
254 &self.region_metadata,
255 &projection,
256 sequence,
257 self.dedup,
258 self.merge_mode,
259 ) {
260 Ok(filtered) => filtered,
261 Err(e) => {
262 return Some(Err(e));
263 }
264 };
265 if filtered.is_empty() {
266 None
267 } else {
268 Some(Ok(filtered))
269 }
270 })
271 .map(|result| {
272 result.map(|batch| {
273 let num_rows = batch.num_rows();
274 let estimated_bytes = batch.memory_size();
275
276 let range_stats = MemtableStats {
277 estimated_bytes,
278 time_range,
279 num_rows,
280 num_ranges: 1,
281 max_sequence,
282 series_count: 1,
283 };
284
285 let builder = BatchRangeBuilder {
286 batch,
287 merge_mode: self.merge_mode,
288 scan_cost: start_time.elapsed(),
289 };
290 (
291 range_stats,
292 Arc::new(MemtableRangeContext::new_with_batch_to_record_batch(
293 self.id,
294 Box::new(builder),
295 predicate.clone(),
296 Some(batch_to_record_batch.clone()),
297 )),
298 )
299 })
300 })
301 .collect::<error::Result<Vec<_>>>()?;
302
303 let ranges = contexts
304 .into_iter()
305 .enumerate()
306 .map(|(idx, (range_stats, context))| (idx, MemtableRange::new(context, range_stats)))
307 .collect();
308
309 Ok(MemtableRanges { ranges })
310 }
311
312 fn is_empty(&self) -> bool {
313 self.series.read().unwrap().is_empty()
314 }
315
316 fn freeze(&self) -> error::Result<()> {
317 self.series.write().unwrap().freeze(&self.region_metadata);
318 Ok(())
319 }
320
321 fn stats(&self) -> MemtableStats {
322 let estimated_bytes = self.alloc_tracker.bytes_allocated();
323 let num_rows = self.num_rows.load(Ordering::Relaxed);
324 if num_rows == 0 {
325 return MemtableStats {
327 estimated_bytes,
328 time_range: None,
329 num_rows: 0,
330 num_ranges: 0,
331 max_sequence: 0,
332 series_count: 0,
333 };
334 }
335 let ts_type = self.region_metadata.time_index_type();
336 let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
337 let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
338 MemtableStats {
339 estimated_bytes,
340 time_range: Some((min_timestamp, max_timestamp)),
341 num_rows,
342 num_ranges: 1,
343 max_sequence: self.max_sequence.load(Ordering::Relaxed),
344 series_count: 1,
345 }
346 }
347
348 fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
349 Arc::new(Self::new(
350 id,
351 metadata.clone(),
352 self.alloc_tracker.write_buffer_manager(),
353 self.dedup,
354 self.merge_mode,
355 ))
356 }
357}
358
359#[derive(Clone)]
360pub struct BatchRangeBuilder {
361 pub batch: Batch,
362 pub merge_mode: MergeMode,
363 scan_cost: Duration,
364}
365
366impl IterBuilder for BatchRangeBuilder {
367 fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
368 let batch = self.batch.clone();
369 if let Some(metrics) = metrics {
370 let inner = crate::memtable::MemScanMetricsData {
371 total_series: 1,
372 num_rows: batch.num_rows(),
373 num_batches: 1,
374 scan_cost: self.scan_cost,
375 ..Default::default()
376 };
377 metrics.merge_inner(&inner);
378 }
379
380 let iter = Iter {
381 batch: Some(Ok(batch)),
382 };
383
384 if self.merge_mode == MergeMode::LastNonNull {
385 Ok(Box::new(LastNonNullIter::new(iter)))
386 } else {
387 Ok(Box::new(iter))
388 }
389 }
390}
391
392struct Iter {
393 batch: Option<error::Result<Batch>>,
394}
395
396impl Iterator for Iter {
397 type Item = error::Result<Batch>;
398
399 fn next(&mut self) -> Option<Self::Item> {
400 self.batch.take()
401 }
402}
403
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::row;
    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, RecordBatch, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, SequenceNumber, SequenceRange};

    use super::*;
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Builds region metadata with a millisecond time index `ts` (id 1) and
    /// two nullable fields: `f1` (float64, id 2) and `f2` (string, id 3).
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Creates a memtable with id 1 over the test metadata and no write
    /// buffer manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Wraps `(ts, f1, f2)` rows into a single mutation with the given
    /// sequence and op type.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
        op_type: OpType,
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| {
                row(vec![
                    ValueData::TimestampMillisecondValue(*ts),
                    ValueData::F64Value(*f1),
                    ValueData::StringValue(f2.clone()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        // Without a projection both field columns are returned.
        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        // Project only column id 2 (`f1`).
        let projection = vec![2];
        let mut iter = memtable
            .ranges(Some(&projection), RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len());
        assert_eq!(2, batch.fields()[0].column_id);
    }

    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();

        // Both writes share timestamp 1, so one row remains and it carries the
        // value of the later write.
        assert_eq!(1, batch.num_rows());
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    #[tokio::test]
    async fn test_single_range() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();

        let kvs = build_key_values(
            &memtable.region_metadata,
            1,
            &[(1, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write_one(kvs.iter().next().unwrap()).unwrap();
        memtable.freeze().unwrap();

        let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
        assert_eq!(ranges.ranges.len(), 1);
        let range = ranges.ranges.into_values().next().unwrap();
        let mut reader = range.context.builder.build(None).unwrap();

        let mut num_rows = 0;
        while let Some(b) = reader.next().transpose().unwrap() {
            num_rows += b.num_rows();
            assert_eq!(b.fields()[1].data.get(0).as_string(), Some("b".to_string()));
        }
        assert_eq!(num_rows, 1);
    }

    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        // A row-wise write after the bulk write is visible in the same batch.
        let kvs = build_key_values(
            &memtable.region_metadata,
            2,
            &[(3, 3.0, "c".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();
        let mut iter = memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();

        // The fork starts empty even though the source holds a row.
        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
                OpType::Put,
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
                OpType::Put,
            ))
            .unwrap();

        // Restrict the read to sequence 0, which keeps only the first write.
        let mut iter = memtable
            .ranges(
                None,
                RangesOptions {
                    sequence: Some(SequenceRange::LtEq { max: 0 }),
                    ..Default::default()
                },
            )
            .unwrap()
            .build(None)
            .unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// Builds a one-row record batch whose string column is `string_len`
    /// repetitions of `"a"` and whose timestamp column holds `ts`.
    fn rb_with_large_string(
        ts: i64,
        string_len: i32,
        region_meta: &RegionMetadataRef,
    ) -> RecordBatch {
        let schema = region_meta.schema.arrow_schema().clone();
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from_iter_values(
                    ["a".repeat(string_len as usize)].into_iter(),
                )) as ArrayRef,
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    [ts].into_iter(),
                )) as ArrayRef,
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_write_read_large_string() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .primary_key(vec![]);
        let region_meta = Arc::new(builder.build().unwrap());
        let memtable =
            SimpleBulkMemtable::new(0, region_meta.clone(), None, true, MergeMode::LastRow);
        // First part: a single string of `i32::MAX` bytes.
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();

        memtable.freeze().unwrap();
        // Second part is written after the freeze and must still be readable.
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,
            })
            .unwrap();
        let MemtableRanges { ranges, .. } =
            memtable.ranges(None, RangesOptions::default()).unwrap();
        let mut rows = 0;
        for range in ranges.into_values() {
            let iter = range.build_iter().unwrap();
            for batch in iter {
                rows += batch.unwrap().num_rows();
            }
        }
        assert_eq!(rows, 2);
    }

    #[test]
    fn test_build_record_batch_iter_from_memtable() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);

        let kvs = build_key_values(
            &memtable.region_metadata,
            0,
            &[(1, 1.0, "a".to_string()), (2, 2.0, "b".to_string())],
            OpType::Put,
        );
        memtable.write(&kvs).unwrap();

        let read_column_ids: Vec<ColumnId> = memtable
            .region_metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let ranges = memtable
            .ranges(Some(&read_column_ids), RangesOptions::default())
            .unwrap();
        assert!(!ranges.ranges.is_empty());

        let mut total_rows = 0;
        for range in ranges.ranges.into_values() {
            let mut iter = range.build_record_batch_iter(None, None).unwrap();
            while let Some(rb) = iter.next().transpose().unwrap() {
                total_rows += rb.num_rows();
                let schema = rb.schema();
                let column_names: Vec<_> =
                    schema.fields().iter().map(|f| f.name().as_str()).collect();
                assert_eq!(
                    column_names,
                    vec!["f1", "f2", "ts", "__primary_key", "__sequence", "__op_type"]
                );
            }
        }
        assert_eq!(2, total_rows);
    }
}