1pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod primary_key_filter;
23mod shard;
24mod shard_builder;
25mod tree;
26
27use std::fmt;
28use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
29use std::sync::Arc;
30
31use common_base::readable_size::ReadableSize;
32pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter};
33use serde::{Deserialize, Serialize};
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, SequenceNumber};
36use table::predicate::Predicate;
37
38use crate::error::{Result, UnsupportedOperationSnafu};
39use crate::flush::WriteBufferManagerRef;
40use crate::memtable::key_values::KeyValue;
41use crate::memtable::partition_tree::tree::PartitionTree;
42use crate::memtable::stats::WriteMetrics;
43use crate::memtable::{
44 AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
45 MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
46 PredicateGroup,
47};
48use crate::region::options::MergeMode;
49use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
50
/// Denominator used to cap the fork dictionary size at
/// `sys_memory / DICTIONARY_SIZE_FACTOR` (see `PartitionTreeConfig::default`).
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default maximum number of keys a single index shard holds.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default number of rows to buffer before freezing a data part.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;

// Id of a shard. NOTE(review): appears to be unique only within a partition — confirm.
type ShardId = u32;
// Index of a primary key inside a shard.
type PkIndex = u16;
60
/// Locates a primary key: the shard it lives in plus its index within that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}
67
/// Configuration of the partition tree memtable.
///
/// All fields fall back to `Default` when absent (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Maximum number of keys a single index shard may hold.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to buffer before freezing a data part.
    pub data_freeze_threshold: usize,
    /// Whether to deduplicate rows. Never read from serialized input
    /// (`skip_deserializing`); a round-trip restores the default `true`.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Byte budget for the dictionary kept when forking the tree.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree. Also skipped during deserialization, so it
    /// always comes from the runtime default rather than persisted config.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
91
92impl Default for PartitionTreeConfig {
93 fn default() -> Self {
94 let mut fork_dictionary_bytes = ReadableSize::mb(512);
95 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
96 let adjust_dictionary_bytes =
97 std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
98 if adjust_dictionary_bytes.0 > 0 {
99 fork_dictionary_bytes = adjust_dictionary_bytes;
100 }
101 }
102
103 Self {
104 index_max_keys_per_shard: 8192,
105 data_freeze_threshold: 131072,
106 dedup: true,
107 fork_dictionary_bytes,
108 merge_mode: MergeMode::LastRow,
109 }
110 }
111}
112
/// Memtable backed by a partition tree.
pub struct PartitionTreeMemtable {
    id: MemtableId,
    tree: Arc<PartitionTree>,
    /// Tracks bytes allocated by writes (value bytes only; see `update_stats`).
    alloc_tracker: AllocTracker,
    /// Max/min timestamp values observed so far; interpreted in the time index
    /// column's timestamp type when building stats.
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    /// Largest sequence written; only advanced on successful writes.
    max_sequence: AtomicU64,
    /// Total number of rows written (bumped even when a write errors; see `write`).
    num_rows: AtomicUsize,
}
124
125impl fmt::Debug for PartitionTreeMemtable {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 f.debug_struct("PartitionTreeMemtable")
128 .field("id", &self.id)
129 .finish()
130 }
131}
132
impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes a batch of key values into the tree and updates local statistics.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        // Fresh metrics and primary-key buffer per call; the tree fills both.
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        // Stats are folded in regardless of the write result.
        self.update_stats(&metrics);

        if res.is_ok() {
            // Only a fully successful batch advances the max sequence.
            let sequence = kvs.max_sequence();
            self.max_sequence.fetch_max(sequence, Ordering::Relaxed);
        }

        // NOTE(review): num_rows is incremented even when `res` is an error,
        // which over-counts on failure — confirm this is intentional.
        self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
        res
    }

    /// Writes a single key value; mirrors `write` for one row.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        // Stats are folded in regardless of the write result.
        self.update_stats(&metrics);

        if res.is_ok() {
            self.max_sequence
                .fetch_max(key_value.sequence(), Ordering::Relaxed);
        }

        self.num_rows.fetch_add(1, Ordering::Relaxed);
        res
    }

    /// Bulk ingestion is not supported by this memtable implementation.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    /// Creates an iterator over the tree with optional projection, predicate
    /// and sequence filter.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence)
    }

    /// Returns the memtable as a single range (range id 0) whose iterator is
    /// rebuilt lazily from the captured projection/predicate/sequence.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> MemtableRanges {
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        }
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    /// Stops allocation tracking and freezes the underlying tree.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    /// Builds a stats snapshot; an empty (zero-byte) memtable reports no time range.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // Nothing was written, so the min/max timestamp sentinels are meaningless.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }

        // Interpret the raw i64 min/max in the time index column's timestamp type.
        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    /// Forks the tree against new metadata and wraps it in a fresh memtable
    /// with reset statistics.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
