1use std::collections::HashSet;
16use std::fmt::{Debug, Formatter};
17use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
18use std::sync::{Arc, RwLock};
19
20use api::v1::OpType;
21use datatypes::vectors::Helper;
22use snafu::{OptionExt, ResultExt};
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::{ColumnId, SequenceNumber};
25use table::predicate::Predicate;
26
27use crate::flush::WriteBufferManagerRef;
28use crate::memtable::bulk::part::BulkPart;
29use crate::memtable::key_values::KeyValue;
30use crate::memtable::stats::WriteMetrics;
31use crate::memtable::time_series::{Series, Values};
32use crate::memtable::{
33 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableId, MemtableRange,
34 MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
35};
36use crate::read::dedup::LastNonNullIter;
37use crate::read::scan_region::PredicateGroup;
38use crate::read::Batch;
39use crate::region::options::MergeMode;
40use crate::{error, metrics};
41
/// A memtable without primary keys that stores every row in one single
/// [`Series`], intended for simple bulk-write workloads.
pub struct SimpleBulkMemtable {
    // Identifier of this memtable.
    id: MemtableId,
    // Metadata of the region this memtable belongs to.
    region_metadata: RegionMetadataRef,
    // Tracks memory usage and reports it to the write buffer manager.
    alloc_tracker: AllocTracker,
    // Largest timestamp value written so far (`i64::MIN` when empty).
    max_timestamp: AtomicI64,
    // Smallest timestamp value written so far (`i64::MAX` when empty).
    min_timestamp: AtomicI64,
    // Largest sequence number written so far.
    max_sequence: AtomicU64,
    // Whether rows with duplicate timestamps are deduplicated on read.
    // Forced to `false` when `merge_mode` is `LastNonNull` (see `new`).
    dedup: bool,
    // Strategy used to merge rows that share a timestamp.
    merge_mode: MergeMode,
    // Total number of rows written.
    num_rows: AtomicUsize,
    // The single series holding all written values.
    series: RwLock<Series>,
}
54
55impl SimpleBulkMemtable {
56 pub(crate) fn new(
57 id: MemtableId,
58 region_metadata: RegionMetadataRef,
59 write_buffer_manager: Option<WriteBufferManagerRef>,
60 dedup: bool,
61 merge_mode: MergeMode,
62 ) -> Self {
63 let dedup = if merge_mode == MergeMode::LastNonNull {
64 false
65 } else {
66 dedup
67 };
68 let series = RwLock::new(Series::new(®ion_metadata));
69
70 Self {
71 id,
72 region_metadata,
73 alloc_tracker: AllocTracker::new(write_buffer_manager),
74 max_timestamp: AtomicI64::new(i64::MIN),
75 min_timestamp: AtomicI64::new(i64::MAX),
76 max_sequence: AtomicU64::new(0),
77 dedup,
78 merge_mode,
79 num_rows: AtomicUsize::new(0),
80 series,
81 }
82 }
83
84 fn build_projection(&self, projection: Option<&[ColumnId]>) -> HashSet<ColumnId> {
85 if let Some(projection) = projection {
86 projection.iter().copied().collect()
87 } else {
88 self.region_metadata
89 .field_columns()
90 .map(|c| c.column_id)
91 .collect()
92 }
93 }
94
95 fn create_iter(
96 &self,
97 projection: Option<&[ColumnId]>,
98 sequence: Option<SequenceNumber>,
99 ) -> error::Result<BatchIterBuilder> {
100 let mut series = self.series.write().unwrap();
101
102 let values = if series.is_empty() {
103 None
104 } else {
105 Some(series.compact(&self.region_metadata)?.clone())
106 };
107
108 let projection = self.build_projection(projection);
109
110 Ok(BatchIterBuilder {
111 region_metadata: self.region_metadata.clone(),
112 values,
113 projection,
114 dedup: self.dedup,
115 sequence,
116 merge_mode: self.merge_mode,
117 })
118 }
119
120 fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) {
121 let ts = kv.timestamp();
122 let sequence = kv.sequence();
123 let op_type = kv.op_type();
124 let mut series = self.series.write().unwrap();
125 let size = series.push(ts, sequence, op_type, kv.fields());
126 stats.value_bytes += size;
127 let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
129 stats.min_ts = stats.min_ts.min(ts);
130 stats.max_ts = stats.max_ts.max(ts);
131 }
132
133 fn update_stats(&self, stats: WriteMetrics) {
135 self.alloc_tracker
136 .on_allocation(stats.key_bytes + stats.value_bytes);
137 self.num_rows.fetch_add(stats.num_rows, Ordering::SeqCst);
138 self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
139 self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
140 self.max_sequence
141 .fetch_max(stats.max_sequence, Ordering::SeqCst);
142 }
143
144 #[cfg(test)]
145 fn schema(&self) -> &RegionMetadataRef {
146 &self.region_metadata
147 }
148}
149
150impl Debug for SimpleBulkMemtable {
151 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
152 f.debug_struct("SimpleBulkMemtable").finish()
153 }
154}
155
156impl Memtable for SimpleBulkMemtable {
157 fn id(&self) -> MemtableId {
158 self.id
159 }
160
161 fn write(&self, kvs: &KeyValues) -> error::Result<()> {
162 let mut stats = WriteMetrics::default();
163 let max_sequence = kvs.max_sequence();
164 for kv in kvs.iter() {
165 self.write_key_value(kv, &mut stats);
166 }
167 stats.max_sequence = max_sequence;
168 stats.num_rows = kvs.num_rows();
169 self.update_stats(stats);
170 Ok(())
171 }
172
173 fn write_one(&self, kv: KeyValue) -> error::Result<()> {
174 debug_assert_eq!(0, kv.num_primary_keys());
175 let mut stats = WriteMetrics::default();
176 self.write_key_value(kv, &mut stats);
177 stats.num_rows = 1;
178 stats.max_sequence = kv.sequence();
179 self.update_stats(stats);
180 Ok(())
181 }
182
183 fn write_bulk(&self, part: BulkPart) -> error::Result<()> {
184 let rb = &part.batch;
185
186 let ts = Helper::try_into_vector(
187 rb.column_by_name(&self.region_metadata.time_index_column().column_schema.name)
188 .with_context(|| error::InvalidRequestSnafu {
189 region_id: self.region_metadata.region_id,
190 reason: "Timestamp not found",
191 })?,
192 )
193 .context(error::ConvertVectorSnafu)?;
194
195 let sequence = part.sequence;
196
197 let fields: Vec<_> = self
198 .region_metadata
199 .field_columns()
200 .map(|f| {
201 let array = rb.column_by_name(&f.column_schema.name).ok_or_else(|| {
202 error::InvalidRequestSnafu {
203 region_id: self.region_metadata.region_id,
204 reason: format!("Column {} not found", f.column_schema.name),
205 }
206 .build()
207 })?;
208 Helper::try_into_vector(array).context(error::ConvertVectorSnafu)
209 })
210 .collect::<error::Result<Vec<_>>>()?;
211
212 let mut series = self.series.write().unwrap();
213 let extend_timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
214 .with_label_values(&["bulk_extend"])
215 .start_timer();
216 series.extend(ts, OpType::Put as u8, sequence, fields.into_iter())?;
217 extend_timer.observe_duration();
218
219 self.update_stats(WriteMetrics {
220 key_bytes: 0,
221 value_bytes: part.estimated_size(),
222 min_ts: part.min_ts,
223 max_ts: part.max_ts,
224 num_rows: part.num_rows,
225 max_sequence: sequence,
226 });
227 Ok(())
228 }
229
230 fn iter(
231 &self,
232 projection: Option<&[ColumnId]>,
233 _predicate: Option<Predicate>,
234 sequence: Option<SequenceNumber>,
235 ) -> error::Result<BoxedBatchIterator> {
236 let iter = self.create_iter(projection, sequence)?.build()?;
237
238 if self.merge_mode == MergeMode::LastNonNull {
239 let iter = LastNonNullIter::new(iter);
240 Ok(Box::new(iter))
241 } else {
242 Ok(Box::new(iter))
243 }
244 }
245
246 fn ranges(
247 &self,
248 projection: Option<&[ColumnId]>,
249 predicate: PredicateGroup,
250 sequence: Option<SequenceNumber>,
251 ) -> error::Result<MemtableRanges> {
252 let builder = Box::new(self.create_iter(projection, sequence).unwrap());
253
254 let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
255 Ok(MemtableRanges {
256 ranges: [(0, MemtableRange::new(context))].into(),
257 stats: self.stats(),
258 })
259 }
260
261 fn is_empty(&self) -> bool {
262 self.series.read().unwrap().is_empty()
263 }
264
265 fn freeze(&self) -> error::Result<()> {
266 self.series.write().unwrap().freeze(&self.region_metadata);
267 Ok(())
268 }
269
270 fn stats(&self) -> MemtableStats {
271 let estimated_bytes = self.alloc_tracker.bytes_allocated();
272 let num_rows = self.num_rows.load(Ordering::Relaxed);
273 if num_rows == 0 {
274 return MemtableStats {
276 estimated_bytes,
277 time_range: None,
278 num_rows: 0,
279 num_ranges: 0,
280 max_sequence: 0,
281 };
282 }
283 let ts_type = self
284 .region_metadata
285 .time_index_column()
286 .column_schema
287 .data_type
288 .clone()
289 .as_timestamp()
290 .expect("Timestamp column must have timestamp type");
291 let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
292 let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
293 MemtableStats {
294 estimated_bytes,
295 time_range: Some((min_timestamp, max_timestamp)),
296 num_rows,
297 num_ranges: 1,
298 max_sequence: self.max_sequence.load(Ordering::Relaxed),
299 }
300 }
301
302 fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
303 Arc::new(Self::new(
304 id,
305 metadata.clone(),
306 self.alloc_tracker.write_buffer_manager(),
307 self.dedup,
308 self.merge_mode,
309 ))
310 }
311}
312
/// Snapshot-based builder that produces batch iterators over the values
/// captured by `SimpleBulkMemtable::create_iter`.
#[derive(Clone)]
struct BatchIterBuilder {
    // Region metadata used to decode values into a `Batch`.
    region_metadata: RegionMetadataRef,
    // Compacted snapshot of the series; `None` when the memtable was empty.
    values: Option<Values>,
    // Field column ids to project into the output batch.
    projection: HashSet<ColumnId>,
    // Visible sequence upper bound; rows above it are filtered out.
    sequence: Option<SequenceNumber>,
    // Whether to deduplicate rows with equal timestamps.
    dedup: bool,
    // Merge strategy; `LastNonNull` adds a `LastNonNullIter` wrapper.
    merge_mode: MergeMode,
}
322
323impl IterBuilder for BatchIterBuilder {
324 fn build(&self) -> error::Result<BoxedBatchIterator> {
325 let Some(values) = self.values.clone() else {
326 return Ok(Box::new(Iter { batch: None }));
327 };
328
329 let maybe_batch = values
330 .to_batch(&[], &self.region_metadata, &self.projection, self.dedup)
331 .and_then(|mut b| {
332 b.filter_by_sequence(self.sequence)?;
333 Ok(b)
334 })
335 .map(Some)
336 .transpose();
337
338 let iter = Iter { batch: maybe_batch };
339
340 if self.merge_mode == MergeMode::LastNonNull {
341 Ok(Box::new(LastNonNullIter::new(iter)))
342 } else {
343 Ok(Box::new(iter))
344 }
345 }
346}
347
/// One-shot iterator yielding at most one batch result.
struct Iter {
    // The pending item; taken on the first `next` call.
    batch: Option<error::Result<Batch>>,
}
351
352impl Iterator for Iter {
353 type Item = error::Result<Batch>;
354
355 fn next(&mut self) -> Option<Self::Item> {
356 self.batch.take()
357 }
358}
359
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
    use common_recordbatch::DfRecordBatch;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::{ScalarVector, Vector};
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::TimestampMillisecondVector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::SequenceNumber;

    use super::*;
    use crate::region::options::MergeMode;
    use crate::test_util::column_metadata_to_column_schema;

    /// Builds region metadata with a millisecond timestamp column `ts`
    /// and two nullable field columns: `f1` (f64) and `f2` (string).
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(1.into());
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("f2", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            });
        Arc::new(builder.build().unwrap())
    }

    /// Creates a memtable over the test metadata, without a write buffer
    /// manager.
    fn new_test_memtable(dedup: bool, merge_mode: MergeMode) -> SimpleBulkMemtable {
        SimpleBulkMemtable::new(1, new_test_metadata(), None, dedup, merge_mode)
    }

    /// Builds a `Put` mutation from `(ts, f1, f2)` tuples, all rows sharing
    /// one `sequence` number.
    fn build_key_values(
        metadata: &RegionMetadataRef,
        sequence: SequenceNumber,
        row_values: &[(i64, f64, String)],
    ) -> KeyValues {
        let column_schemas: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(column_metadata_to_column_schema)
            .collect();

        let rows: Vec<_> = row_values
            .iter()
            .map(|(ts, f1, f2)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*f1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(f2.clone())),
                    },
                ],
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence,
            rows: Some(Rows {
                schema: column_schemas,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }

    /// Two writes at different timestamps come back as one sorted batch.
    #[test]
    fn test_write_and_iter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());
        assert_eq!(2, batch.fields().len());
        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(2)), ts_v.get(1));
    }

    /// `None` projection yields all field columns; an explicit projection
    /// restricts the batch to the requested column ids.
    #[test]
    fn test_projection() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(2, batch.fields().len());

        let ts_v = batch
            .timestamps()
            .as_any()
            .downcast_ref::<TimestampMillisecondVector>()
            .unwrap();
        assert_eq!(Value::Timestamp(Timestamp::new_millisecond(1)), ts_v.get(0));

        let projection = vec![2];
        let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(1, batch.fields().len());
        assert_eq!(2, batch.fields()[0].column_id);
    }

    /// With dedup enabled, the later write to the same timestamp wins.
    #[test]
    fn test_dedup() {
        let memtable = new_test_memtable(true, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(1, 2.0, "b".to_string())],
            ))
            .unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();

        assert_eq!(1, batch.num_rows());
        assert_eq!(2.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }

    /// `write_one` stores a single row readable through `iter`.
    #[test]
    fn test_write_one() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let kvs = build_key_values(&memtable.region_metadata, 0, &[(1, 1.0, "a".to_string())]);
        let kv = kvs.iter().next().unwrap();
        memtable.write_one(kv).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
    }

    /// Bulk writes update stats and mix correctly with row writes.
    #[test]
    fn test_write_bulk() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let arrow_schema = memtable.schema().schema.arrow_schema().clone();
        let arrays = vec![
            Arc::new(TimestampMillisecondArray::from(vec![1, 2])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ];
        let rb = DfRecordBatch::try_new(arrow_schema, arrays).unwrap();

        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_ts: 1,
            max_ts: 2,
            num_rows: 2,
            timestamp_index: 0,
        };
        memtable.write_bulk(part).unwrap();

        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(2, batch.num_rows());

        let stats = memtable.stats();
        assert_eq!(1, stats.max_sequence);
        assert_eq!(2, stats.num_rows);
        assert_eq!(
            Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(2))),
            stats.time_range
        );

        // A subsequent row write must merge with the bulk data in order.
        let kvs = build_key_values(&memtable.region_metadata, 2, &[(3, 3.0, "c".to_string())]);
        memtable.write(&kvs).unwrap();
        let mut iter = memtable.iter(None, None, None).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(
            vec![1, 2, 3],
            batch
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|t| { t.unwrap().0.value() })
                .collect::<Vec<_>>()
        );
    }

    /// `is_empty` flips to false after the first write.
    #[test]
    fn test_is_empty() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        assert!(memtable.is_empty());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        assert!(!memtable.is_empty());
    }

    /// Stats report no time range while empty, and a range after a write.
    #[test]
    fn test_stats() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        let stats = memtable.stats();
        assert_eq!(0, stats.num_rows);
        assert!(stats.time_range.is_none());

        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        let stats = memtable.stats();
        assert_eq!(1, stats.num_rows);
        assert!(stats.time_range.is_some());
    }

    /// A forked memtable starts empty regardless of the parent's content.
    #[test]
    fn test_fork() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();

        let forked = memtable.fork(2, &memtable.region_metadata);
        assert!(forked.is_empty());
    }

    /// Reading with sequence 0 hides the row written at sequence 1.
    #[test]
    fn test_sequence_filter() {
        let memtable = new_test_memtable(false, MergeMode::LastRow);
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                0,
                &[(1, 1.0, "a".to_string())],
            ))
            .unwrap();
        memtable
            .write(&build_key_values(
                &memtable.region_metadata,
                1,
                &[(2, 2.0, "b".to_string())],
            ))
            .unwrap();

        let mut iter = memtable.iter(None, None, Some(0)).unwrap();
        let batch = iter.next().unwrap().unwrap();
        assert_eq!(1, batch.num_rows());
        assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());
    }
}