1pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
28use std::sync::Arc;
29
30use common_base::readable_size::ReadableSize;
31use mito_codec::key_values::KeyValue;
32use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
33use serde::{Deserialize, Serialize};
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, SequenceNumber};
36use table::predicate::Predicate;
37
38use crate::error::{Result, UnsupportedOperationSnafu};
39use crate::flush::WriteBufferManagerRef;
40use crate::memtable::bulk::part::BulkPart;
41use crate::memtable::partition_tree::tree::PartitionTree;
42use crate::memtable::stats::WriteMetrics;
43use crate::memtable::{
44 AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
45 MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
46 PredicateGroup,
47};
48use crate::region::options::MergeMode;
49
/// Divisor applied to total system memory when adapting the fork dictionary
/// size in [PartitionTreeConfig::default].
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default value for `PartitionTreeConfig::index_max_keys_per_shard`.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default value for `PartitionTreeConfig::data_freeze_threshold`.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;
54
/// Identifier of a shard inside the partition tree.
type ShardId = u32;
/// Index of a primary key within a shard (see [PkId]).
type PkIndex = u16;
59
/// Locates a primary key in the tree: the shard it belongs to plus its
/// index within that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}
66
/// Configuration of the partition tree memtable.
///
/// Fields marked `#[serde(skip_deserializing)]` always take their default
/// value after deserialization, regardless of the serialized content
/// (see `test_deserialize_config`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Maximum number of keys a shard index may hold.
    pub index_max_keys_per_shard: usize,
    /// Threshold at which active data is frozen.
    /// NOTE(review): unit (rows) inferred from `DEFAULT_FREEZE_THRESHOLD`
    /// usage — confirm in `PartitionTree`.
    pub data_freeze_threshold: usize,
    /// Whether to deduplicate rows. Never deserialized; resets to the
    /// default (`true`) on load.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Memory budget for the key dictionary when forking the tree.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the region. Never deserialized; resets to the default
    /// (`LastRow`) on load.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