268
269impl PartitionTreeMemtable {
270 pub fn new(
272 id: MemtableId,
273 row_codec: Arc<dyn PrimaryKeyCodec>,
274 metadata: RegionMetadataRef,
275 write_buffer_manager: Option<WriteBufferManagerRef>,
276 config: &PartitionTreeConfig,
277 ) -> Self {
278 Self::with_tree(
279 id,
280 PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
281 )
282 }
283
284 fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
288 let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
289
290 Self {
291 id,
292 tree: Arc::new(tree),
293 alloc_tracker,
294 max_timestamp: AtomicI64::new(i64::MIN),
295 min_timestamp: AtomicI64::new(i64::MAX),
296 num_rows: AtomicUsize::new(0),
297 max_sequence: AtomicU64::new(0),
298 }
299 }
300
301 fn update_stats(&self, metrics: &WriteMetrics) {
303 self.alloc_tracker.on_allocation(metrics.value_bytes);
305 self.max_timestamp
306 .fetch_max(metrics.max_ts, Ordering::SeqCst);
307 self.min_timestamp
308 .fetch_min(metrics.min_ts, Ordering::SeqCst);
309 }
310}
311
/// Builder that constructs [`PartitionTreeMemtable`]s sharing one config and an
/// optional write buffer manager.
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    config: PartitionTreeConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
318
319impl PartitionTreeMemtableBuilder {
320 pub fn new(
322 config: PartitionTreeConfig,
323 write_buffer_manager: Option<WriteBufferManagerRef>,
324 ) -> Self {
325 Self {
326 config,
327 write_buffer_manager,
328 }
329 }
330}
331
332impl MemtableBuilder for PartitionTreeMemtableBuilder {
333 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
334 let codec = build_primary_key_codec(metadata);
335 Arc::new(PartitionTreeMemtable::new(
336 id,
337 codec,
338 metadata.clone(),
339 self.write_buffer_manager.clone(),
340 &self.config,
341 ))
342 }
343}
344
/// Captures everything needed to (re)build an iterator over the tree;
/// used by `ranges` via `MemtableRangeContext`.
struct PartitionTreeIterBuilder {
    tree: Arc<PartitionTree>,
    projection: Option<Vec<ColumnId>>,
    predicate: Option<Predicate>,
    sequence: Option<SequenceNumber>,
}
351
352impl IterBuilder for PartitionTreeIterBuilder {
353 fn build(&self) -> Result<BoxedBatchIterator> {
354 self.tree.read(
355 self.projection.as_deref(),
356 self.predicate.clone(),
357 self.sequence,
358 )
359 }
360}
361
362#[cfg(test)]
363mod tests {
364 use api::v1::value::ValueData;
365 use api::v1::{Row, Rows, SemanticType};
366 use common_time::Timestamp;
367 use datafusion_common::{Column, ScalarValue};
368 use datafusion_expr::{BinaryExpr, Expr, Operator};
369 use datatypes::data_type::ConcreteDataType;
370 use datatypes::scalars::ScalarVector;
371 use datatypes::schema::ColumnSchema;
372 use datatypes::value::Value;
373 use datatypes::vectors::Int64Vector;
374 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
375 use store_api::storage::RegionId;
376
377 use super::*;
378 use crate::row_converter::DensePrimaryKeyCodec;
379 use crate::test_util::memtable_util::{
380 self, collect_iter_timestamps, region_metadata_to_row_schema,
381 };
382
383 #[test]
384 fn test_memtable_sorted_input() {
385 write_iter_sorted_input(true);
386 write_iter_sorted_input(false);
387 }
388
389 fn write_iter_sorted_input(has_pk: bool) {
390 let metadata = if has_pk {
391 memtable_util::metadata_with_primary_key(vec![1, 0], true)
392 } else {
393 memtable_util::metadata_with_primary_key(vec![], false)
394 };
395 let timestamps = (0..100).collect::<Vec<_>>();
396 let kvs =
397 memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1);
398 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
399 let memtable = PartitionTreeMemtable::new(
400 1,
401 codec,
402 metadata.clone(),
403 None,
404 &PartitionTreeConfig::default(),
405 );
406 memtable.write(&kvs).unwrap();
407
408 let expected_ts = kvs
409 .iter()
410 .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
411 .collect::<Vec<_>>();
412
413 let iter = memtable.iter(None, None, None).unwrap();
414 let read = collect_iter_timestamps(iter);
415 assert_eq!(expected_ts, read);
416
417 let stats = memtable.stats();
418 assert!(stats.bytes_allocated() > 0);
419 assert_eq!(
420 Some((
421 Timestamp::new_millisecond(0),
422 Timestamp::new_millisecond(99)
423 )),
424 stats.time_range()
425 );
426 }
427
428 #[test]
429 fn test_memtable_unsorted_input() {
430 write_iter_unsorted_input(true);
431 write_iter_unsorted_input(false);
432 }
433
    /// Writes two overlapping, unsorted timestamp batches and checks that
    /// reads come back sorted by timestamp with the expected sequences.
    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        // First batch: starting sequence 0.
        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1, 3, 7, 5, 6],
            0,
        );
        memtable.write(&kvs).unwrap();

        // Second batch: starting sequence 5, overlapping timestamps 5 and 7.
        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5, 2, 4, 0, 7],
            5,
        );
        memtable.write(&kvs).unwrap();

        // Ten rows with eight distinct timestamps read back sorted.
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        // The surviving row for each timestamp carries the expected sequence.
        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }
