1pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod primary_key_filter;
23mod shard;
24mod shard_builder;
25mod tree;
26
27use std::fmt;
28use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
29use std::sync::Arc;
30
31use common_base::readable_size::ReadableSize;
32pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter};
33use serde::{Deserialize, Serialize};
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, SequenceNumber};
36use table::predicate::Predicate;
37
38use crate::error::{Result, UnsupportedOperationSnafu};
39use crate::flush::WriteBufferManagerRef;
40use crate::memtable::bulk::part::BulkPart;
41use crate::memtable::key_values::KeyValue;
42use crate::memtable::partition_tree::tree::PartitionTree;
43use crate::memtable::stats::WriteMetrics;
44use crate::memtable::{
45 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
46 MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
47 PredicateGroup,
48};
49use crate::region::options::MergeMode;
50use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
51
/// Divisor applied to total system memory when sizing the fork dictionary:
/// the dictionary budget is capped at 1/8 of system memory (see
/// `PartitionTreeConfig::default`).
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default maximum number of primary keys a single index shard may hold.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default number of rows after which a data part is frozen.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;

/// Id of a shard inside the partition tree.
type ShardId = u32;
/// Index of a primary key inside a shard.
type PkIndex = u16;
61
/// Globally unique id of a primary key inside the tree: the shard the key
/// belongs to plus its index within that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    // Shard that owns the key.
    shard_id: ShardId,
    // Position of the key within the shard.
    pk_index: PkIndex,
}
68
/// Configuration of the partition tree memtable.
///
/// All fields fall back to [`PartitionTreeConfig::default`] when absent
/// (`#[serde(default)]`). Fields marked `skip_deserializing` are never read
/// back from a serialized config; they always keep their default values and
/// are overridden at runtime instead (see `test_deserialize_config`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Maximum number of keys a single index shard may hold.
    pub index_max_keys_per_shard: usize,
    /// Number of rows after which a data part is frozen.
    pub data_freeze_threshold: usize,
    /// Whether to deduplicate rows. Not deserialized; derived from region
    /// options at runtime.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Byte budget for the dictionary kept when forking the tree.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree. Not deserialized; derived from region options
    /// at runtime.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
92
93impl Default for PartitionTreeConfig {
94 fn default() -> Self {
95 let mut fork_dictionary_bytes = ReadableSize::mb(512);
96 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
97 let adjust_dictionary_bytes =
98 std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
99 if adjust_dictionary_bytes.0 > 0 {
100 fork_dictionary_bytes = adjust_dictionary_bytes;
101 }
102 }
103
104 Self {
105 index_max_keys_per_shard: 8192,
106 data_freeze_threshold: 131072,
107 dedup: true,
108 fork_dictionary_bytes,
109 merge_mode: MergeMode::LastRow,
110 }
111 }
112}
113
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// Tree that stores the written rows.
    tree: Arc<PartitionTree>,
    /// Tracks bytes allocated by this memtable.
    alloc_tracker: AllocTracker,
    /// Max written timestamp; `i64::MIN` when nothing has been written yet.
    max_timestamp: AtomicI64,
    /// Min written timestamp; `i64::MAX` when nothing has been written yet.
    min_timestamp: AtomicI64,
    /// Max written sequence number; starts at 0.
    max_sequence: AtomicU64,
    /// Total number of rows written.
    num_rows: AtomicUsize,
}
125
126impl fmt::Debug for PartitionTreeMemtable {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 f.debug_struct("PartitionTreeMemtable")
129 .field("id", &self.id)
130 .finish()
131 }
132}
133
impl Memtable for PartitionTreeMemtable {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes a batch of key values into the tree.
    ///
    /// Stats (timestamps, row count, max sequence) are only updated when the
    /// tree write succeeds, so a failed write leaves the stats untouched.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut metrics = WriteMetrics::default();
        // Reusable buffer for encoded primary keys during this write.
        let mut pk_buffer = Vec::new();
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    /// Writes a single key value into the tree.
    ///
    /// Like [`Memtable::write`], stats are only updated on success.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    /// Bulk writes are not supported by the partition tree memtable.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    /// Returns an iterator over the tree, optionally projected, filtered by
    /// `predicate` and limited to rows visible at `sequence`.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence)
    }

    /// Returns the scannable ranges of this memtable.
    ///
    /// The whole memtable is always exposed as a single range (key `0`).
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    /// Returns whether the tree contains no rows.
    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    /// Marks this memtable immutable: stops allocation tracking and freezes
    /// the tree.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    /// Returns the statistics of this memtable.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // Nothing was written: report empty stats without touching the
            // timestamp atomics (they still hold their i64::MIN/MAX sentinels).
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }

        // Convert the raw atomic timestamps into typed timestamps using the
        // region's time index column type.
        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    /// Forks this (frozen) memtable into a new empty memtable with `id`,
    /// reusing state from the current tree.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