90
91impl Default for PartitionTreeConfig {
92 fn default() -> Self {
93 let mut fork_dictionary_bytes = ReadableSize::mb(512);
94 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
95 let adjust_dictionary_bytes =
96 std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
97 if adjust_dictionary_bytes.0 > 0 {
98 fork_dictionary_bytes = adjust_dictionary_bytes;
99 }
100 }
101
102 Self {
103 index_max_keys_per_shard: 8192,
104 data_freeze_threshold: 131072,
105 dedup: true,
106 fork_dictionary_bytes,
107 merge_mode: MergeMode::LastRow,
108 }
109 }
110}
111
/// Memtable implementation backed by a [PartitionTree].
pub struct PartitionTreeMemtable {
    /// Id of this memtable.
    id: MemtableId,
    /// The tree that stores the written data.
    tree: Arc<PartitionTree>,
    /// Tracks bytes allocated by writes; drives `stats()`.
    alloc_tracker: AllocTracker,
    // Min/max timestamps observed so far. Initialized to i64::MIN/i64::MAX
    // in `with_tree` so the first write always updates them.
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Total number of rows written.
    num_rows: AtomicUsize,
}
123
// Manual Debug: only print the id, the tree itself is not Debug-printable
// (and would be far too large anyway).
impl fmt::Debug for PartitionTreeMemtable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PartitionTreeMemtable")
            .field("id", &self.id)
            .finish()
    }
}
131
impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes a batch of key values into the tree and, on success, folds
    /// the write metrics into the memtable statistics.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut metrics = WriteMetrics::default();
        // Scratch buffer the tree uses to encode primary keys.
        let mut pk_buffer = Vec::new();
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        // Stats are only updated for successful writes.
        // NOTE(review): memory allocated by a failed write is not tracked
        // here — confirm whether the tree rolls it back.
        if res.is_ok() {
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    /// Writes a single key value; same bookkeeping as [Self::write].
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    /// Bulk writes are not supported by this memtable implementation.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence)
    }

    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges> {
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));

        // The whole memtable is exposed as a single range with id 0.
        Ok(MemtableRanges {
            ranges: [(0, MemtableRange::new(context))].into(),
            stats: self.stats(),
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    fn freeze(&self) -> Result<()> {
        // Stop tracking further allocations before freezing the tree.
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        // No allocations means nothing was written: report empty stats and
        // skip reading the (still sentinel-valued) timestamp atomics.
        if estimated_bytes == 0 {
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
        }
    }

    /// Forks this memtable into a new, empty one with the given metadata.
    /// The fork semantics are delegated to `PartitionTree::fork`.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
260
261impl PartitionTreeMemtable {
262 pub fn new(
264 id: MemtableId,
265 row_codec: Arc<dyn PrimaryKeyCodec>,
266 metadata: RegionMetadataRef,
267 write_buffer_manager: Option<WriteBufferManagerRef>,
268 config: &PartitionTreeConfig,
269 ) -> Self {
270 Self::with_tree(
271 id,
272 PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
273 )
274 }
275
276 fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
280 let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
281
282 Self {
283 id,
284 tree: Arc::new(tree),
285 alloc_tracker,
286 max_timestamp: AtomicI64::new(i64::MIN),
287 min_timestamp: AtomicI64::new(i64::MAX),
288 num_rows: AtomicUsize::new(0),
289 max_sequence: AtomicU64::new(0),
290 }
291 }
292
293 fn update_stats(&self, metrics: &WriteMetrics) {
295 self.alloc_tracker.on_allocation(metrics.value_bytes);
297 self.max_timestamp
298 .fetch_max(metrics.max_ts, Ordering::SeqCst);
299 self.min_timestamp
300 .fetch_min(metrics.min_ts, Ordering::SeqCst);
301 self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
302 self.max_sequence
303 .fetch_max(metrics.max_sequence, Ordering::SeqCst);
304 }
305}
306
/// Builder that produces [PartitionTreeMemtable]s sharing one config and
/// optional write buffer manager.
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    config: PartitionTreeConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
313
impl PartitionTreeMemtableBuilder {
    /// Creates a builder with the given config and optional write buffer
    /// manager.
    pub fn new(
        config: PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        Self {
            config,
            write_buffer_manager,
        }
    }
}
326
impl MemtableBuilder for PartitionTreeMemtableBuilder {
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        // Choose a primary key codec matching the region metadata.
        let codec = build_primary_key_codec(metadata);
        Arc::new(PartitionTreeMemtable::new(
            id,
            codec,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            &self.config,
        ))
    }
}
339
/// Captures everything needed to (re)create an iterator over the tree for a
/// [MemtableRange].
struct PartitionTreeIterBuilder {
    tree: Arc<PartitionTree>,
    projection: Option<Vec<ColumnId>>,
    predicate: Option<Predicate>,
    sequence: Option<SequenceNumber>,
}
346
impl IterBuilder for PartitionTreeIterBuilder {
    /// Builds a batch iterator with the captured projection, predicate and
    /// sequence filter.
    fn build(&self) -> Result<BoxedBatchIterator> {
        self.tree.read(
            self.projection.as_deref(),
            self.predicate.clone(),
            self.sequence,
        )
    }
}
356
357#[cfg(test)]
358mod tests {
359 use api::v1::value::ValueData;
360 use api::v1::{Row, Rows, SemanticType};
361 use common_time::Timestamp;
362 use datafusion_common::{Column, ScalarValue};
363 use datafusion_expr::{BinaryExpr, Expr, Operator};
364 use datatypes::data_type::ConcreteDataType;
365 use datatypes::scalars::ScalarVector;
366 use datatypes::schema::ColumnSchema;
367 use datatypes::value::Value;
368 use datatypes::vectors::Int64Vector;
369 use mito_codec::row_converter::DensePrimaryKeyCodec;
370 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
371 use store_api::storage::RegionId;
372
373 use super::*;
374 use crate::test_util::memtable_util::{
375 self, collect_iter_timestamps, region_metadata_to_row_schema,
376 };
377
    // Sorted-input round trip, with and without a primary key.
    #[test]
    fn test_memtable_sorted_input() {
        write_iter_sorted_input(true);
        write_iter_sorted_input(false);
    }
383
384 fn write_iter_sorted_input(has_pk: bool) {
385 let metadata = if has_pk {
386 memtable_util::metadata_with_primary_key(vec![1, 0], true)
387 } else {
388 memtable_util::metadata_with_primary_key(vec![], false)
389 };
390 let timestamps = (0..100).collect::<Vec<_>>();
391 let kvs =
392 memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1);
393 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
394 let memtable = PartitionTreeMemtable::new(
395 1,
396 codec,
397 metadata.clone(),
398 None,
399 &PartitionTreeConfig::default(),
400 );
401 memtable.write(&kvs).unwrap();
402
403 let expected_ts = kvs
404 .iter()
405 .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
406 .collect::<Vec<_>>();
407
408 let iter = memtable.iter(None, None, None).unwrap();
409 let read = collect_iter_timestamps(iter);
410 assert_eq!(expected_ts, read);
411
412 let stats = memtable.stats();
413 assert!(stats.bytes_allocated() > 0);
414 assert_eq!(
415 Some((
416 Timestamp::new_millisecond(0),
417 Timestamp::new_millisecond(99)
418 )),
419 stats.time_range()
420 );
421 }
422
    // Unsorted-input round trip, with and without a primary key.
    #[test]
    fn test_memtable_unsorted_input() {
        write_iter_unsorted_input(true);
        write_iter_unsorted_input(false);
    }
