use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

use api::v1::OpType;
use datatypes::vectors::Helper;
use mito_codec::key_values::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::{Series, Values};
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
};
use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
use crate::read::dedup::LastNonNullIter;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::{error, metrics};

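/// A memtable that keeps all rows in a single [`Series`].
///
/// It is intended for regions whose rows carry no primary key columns, and it
/// supports both row-by-row writes and bulk writes of record batches.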
pub struct SimpleBulkMemtable {
    id: MemtableId,
    region_metadata: RegionMetadataRef,
    alloc_tracker: AllocTracker,
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    max_sequence: AtomicU64,
    dedup: bool,
    merge_mode: MergeMode,
    num_rows: AtomicUsize,
    series: RwLock<Series>,
}

impl Drop for SimpleBulkMemtable {
    fn drop(&mut self) {
        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
    }
}

impl SimpleBulkMemtable {
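    /// Creates a new [`SimpleBulkMemtable`].
    ///
    /// Deduplication is disabled when `merge_mode` is [`MergeMode::LastNonNull`],
    /// since merging is deferred to [`LastNonNullIter`] at read time.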
    pub(crate) fn new(
        id: MemtableId,
        region_metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        let series = RwLock::new(Series::with_capacity(&region_metadata, 1024));

        Self {
            id,
            region_metadata,
            alloc_tracker: AllocTracker::new(write_buffer_manager),
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            max_sequence: AtomicU64::new(0),
            dedup,
            merge_mode,
            num_rows: AtomicUsize::new(0),
            series,
        }
    }

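    /// Returns the set of field column ids to read, defaulting to all field
    /// columns when `projection` is `None`.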
    fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
        if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.region_metadata
                .field_columns()
                .map(|c| c.column_id)
                .collect()
        }
    }

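    /// Compacts the underlying series and returns a [`BatchIterBuilder`]
    /// holding a snapshot of its values for the given projection and read
    /// sequence.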
    fn create_iter(
        &self,
        projection: Option<&[ColumnId]>,
        sequence: Option<SequenceNumber>,
    ) -> error::Result<BatchIterBuilder> {
        let mut series = self.series.write().unwrap();

        let values = if series.is_empty() {
            None
        } else {
            Some(series.compact(&self.region_metadata)?.clone())
        };

        let projection = self.build_projection(projection);

        Ok(BatchIterBuilder {
            region_metadata: self.region_metadata.clone(),
            values,
            projection,
            dedup: self.dedup,
            sequence,
            merge_mode: self.merge_mode,
        })
    }

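    /// Pushes a single row into the series and accumulates its value size and
    /// timestamp range into `stats`.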
    fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
        let ts = kv.timestamp();
        let sequence = kv.sequence();
        let op_type = kv.op_type();
        let mut series = self.series.write().unwrap();
        let size = series.push(ts, sequence, op_type, kv.fields());
        stats.value_bytes += size;
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        stats.min_ts = stats.min_ts.min(ts);
        stats.max_ts = stats.max_ts.max(ts);
    }

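    /// Applies accumulated write metrics to the memtable counters and reports
    /// the allocated bytes to the [`AllocTracker`].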
    fn update_stats(&self, stats: WriteMetrics) {
        self.alloc_tracker
            .on_allocation(stats.key_bytes + stats.value_bytes);
        self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
        self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
        self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(stats.max_sequence, Ordering::SeqCst);
    }

    #[cfg(test)]
    fn schema(&self) -> &RegionMetadataRef {
        &self.region_metadata
    }
}

impl Debug for SimpleBulkMemtable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SimpleBulkMemtable").finish()
    }
}

impl Memtable for SimpleBulkMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    fn write(&self, kvs: &KeyValues) -> error::Result<()> {
        let mut stats = WriteMetrics::default();
        let max_sequence = kvs.max_sequence();
        for kv in kvs.iter() {
            self.write_key_value(kv, &mut stats);
        }
        stats.max_sequence = max_sequence;
        stats.num_rows = kvs.num_rows();
        self.update_stats(stats);
        Ok(())
    }

    fn write_one(&self, kv: KeyValue) -> error::Result<()> {
        debug_assert_eq!(0, kv.num_primary_keys());
        let mut stats = WriteMetrics::default();
        self.write_key_value(kv, &mut stats);
        stats.num_rows = 1;
        stats.max_sequence = kv.sequence();
        self.update_stats(stats);
        Ok(())
    }

    fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
        let rb = &part.batch;

        let ts = Helper::try_into_vector(
            rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
                .with_context(|| error::InvalidRequestSnafu {
                    region_id: self.region_metadata.region_id,
                    reason: "Timestamp not found",
                })?,
        )
        .context(error::ConvertVectorSnafu)?;

        let sequence = part.sequence;

        let fields: Vec<_> = self
            .region_metadata
            .field_columns()
            .map(|f| {
                let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
                    error::InvalidRequestSnafu {
                        region_id: self.region_metadata.region_id,
                        reason: format!("Column {} not found", f.column_schema.name),
                    }
                    .build()
                })?;
                Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
            })
            .collect::<error::Result<Vec<_>>>()?;

        let mut series = self.series.write().unwrap();
        let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["bulk_extend"])
            .start_timer();
        series.extend(ts, OpType::Put as u8, sequence, fields.into_iter())?;
        extend_timer.observe_duration();

        self.update_stats(WriteMetrics {
            key_bytes: 0,
            value_bytes: part.estimated_size(),
            min_ts: part.min_ts,
            max_ts: part.max_ts,
            num_rows: part.num_rows(),
            max_sequence: sequence,
        });
        Ok(())
    }

    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        _predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> error::Result<BoxedBatchIterator> {
        let iter = self.create_iter(projection, sequence)?.build()?;

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> error::Result<MemtableRanges> {
        // Propagate the error instead of panicking, since this method already
        // returns a Result.
        let builder = Box::new(self.create_iter(projection, sequence)?);

        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.series.read().unwrap().is_empty()
    }

    fn freeze(&self) -> error::Result<()> {
        self.series.write().unwrap().freeze(&self.region_metadata);
        Ok(())
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();
        let num_rows = self.num_rows.load(Ordering::Relaxed);
        if num_rows == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }
        let ts_type = self
            .region_metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows,
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count: 1,
        }
    }

    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(Self::new(
            id,
            metadata.clone(),
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}

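/// Builder that turns a snapshot of the series [`Values`] into a one-shot
/// batch iterator, optionally wrapped with [`LastNonNullIter`] for the
/// `LastNonNull` merge mode.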
#[derive(Clone)]
struct BatchIterBuilder {
    region_metadata: RegionMetadataRef,
    values: Option<Values>,
    projection: HashSet<ColumnId>,
    sequence: Option<SequenceNumber>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl IterBuilder for BatchIterBuilder {
    fn build(&self) -> error::Result<BoxedBatchIterator> {
        let Some(values) = self.values.clone() else {
            return Ok(Box::new(Iter { batch: None }));
        };

        let maybe_batch = values
            .to_batch(&[], &self.region_metadata, &self.projection, self.dedup)
            .and_then(|mut b| {
                b.filter_by_sequence(self.sequence)?;
                Ok(b)
            })
            .map(Some)
            .transpose();

        let iter = Iter { batch: maybe_batch };

        if self.merge_mode == MergeMode::LastNonNull {
            Ok(Box::new(LastNonNullIter::new(iter)))
        } else {
            Ok(Box::new(iter))
        }
    }
}

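/// Iterator that yields at most one [`Batch`], or the error produced while
/// building it.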
struct Iter {
    batch: Option<error::Result<Batch>>,
}

impl Iterator for Iter {
    type Item = error::Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.batch.take()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::SequenceNumber;

    use super::*;
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*f1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(f2.clone())),
                    },
                ],
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len());
        assert_eq!(2, batch.fields()[0].column_id);
    }

    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(&memtable.region_metadata, 0, &[(1, 1.0, "a".to_string())]);
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_ts: 1,
            max_ts: 2,
            timestamp_index: 0,
            raw_data: None,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        let kvs = build_key_values(&memtable.region_metadata, 2, &[(3, 3.0, "c".to_string())]);
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| t.unwrap().0.value())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, Some(0)).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }
}