492
493 #[test]
494 fn test_memtable_projection() {
495 write_iter_projection(true);
496 write_iter_projection(false);
497 }
498
    /// Writes 100 rows and reads back only column id 3, checking every batch
    /// exposes exactly one field whose values match the written sequence.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        // Project only column id 3.
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }
530
531 #[test]
532 fn test_write_iter_multi_keys() {
533 write_iter_multi_keys(1, 100);
534 write_iter_multi_keys(2, 100);
535 write_iter_multi_keys(4, 100);
536 write_iter_multi_keys(8, 5);
537 write_iter_multi_keys(2, 10);
538 }
539
540 fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
541 let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
542 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
543 let memtable = PartitionTreeMemtable::new(
544 1,
545 codec,
546 metadata.clone(),
547 None,
548 &PartitionTreeConfig {
549 index_max_keys_per_shard: max_keys,
550 data_freeze_threshold: freeze_threshold,
551 ..Default::default()
552 },
553 );
554
555 let mut data = Vec::new();
556 for i in 0..4 {
558 for j in 0..4 {
559 let timestamps = [11, 13, 1, 5, 3, 7, 9];
561 let key = format!("a{j}");
562 let kvs =
563 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 0);
564 memtable.write(&kvs).unwrap();
565 for ts in timestamps {
566 data.push((i, key.clone(), ts));
567 }
568 }
569 for j in 0..4 {
570 let timestamps = [10, 2, 4, 8, 6];
572 let key = format!("a{j}");
573 let kvs =
574 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 200);
575 memtable.write(&kvs).unwrap();
576 for ts in timestamps {
577 data.push((i, key.clone(), ts));
578 }
579 }
580 }
581 data.sort_unstable();
582
583 let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
584 let iter = memtable.iter(None, None, None).unwrap();
585 let read = collect_iter_timestamps(iter);
586 assert_eq!(expect, read);
587 }
588
589 #[test]
590 fn test_memtable_filter() {
591 let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
592 let memtable = PartitionTreeMemtableBuilder::new(
594 PartitionTreeConfig {
595 index_max_keys_per_shard: 40,
596 ..Default::default()
597 },
598 None,
599 )
600 .build(1, &metadata);
601
602 for i in 0..100 {
603 let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
604 let kvs =
605 memtable_util::build_key_values(&metadata, "hello".to_string(), i, ×tamps, 1);
606 memtable.write(&kvs).unwrap();
607 }
608
609 for i in 0..100 {
610 let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
611 let expr = Expr::BinaryExpr(BinaryExpr {
612 left: Box::new(Expr::Column(Column::from_name("k1"))),
613 op: Operator::Eq,
614 right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
615 });
616 let iter = memtable
617 .iter(None, Some(Predicate::new(vec![expr])), None)
618 .unwrap();
619 let read = collect_iter_timestamps(iter);
620 assert_eq!(timestamps, read);
621 }
622 }
623
    /// `dedup` is `#[serde(skip_deserializing)]`, so a serialize/deserialize
    /// round-trip must restore it to the default (`true`) even though it was
    /// serialized as `false`.
    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }
636
    /// Builds region metadata shaped like a metric-engine physical table:
    /// internal `__table_id`/`__tsid` tags plus a user label, timestamp and value.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                // Large ids (>= 2^31) — presumably reserved internal column ids; confirm.
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            // Primary key order: __table_id, __tsid, test_label.
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
689
    /// Builds `KeyValues` for the metric-engine schema: one row per element of
    /// `ts`, zipping tags, timestamps and values together. All rows share the
    /// same starting `sequence`. Slices shorter than `ts` silently truncate
    /// the zip, so callers pass equal-length slices.
    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        // Value order must match the schema from metadata:
        // __table_id, __tsid, test_label, greptime_timestamp, greptime_value.
        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::U32Value(*table_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::U64Value(*ts_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(label.to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*val)),
                    },
                ],
            })
            .collect();
        // op_type 1 — presumably a "put" mutation; confirm against api::v1::OpType.
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }
738
    /// Writes to a memtable, freezes it, forks it, writes again, and checks
    /// the first row read from the fork decodes to the expected primary key.
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        // Two series ("daily" and "10min"), two rows each, sequence 1.
        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        // Freeze, then fork into a new memtable that must stay writable.
        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        // The first batch's primary key should decode with "10min" as the
        // third key component (test_label).
        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
794}