428
429 fn write_iter_unsorted_input(has_pk: bool) {
430 let metadata = if has_pk {
431 memtable_util::metadata_with_primary_key(vec![1, 0], true)
432 } else {
433 memtable_util::metadata_with_primary_key(vec![], false)
434 };
435 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
436 let memtable = PartitionTreeMemtable::new(
437 1,
438 codec,
439 metadata.clone(),
440 None,
441 &PartitionTreeConfig::default(),
442 );
443
444 let kvs = memtable_util::build_key_values(
445 &metadata,
446 "hello".to_string(),
447 0,
448 &[1, 3, 7, 5, 6],
449 0, );
451 memtable.write(&kvs).unwrap();
452
453 let kvs = memtable_util::build_key_values(
454 &metadata,
455 "hello".to_string(),
456 0,
457 &[5, 2, 4, 0, 7],
458 5, );
460 memtable.write(&kvs).unwrap();
461
462 let iter = memtable.iter(None, None, None).unwrap();
463 let read = collect_iter_timestamps(iter);
464 assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);
465
466 let iter = memtable.iter(None, None, None).unwrap();
467 let read = iter
468 .flat_map(|batch| {
469 batch
470 .unwrap()
471 .sequences()
472 .iter_data()
473 .collect::<Vec<_>>()
474 .into_iter()
475 })
476 .map(|v| v.unwrap())
477 .collect::<Vec<_>>();
478 assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);
479
480 let stats = memtable.stats();
481 assert!(stats.bytes_allocated() > 0);
482 assert_eq!(
483 Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
484 stats.time_range()
485 );
486 }
487
    // Projection round trip, with and without a primary key.
    #[test]
    fn test_memtable_projection() {
        write_iter_projection(true);
        write_iter_projection(false);
    }
493
    /// Writes 100 rows and reads back only column id 3, asserting that each
    /// projected batch carries exactly one field with the expected values.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            memtable_util::metadata_with_primary_key(vec![1, 0], true)
        } else {
            memtable_util::metadata_with_primary_key(vec![], false)
        };
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        // Project only column id 3.
        let iter = memtable.iter(Some(&[3]), None, None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }
525
    // Exercises several combinations of shard-key capacity and freeze
    // threshold, including values small enough to force splits/freezes.
    #[test]
    fn test_write_iter_multi_keys() {
        write_iter_multi_keys(1, 100);
        write_iter_multi_keys(2, 100);
        write_iter_multi_keys(4, 100);
        write_iter_multi_keys(8, 5);
        write_iter_multi_keys(2, 10);
    }