262
263impl PartitionTreeMemtable {
264 pub fn new(
266 id: MemtableId,
267 row_codec: Arc<dyn PrimaryKeyCodec>,
268 metadata: RegionMetadataRef,
269 write_buffer_manager: Option<WriteBufferManagerRef>,
270 config: &PartitionTreeConfig,
271 ) -> Self {
272 Self::with_tree(
273 id,
274 PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
275 )
276 }
277
278 fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
282 let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
283
284 Self {
285 id,
286 tree: Arc::new(tree),
287 alloc_tracker,
288 max_timestamp: AtomicI64::new(i64::MIN),
289 min_timestamp: AtomicI64::new(i64::MAX),
290 num_rows: AtomicUsize::new(0),
291 max_sequence: AtomicU64::new(0),
292 }
293 }
294
295 fn update_stats(&self, metrics: &WriteMetrics) {
297 self.alloc_tracker.on_allocation(metrics.value_bytes);
299 self.max_timestamp
300 .fetch_max(metrics.max_ts, Ordering::SeqCst);
301 self.min_timestamp
302 .fetch_min(metrics.min_ts, Ordering::SeqCst);
303 self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
304 self.max_sequence
305 .fetch_max(metrics.max_sequence, Ordering::SeqCst);
306 }
307}
308
/// Builder that creates [PartitionTreeMemtable]s sharing one config.
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    /// Config applied to every memtable built.
    config: PartitionTreeConfig,
    /// Optional manager that tracks the global write buffer size.
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
315
316impl PartitionTreeMemtableBuilder {
317 pub fn new(
319 config: PartitionTreeConfig,
320 write_buffer_manager: Option<WriteBufferManagerRef>,
321 ) -> Self {
322 Self {
323 config,
324 write_buffer_manager,
325 }
326 }
327}
328
329impl MemtableBuilder for PartitionTreeMemtableBuilder {
330 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
331 let codec = build_primary_key_codec(metadata);
332 Arc::new(PartitionTreeMemtable::new(
333 id,
334 codec,
335 metadata.clone(),
336 self.write_buffer_manager.clone(),
337 &self.config,
338 ))
339 }
340}
341
/// Builder that creates iterators over a shared partition tree, capturing
/// the read arguments of a `ranges` call.
struct PartitionTreeIterBuilder {
    /// Tree to read from.
    tree: Arc<PartitionTree>,
    /// Optional column projection for the scan.
    projection: Option<Vec<ColumnId>>,
    /// Optional predicate to filter rows.
    predicate: Option<Predicate>,
    /// Optional sequence bound passed to the read — presumably filters rows
    /// newer than this sequence; confirm against `PartitionTree::read`.
    sequence: Option<SequenceNumber>,
}
348
349impl IterBuilder for PartitionTreeIterBuilder {
350 fn build(&self) -> Result<BoxedBatchIterator> {
351 self.tree.read(
352 self.projection.as_deref(),
353 self.predicate.clone(),
354 self.sequence,
355 )
356 }
357}
358
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::scalars::ScalarVector;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use datatypes::vectors::Int64Vector;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::row_converter::DensePrimaryKeyCodec;
    use crate::test_util::memtable_util::{
        self, collect_iter_timestamps, region_metadata_to_row_schema,
    };

    #[test]
    fn test_memtable_sorted_input() {
        write_iter_sorted_input(true);
        write_iter_sorted_input(false);
    }

    /// Writes already-sorted timestamps and checks they read back in order.
    fn write_iter_sorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let timestamps = (0..100).collect::<Vec<_>>();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );
        memtable.write(&kvs).unwrap();

        let expected_ts = kvs
            .iter()
            .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
            .collect::<Vec<_>>();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expected_ts, read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(99)
            )),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_unsorted_input() {
        write_iter_unsorted_input(true);
        write_iter_unsorted_input(false);
    }

    /// Writes unsorted, overlapping timestamps and checks the iterator
    /// returns them sorted with the expected per-row sequences.
    fn write_iter_unsorted_input(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[1, 3, 7, 5, 6], 0);
        memtable.write(&kvs).unwrap();

        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[5, 2, 4, 0, 7], 5);
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

        let iter = memtable.iter(None, None, None).unwrap();
        let read = iter
            .flat_map(|batch| {
                batch
                    .unwrap()
                    .sequences()
                    .iter_data()
                    .collect::<Vec<_>>()
                    .into_iter()
            })
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

        let stats = memtable.stats();
        assert!(stats.bytes_allocated() > 0);
        assert_eq!(
            Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
            stats.time_range()
        );
    }

    #[test]
    fn test_memtable_projection() {
        write_iter_projection(true);
        write_iter_projection(false);
    }

    /// Scans with a projection on field column 3 and checks only that
    /// column's values come back.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }

    #[test]
    fn test_write_iter_multi_keys() {
        // Exercise different shard sizes and freeze thresholds.
        write_iter_multi_keys(1, 100);
        write_iter_multi_keys(2, 100);
        write_iter_multi_keys(4, 100);
        write_iter_multi_keys(8, 5);
        write_iter_multi_keys(2, 10);
    }

    /// Writes interleaved keys/timestamps and checks the iterator returns
    /// rows in (key, timestamp) order.
    fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
        let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig {
                index_max_keys_per_shard: max_keys,
                data_freeze_threshold: freeze_threshold,
                ..Default::default()
            },
        );

        let mut data = Vec::new();
        for i in 0..4 {
            // Odd timestamps, written out of order.
            for j in 0..4 {
                let timestamps = [11, 13, 1, 5, 3, 7, 9];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 0);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
            // Even timestamps, also out of order, with later sequences.
            for j in 0..4 {
                let timestamps = [10, 2, 4, 8, 6];
                let key = format!("a{j}");
                let kvs =
                    memtable_util::build_key_values(&metadata, key.clone(), i, &timestamps, 200);
                memtable.write(&kvs).unwrap();
                for ts in timestamps {
                    data.push((i, key.clone(), ts));
                }
            }
        }
        data.sort_unstable();

        let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
        let iter = memtable.iter(None, None, None).unwrap();
        let read = collect_iter_timestamps(iter);
        assert_eq!(expect, read);
    }

    #[test]
    fn test_memtable_filter() {
        let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
        // Small shard size so the writes span multiple shards.
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let kvs =
                memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
            memtable.write(&kvs).unwrap();
        }

        // Filtering on `k1 == i` must return exactly that key's timestamps.
        for i in 0..100 {
            let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
            let expr = Expr::BinaryExpr(BinaryExpr {
                left: Box::new(Expr::Column(Column::from_name("k1"))),
                op: Operator::Eq,
                right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
            });
            let iter = memtable
                .iter(None, Some(Predicate::new(vec![expr])), None)
                .unwrap();
            let read = collect_iter_timestamps(iter);
            assert_eq!(timestamps, read);
        }
    }

    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        // `dedup` is marked `skip_deserializing`, so a roundtrip must reset
        // it to its default (`true`).
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }

    /// Builds region metadata shaped like the metric engine's physical table:
    /// internal tag columns plus a label, timestamp and value.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }

    /// Builds `KeyValues` for the metric-engine-like schema above. All input
    /// slices are zipped row-wise, so they must have equal lengths.
    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::U32Value(*table_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::U64Value(*ts_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(label.to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*val)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        // Freeze, then fork into a new memtable and keep writing.
        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        // The first batch must carry the "10min" label in its primary key.
        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
}