534
535 fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
536 let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
537 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
538 let memtable = PartitionTreeMemtable::new(
539 1,
540 codec,
541 metadata.clone(),
542 None,
543 &PartitionTreeConfig {
544 index_max_keys_per_shard: max_keys,
545 data_freeze_threshold: freeze_threshold,
546 ..Default::default()
547 },
548 );
549
550 let mut data = Vec::new();
551 for i in 0..4 {
553 for j in 0..4 {
554 let timestamps = [11, 13, 1, 5, 3, 7, 9];
556 let key = format!("a{j}");
557 let kvs =
558 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 0);
559 memtable.write(&kvs).unwrap();
560 for ts in timestamps {
561 data.push((i, key.clone(), ts));
562 }
563 }
564 for j in 0..4 {
565 let timestamps = [10, 2, 4, 8, 6];
567 let key = format!("a{j}");
568 let kvs =
569 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 200);
570 memtable.write(&kvs).unwrap();
571 for ts in timestamps {
572 data.push((i, key.clone(), ts));
573 }
574 }
575 }
576 data.sort_unstable();
577
578 let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
579 let iter = memtable.iter(None, None, None).unwrap();
580 let read = collect_iter_timestamps(iter);
581 assert_eq!(expect, read);
582 }
583
584 #[test]
585 fn test_memtable_filter() {
586 let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
587 let memtable = PartitionTreeMemtableBuilder::new(
589 PartitionTreeConfig {
590 index_max_keys_per_shard: 40,
591 ..Default::default()
592 },
593 None,
594 )
595 .build(1, &metadata);
596
597 for i in 0..100 {
598 let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
599 let kvs =
600 memtable_util::build_key_values(&metadata, "hello".to_string(), i, ×tamps, 1);
601 memtable.write(&kvs).unwrap();
602 }
603
604 for i in 0..100 {
605 let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
606 let expr = Expr::BinaryExpr(BinaryExpr {
607 left: Box::new(Expr::Column(Column::from_name("k1"))),
608 op: Operator::Eq,
609 right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
610 });
611 let iter = memtable
612 .iter(None, Some(Predicate::new(vec![expr])), None)
613 .unwrap();
614 let read = collect_iter_timestamps(iter);
615 assert_eq!(timestamps, read);
616 }
617 }
618
    /// `dedup` is `#[serde(skip_deserializing)]`, so a serialize/deserialize
    /// round trip restores its default (`true`) even though `false` was
    /// serialized.
    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }
631
    /// Builds region metadata shaped like a metric-engine physical table:
    /// two internal tags (`__table_id`, `__tsid`), one label tag, a
    /// millisecond time index and a float value field.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                // Reserved high column ids for the internal tags.
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "greptime_value",
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
684
    /// Builds `KeyValues` for the metric-engine schema created by
    /// `metadata_for_metric_engine`.
    ///
    /// The slices are zipped row-wise (row i takes element i of each slice),
    /// so rows are truncated to the shortest slice. Row values follow the
    /// metadata column order: table_id, ts_id, label, ts, value.
    fn build_key_values(
        metadata: RegionMetadataRef,
        labels: &[&str],
        table_id: &[u32],
        ts_id: &[u64],
        ts: &[i64],
        values: &[f64],
        sequence: u64,
    ) -> KeyValues {
        let column_schema = region_metadata_to_row_schema(&metadata);

        let rows = ts
            .iter()
            .zip(table_id.iter())
            .zip(ts_id.iter())
            .zip(labels.iter())
            .zip(values.iter())
            .map(|((((ts, table_id), ts_id), label), val)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::U32Value(*table_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::U64Value(*ts_id)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(label.to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(*val)),
                    },
                ],
            })
            .collect();
        let mutation = api::v1::Mutation {
            // op_type 1 is a put — NOTE(review): confirm against
            // `api::v1::OpType`.
            op_type: 1,
            sequence,
            rows: Some(Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }
733
    /// Writes to a metric-engine-style memtable, freezes and forks it,
    /// writes into the fork, then checks that the first batch read from the
    /// fork decodes the expected primary key label.
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        // Freeze, then fork into a new memtable that must stay readable.
        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable.iter(None, None, None).unwrap();
        let batch = reader.next().unwrap().unwrap();
        // The label is the third primary key column (after the two internal
        // tags), hence index 2 of the decoded dense key.
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
789}