1pub(crate) mod data;
18mod dedup;
19mod dict;
20mod merger;
21mod partition;
22mod shard;
23mod shard_builder;
24mod tree;
25
26use std::fmt;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
29
30use common_base::readable_size::ReadableSize;
31use common_stat::get_total_memory_readable;
32use mito_codec::key_values::KeyValue;
33use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
34use serde::{Deserialize, Serialize};
35use store_api::metadata::RegionMetadataRef;
36use store_api::storage::{ColumnId, SequenceRange};
37use table::predicate::Predicate;
38
39use crate::error::{Result, UnsupportedOperationSnafu};
40use crate::flush::WriteBufferManagerRef;
41use crate::memtable::bulk::part::BulkPart;
42use crate::memtable::partition_tree::tree::PartitionTree;
43use crate::memtable::stats::WriteMetrics;
44use crate::memtable::{
45 AllocTracker, BatchToRecordBatchContext, BoxedBatchIterator, IterBuilder, KeyValues,
46 MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext,
47 MemtableRanges, MemtableRef, MemtableStats, RangesOptions, read_column_ids_from_projection,
48};
49use crate::region::options::MergeMode;
50
/// Divisor applied to total system memory when capping the fork dictionary size
/// (see `PartitionTreeConfig::default`).
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
/// Default maximum number of primary keys per index shard; matches the default of
/// `PartitionTreeConfig::index_max_keys_per_shard`.
pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192;
/// Default row count threshold for freezing data; matches the default of
/// `PartitionTreeConfig::data_freeze_threshold`.
pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072;
55
/// Identifier of a shard.
type ShardId = u32;
/// Index of a primary key within a shard.
type PkIndex = u16;

/// Fully-qualified location of a primary key: the shard it lives in plus its
/// index inside that shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}
67
/// Configuration of the partition-tree memtable.
///
/// `#[serde(default)]` lets missing fields fall back to `Default::default()`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PartitionTreeConfig {
    /// Maximum number of primary keys a single index shard may hold.
    pub index_max_keys_per_shard: usize,
    /// Number of rows to buffer before freezing a data part.
    pub data_freeze_threshold: usize,
    /// Whether to deduplicate rows. Skipped during deserialization, so a
    /// persisted config cannot override it; it keeps the struct default and is
    /// set by code at runtime.
    #[serde(skip_deserializing)]
    pub dedup: bool,
    /// Byte budget for the dictionary kept when forking the tree.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree. Also skipped during deserialization, same as
    /// `dedup`.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}
91
92impl Default for PartitionTreeConfig {
93 fn default() -> Self {
94 let mut fork_dictionary_bytes = ReadableSize::mb(512);
95 if let Some(total_memory) = get_total_memory_readable() {
96 let adjust_dictionary_bytes =
97 std::cmp::min(total_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
98 if adjust_dictionary_bytes.0 > 0 {
99 fork_dictionary_bytes = adjust_dictionary_bytes;
100 }
101 }
102
103 Self {
104 index_max_keys_per_shard: 8192,
105 data_freeze_threshold: 131072,
106 dedup: true,
107 fork_dictionary_bytes,
108 merge_mode: MergeMode::LastRow,
109 }
110 }
111}
112
/// Memtable implementation backed by a partition tree.
pub struct PartitionTreeMemtable {
    id: MemtableId,
    tree: Arc<PartitionTree>,
    /// Tracks estimated allocated bytes and reports them to the write buffer manager.
    alloc_tracker: AllocTracker,
    /// Largest timestamp written so far; `i64::MIN` until the first write.
    max_timestamp: AtomicI64,
    /// Smallest timestamp written so far; `i64::MAX` until the first write.
    min_timestamp: AtomicI64,
    /// Largest sequence number written so far.
    max_sequence: AtomicU64,
    /// Total number of rows written.
    num_rows: AtomicUsize,
}
124
125impl fmt::Debug for PartitionTreeMemtable {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 f.debug_struct("PartitionTreeMemtable")
128 .field("id", &self.id)
129 .finish()
130 }
131}
132
impl Memtable for PartitionTreeMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Writes a batch of key values into the tree, updating the cached stats
    /// only if the whole batch succeeds.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        if kvs.is_empty() {
            return Ok(());
        }

        let mut metrics = WriteMetrics::default();
        // Scratch buffer the tree fills with encoded primary-key bytes.
        let mut pk_buffer = Vec::new();
        let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            // Sequence/row counts come from the input batch, not the tree.
            metrics.max_sequence = kvs.max_sequence();
            metrics.num_rows = kvs.num_rows();
            self.update_stats(&metrics);
        }
        res
    }

    /// Writes a single key value; stats are updated only on success.
    fn write_one(&self, key_value: KeyValue) -> Result<()> {
        let mut metrics = WriteMetrics::default();
        let mut pk_buffer = Vec::new();
        let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);

        if res.is_ok() {
            metrics.max_sequence = metrics.max_sequence.max(key_value.sequence());
            metrics.num_rows = 1;
            self.update_stats(&metrics);
        }
        res
    }

    /// Bulk writes are not supported by this memtable implementation.
    fn write_bulk(&self, _part: BulkPart) -> Result<()> {
        UnsupportedOperationSnafu {
            err_msg: "PartitionTreeMemtable does not support write_bulk",
        }
        .fail()
    }

    /// Returns the scannable ranges of this memtable.
    ///
    /// A partition tree memtable always exposes exactly one range (key `0`)
    /// whose iterator is built lazily from the captured projection, predicate
    /// and sequence range.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        options: RangesOptions,
    ) -> Result<MemtableRanges> {
        let predicate = options.predicate;
        let sequence = options.sequence;
        let read_column_ids = read_column_ids_from_projection(&self.tree.metadata, projection);
        // Own the projection so the builder can outlive the borrowed slice.
        let projection = projection.map(|ids| ids.to_vec());
        let builder = Box::new(PartitionTreeIterBuilder {
            tree: self.tree.clone(),
            projection,
            predicate: predicate.predicate().cloned(),
            sequence,
        });
        // Adapter that converts the tree's batches into record batches.
        let adapter_context = Arc::new(BatchToRecordBatchContext::new(
            self.tree.metadata.clone(),
            read_column_ids,
        ));
        let context = Arc::new(MemtableRangeContext::new_with_batch_to_record_batch(
            self.id,
            builder,
            predicate,
            Some(adapter_context),
        ));

        let range_stats = self.stats();
        let range = MemtableRange::new(context, range_stats);
        Ok(MemtableRanges {
            ranges: [(0, range)].into(),
        })
    }

    fn is_empty(&self) -> bool {
        self.tree.is_empty()
    }

    /// Stops allocation tracking and freezes the underlying tree.
    fn freeze(&self) -> Result<()> {
        self.alloc_tracker.done_allocating();

        self.tree.freeze()
    }

    /// Returns the current statistics of this memtable.
    fn stats(&self) -> MemtableStats {
        let estimated_bytes = self.alloc_tracker.bytes_allocated();

        if estimated_bytes == 0 {
            // Nothing has been written: report empty stats and no time range.
            return MemtableStats {
                estimated_bytes,
                time_range: None,
                num_rows: 0,
                num_ranges: 0,
                max_sequence: 0,
                series_count: 0,
            };
        }

        let ts_type = self
            .tree
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .clone()
            .as_timestamp()
            .expect("Timestamp column must have timestamp type");
        let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
        let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
        let series_count = self.tree.series_count();
        MemtableStats {
            estimated_bytes,
            time_range: Some((min_timestamp, max_timestamp)),
            num_rows: self.num_rows.load(Ordering::Relaxed),
            num_ranges: 1,
            max_sequence: self.max_sequence.load(Ordering::Relaxed),
            series_count,
        }
    }

    /// Forks this memtable: creates a new, empty memtable that reuses state
    /// from the current tree (see `PartitionTree::fork`).
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
        let tree = self.tree.fork(metadata.clone());

        let memtable = PartitionTreeMemtable::with_tree(id, tree);
        Arc::new(memtable)
    }
}
267
impl PartitionTreeMemtable {
    /// Creates a new memtable with the given id, primary-key codec, region
    /// metadata, optional write buffer manager and tree config.
    pub fn new(
        id: MemtableId,
        row_codec: Arc<dyn PrimaryKeyCodec>,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: &PartitionTreeConfig,
    ) -> Self {
        Self::with_tree(
            id,
            PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
        )
    }

    /// Wraps an existing tree in a memtable.
    ///
    /// Timestamp extremes start at `i64::MIN`/`i64::MAX` so the first write
    /// always narrows them via `fetch_max`/`fetch_min`.
    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
        let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());

        Self {
            id,
            tree: Arc::new(tree),
            alloc_tracker,
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            num_rows: AtomicUsize::new(0),
            max_sequence: AtomicU64::new(0),
        }
    }

    /// Folds per-write metrics into the memtable's atomic counters and reports
    /// the allocated bytes to the tracker.
    fn update_stats(&self, metrics: &WriteMetrics) {
        self.alloc_tracker.on_allocation(metrics.value_bytes);
        self.max_timestamp
            .fetch_max(metrics.max_ts, Ordering::SeqCst);
        self.min_timestamp
            .fetch_min(metrics.min_ts, Ordering::SeqCst);
        self.num_rows.fetch_add(metrics.num_rows, Ordering::SeqCst);
        self.max_sequence
            .fetch_max(metrics.max_sequence, Ordering::SeqCst);
    }

    /// Test-only helper that reads the tree directly, bypassing the range
    /// machinery used in production.
    #[cfg(any(test, feature = "test"))]
    pub fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceRange>,
    ) -> Result<BoxedBatchIterator> {
        self.tree.read(projection, predicate, sequence, None)
    }
}
323
/// Builder that creates [`PartitionTreeMemtable`]s sharing one config and one
/// optional write buffer manager.
#[derive(Debug, Default)]
pub struct PartitionTreeMemtableBuilder {
    config: PartitionTreeConfig,
    write_buffer_manager: Option<WriteBufferManagerRef>,
}
330
331impl PartitionTreeMemtableBuilder {
332 pub fn new(
334 config: PartitionTreeConfig,
335 write_buffer_manager: Option<WriteBufferManagerRef>,
336 ) -> Self {
337 Self {
338 config,
339 write_buffer_manager,
340 }
341 }
342}
343
344impl MemtableBuilder for PartitionTreeMemtableBuilder {
345 fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
346 let codec = build_primary_key_codec(metadata);
347 Arc::new(PartitionTreeMemtable::new(
348 id,
349 codec,
350 metadata.clone(),
351 self.write_buffer_manager.clone(),
352 &self.config,
353 ))
354 }
355
356 fn use_bulk_insert(&self, _metadata: &RegionMetadataRef) -> bool {
357 false
358 }
359}
360
/// Captured scan parameters used to build tree iterators lazily, one per call
/// to [`IterBuilder::build`].
struct PartitionTreeIterBuilder {
    tree: Arc<PartitionTree>,
    projection: Option<Vec<ColumnId>>,
    predicate: Option<Predicate>,
    sequence: Option<SequenceRange>,
}
367
368impl IterBuilder for PartitionTreeIterBuilder {
369 fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
370 self.tree.read(
371 self.projection.as_deref(),
372 self.predicate.clone(),
373 self.sequence,
374 metrics,
375 )
376 }
377}
378
379#[cfg(test)]
380mod tests {
381 use std::collections::HashMap;
382 use std::sync::Arc;
383
384 use api::v1::helper::{field_column_schema, row, tag_column_schema, time_index_column_schema};
385 use api::v1::value::ValueData;
386 use api::v1::{Mutation, OpType, Rows, SemanticType};
387 use common_query::prelude::{greptime_timestamp, greptime_value};
388 use common_time::Timestamp;
389 use datatypes::data_type::ConcreteDataType;
390 use datatypes::prelude::Vector;
391 use datatypes::scalars::ScalarVector;
392 use datatypes::schema::ColumnSchema;
393 use datatypes::value::Value;
394 use datatypes::vectors::{Int64Vector, StringVector};
395 use mito_codec::row_converter::DensePrimaryKeyCodec;
396 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
397 use store_api::storage::RegionId;
398
399 use super::*;
400 use crate::test_util::memtable_util::{
401 self, collect_iter_timestamps, region_metadata_to_row_schema,
402 };
403
404 #[test]
405 fn test_memtable_sorted_input() {
406 write_iter_sorted_input(true);
407 write_iter_sorted_input(false);
408 }
409
410 fn write_iter_sorted_input(has_pk: bool) {
411 let metadata = if has_pk {
412 Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
413 } else {
414 Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
415 };
416 let timestamps = (0..100).collect::<Vec<_>>();
417 let kvs =
418 memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1);
419 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
420 let memtable = PartitionTreeMemtable::new(
421 1,
422 codec,
423 metadata.clone(),
424 None,
425 &PartitionTreeConfig::default(),
426 );
427 memtable.write(&kvs).unwrap();
428
429 let expected_ts = kvs
430 .iter()
431 .map(|kv| {
432 kv.timestamp()
433 .try_into_timestamp()
434 .unwrap()
435 .unwrap()
436 .value()
437 })
438 .collect::<Vec<_>>();
439
440 let iter = memtable.iter(None, None, None).unwrap();
441 let read = collect_iter_timestamps(iter);
442 assert_eq!(expected_ts, read);
443
444 let stats = memtable.stats();
445 assert!(stats.bytes_allocated() > 0);
446 assert_eq!(
447 Some((
448 Timestamp::new_millisecond(0),
449 Timestamp::new_millisecond(99)
450 )),
451 stats.time_range()
452 );
453 }
454
455 #[test]
456 fn test_memtable_unsorted_input() {
457 write_iter_unsorted_input(true);
458 write_iter_unsorted_input(false);
459 }
460
461 fn write_iter_unsorted_input(has_pk: bool) {
462 let metadata = if has_pk {
463 Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
464 } else {
465 Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
466 };
467 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
468 let memtable = PartitionTreeMemtable::new(
469 1,
470 codec,
471 metadata.clone(),
472 None,
473 &PartitionTreeConfig::default(),
474 );
475
476 let kvs = memtable_util::build_key_values(
477 &metadata,
478 "hello".to_string(),
479 0,
480 &[1, 3, 7, 5, 6],
481 0, );
483 memtable.write(&kvs).unwrap();
484
485 let kvs = memtable_util::build_key_values(
486 &metadata,
487 "hello".to_string(),
488 0,
489 &[5, 2, 4, 0, 7],
490 5, );
492 memtable.write(&kvs).unwrap();
493
494 let iter = memtable.iter(None, None, None).unwrap();
495 let read = collect_iter_timestamps(iter);
496 assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);
497
498 let iter = memtable.iter(None, None, None).unwrap();
499 let read = iter
500 .flat_map(|batch| {
501 batch
502 .unwrap()
503 .sequences()
504 .iter_data()
505 .collect::<Vec<_>>()
506 .into_iter()
507 })
508 .map(|v| v.unwrap())
509 .collect::<Vec<_>>();
510 assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);
511
512 let stats = memtable.stats();
513 assert!(stats.bytes_allocated() > 0);
514 assert_eq!(
515 Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
516 stats.time_range()
517 );
518 }
519
520 #[test]
521 fn test_memtable_projection() {
522 write_iter_projection(true);
523 write_iter_projection(false);
524 }
525
    /// Projects the scan down to a single field column (id 3) and verifies the
    /// batches contain exactly that field with the written values.
    fn write_iter_projection(has_pk: bool) {
        let metadata = if has_pk {
            Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true))
        } else {
            Arc::new(memtable_util::metadata_with_primary_key(vec![], false))
        };
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        let expect = (0..100).collect::<Vec<_>>();
        let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
        memtable.write(&kvs).unwrap();
        // Only column id 3 is projected.
        let ranges = memtable
            .ranges(Some(&[3]), RangesOptions::default())
            .unwrap();
        let iter = ranges.build(None).unwrap();

        let mut v0_all = vec![];
        for res in iter {
            let batch = res.unwrap();
            // The projection leaves exactly one field in each batch.
            assert_eq!(1, batch.fields().len());
            let v0 = batch
                .fields()
                .first()
                .unwrap()
                .data
                .as_any()
                .downcast_ref::<Int64Vector>()
                .unwrap();
            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
        }
        assert_eq!(expect, v0_all);
    }
560
561 #[test]
562 fn test_write_iter_multi_keys() {
563 write_iter_multi_keys(1, 100);
564 write_iter_multi_keys(2, 100);
565 write_iter_multi_keys(4, 100);
566 write_iter_multi_keys(8, 5);
567 write_iter_multi_keys(2, 10);
568 }
569
570 fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
571 let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
572 let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
573 let memtable = PartitionTreeMemtable::new(
574 1,
575 codec,
576 metadata.clone(),
577 None,
578 &PartitionTreeConfig {
579 index_max_keys_per_shard: max_keys,
580 data_freeze_threshold: freeze_threshold,
581 ..Default::default()
582 },
583 );
584
585 let mut data = Vec::new();
586 for i in 0..4 {
588 for j in 0..4 {
589 let timestamps = [11, 13, 1, 5, 3, 7, 9];
591 let key = format!("a{j}");
592 let kvs =
593 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 0);
594 memtable.write(&kvs).unwrap();
595 for ts in timestamps {
596 data.push((i, key.clone(), ts));
597 }
598 }
599 for j in 0..4 {
600 let timestamps = [10, 2, 4, 8, 6];
602 let key = format!("a{j}");
603 let kvs =
604 memtable_util::build_key_values(&metadata, key.clone(), i, ×tamps, 200);
605 memtable.write(&kvs).unwrap();
606 for ts in timestamps {
607 data.push((i, key.clone(), ts));
608 }
609 }
610 }
611 data.sort_unstable();
612
613 let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
614 let iter = memtable.iter(None, None, None).unwrap();
615 let read = collect_iter_timestamps(iter);
616 assert_eq!(expect, read);
617 }
618
    /// `dedup` is `#[serde(skip_deserializing)]`, so a JSON round-trip must
    /// reset it to the struct default (`true`) even though `false` was serialized.
    #[test]
    fn test_deserialize_config() {
        let config = PartitionTreeConfig {
            dedup: false,
            ..Default::default()
        };
        let json = serde_json::to_string(&config).unwrap();
        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
        assert!(config.dedup);
        assert_eq!(PartitionTreeConfig::default(), config);
    }
631
    /// Builds region metadata shaped like the metric engine's: reserved
    /// `__table_id`/`__tsid` tags (ids in the reserved >= 2^31 range) plus a
    /// label tag, a millisecond time index, and a float value field.
    fn metadata_for_metric_engine() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483652,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2147483651,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "test_label",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_timestamp(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    greptime_value(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![2147483652, 2147483651, 2]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
684
685 fn build_key_values(
686 metadata: RegionMetadataRef,
687 labels: &[&str],
688 table_id: &[u32],
689 ts_id: &[u64],
690 ts: &[i64],
691 values: &[f64],
692 sequence: u64,
693 ) -> KeyValues {
694 let column_schema = region_metadata_to_row_schema(&metadata);
695
696 let rows = ts
697 .iter()
698 .zip(table_id.iter())
699 .zip(ts_id.iter())
700 .zip(labels.iter())
701 .zip(values.iter())
702 .map(|((((ts, table_id), ts_id), label), val)| {
703 row(vec![
704 ValueData::U32Value(*table_id),
705 ValueData::U64Value(*ts_id),
706 ValueData::StringValue(label.to_string()),
707 ValueData::TimestampMillisecondValue(*ts),
708 ValueData::F64Value(*val),
709 ])
710 })
711 .collect();
712 let mutation = api::v1::Mutation {
713 op_type: 1,
714 sequence,
715 rows: Some(Rows {
716 schema: column_schema,
717 rows,
718 }),
719 write_hint: None,
720 };
721 KeyValues::new(metadata.as_ref(), mutation).unwrap()
722 }
723
    /// Writes two series, freezes and forks the memtable, writes one more row,
    /// then checks the first batch read from the fork decodes to the expected
    /// primary key.
    #[test]
    fn test_write_freeze() {
        let metadata = metadata_for_metric_engine();
        let memtable = PartitionTreeMemtableBuilder::new(
            PartitionTreeConfig {
                index_max_keys_per_shard: 40,
                ..Default::default()
            },
            None,
        )
        .build(1, &metadata);

        let codec = DensePrimaryKeyCodec::new(&metadata);

        memtable
            .write(&build_key_values(
                metadata.clone(),
                &["daily", "10min", "daily", "10min"],
                &[1025, 1025, 1025, 1025],
                &[
                    16442255374049317291,
                    5686004715529701024,
                    16442255374049317291,
                    5686004715529701024,
                ],
                &[1712070000000, 1712717731000, 1712761200000, 1712761200000],
                &[0.0, 0.0, 0.0, 0.0],
                1,
            ))
            .unwrap();

        memtable.freeze().unwrap();
        let new_memtable = memtable.fork(2, &metadata);

        // Write to the forked memtable for one of the existing series.
        new_memtable
            .write(&build_key_values(
                metadata.clone(),
                &["10min"],
                &[1025],
                &[5686004715529701024],
                &[1714643131000],
                &[0.1],
                2,
            ))
            .unwrap();

        let mut reader = new_memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let batch = reader.next().unwrap().unwrap();
        // pk layout follows primary_key order: [__table_id, __tsid, test_label].
        let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
        if let Value::String(s) = &pk[2] {
            assert_eq!("10min", s.as_utf8());
        } else {
            unreachable!()
        }
    }
783
    /// Builds a simple key/value region: time index `ts`, string tag `k`
    /// (the only primary-key column), and string field `v`.
    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
810
811 fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
812 vec![
813 time_index_column_schema("ts", api::v1::ColumnDataType::TimestampMillisecond),
814 tag_column_schema("k", api::v1::ColumnDataType::String),
815 field_column_schema("v", api::v1::ColumnDataType::String),
816 ]
817 }
818
    /// Builds `KeyValues` for the `kv_region_metadata` schema where each input
    /// key string is used as both the tag `k` and the field `v`, with ts = 0
    /// and sequence 0.
    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| {
                row(vec![
                    ValueData::TimestampMillisecondValue(0),
                    ValueData::StringValue(c.as_ref().to_string()),
                    ValueData::StringValue(c.as_ref().to_string()),
                ])
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }
843
    /// Drains a batch iterator into a tag -> field map for the
    /// `kv_region_metadata` schema; `values[0]` is the tag `k`, the only
    /// primary-key column.
    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }
867
    /// Writes keys in various orders across two fork generations and verifies
    /// each generation reads back exactly the keys written into it.
    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);

        // Generation 1: keys a..g in sorted order.
        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(
                memtable
                    .ranges(None, RangesOptions::default())
                    .unwrap()
                    .build(None)
                    .unwrap(),
                &metadata
            ),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
        let forked = memtable.fork(2, &metadata);

        // Generation 2: unsorted keys, partially overlapping generation 1.
        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(
                forked
                    .ranges(None, RangesOptions::default())
                    .unwrap()
                    .build(None)
                    .unwrap(),
                &metadata
            ),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );

        let forked2 = forked.fork(3, &metadata);

        // Generation 3: a third ordering, read without freezing first.
        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();

        let kvs = collect_kvs(
            forked2
                .ranges(None, RangesOptions::default())
                .unwrap()
                .build(None)
                .unwrap(),
            &metadata,
        );
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
927
    /// Reads the memtable through the record-batch adapter and checks row count
    /// and the flat output schema (all columns plus the internal
    /// `__primary_key`/`__sequence`/`__op_type` columns).
    #[test]
    fn test_build_record_batch_iter_from_memtable() {
        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &[1, 2, 3], 0);
        memtable.write(&kvs).unwrap();

        // Project every column of the region.
        let read_column_ids: Vec<ColumnId> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let ranges = memtable
            .ranges(Some(&read_column_ids), RangesOptions::default())
            .unwrap();
        assert!(!ranges.ranges.is_empty());

        let mut total_rows = 0;
        for range in ranges.ranges.into_values() {
            let mut iter = range.build_record_batch_iter(None, None).unwrap();
            while let Some(rb) = iter.next().transpose().unwrap() {
                total_rows += rb.num_rows();
                let schema = rb.schema();
                let column_names: Vec<_> =
                    schema.fields().iter().map(|f| f.name().as_str()).collect();
                assert_eq!(
                    column_names,
                    vec![
                        "__table_id",
                        "k0",
                        "v0",
                        "v1",
                        "ts",
                        "__primary_key",
                        "__sequence",
                        "__op_type",
                    ]
                );
            }
        }
        assert_eq!(3, total_rows);
    }
979
    /// Reads through the record-batch adapter with an inclusive time range
    /// [2, 4] and verifies only the in-range timestamps come back.
    #[test]
    fn test_build_record_batch_iter_with_time_range() {
        let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![1, 0], true));
        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
        let memtable = PartitionTreeMemtable::new(
            1,
            codec,
            metadata.clone(),
            None,
            &PartitionTreeConfig::default(),
        );

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            42,
            &[1, 2, 3, 4, 5],
            0,
        );
        memtable.write(&kvs).unwrap();

        let read_column_ids: Vec<ColumnId> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let ranges = memtable
            .ranges(Some(&read_column_ids), RangesOptions::default())
            .unwrap();
        assert!(!ranges.ranges.is_empty());

        let time_range = (Timestamp::new_millisecond(2), Timestamp::new_millisecond(4));

        let mut total_rows = 0;
        let mut all_timestamps = Vec::new();
        for range in ranges.ranges.into_values() {
            let mut iter = range
                .build_record_batch_iter(Some(time_range), None)
                .unwrap();
            while let Some(rb) = iter.next().transpose().unwrap() {
                total_rows += rb.num_rows();
                let ts_col = rb
                    .column_by_name("ts")
                    .unwrap()
                    .as_any()
                    .downcast_ref::<datatypes::arrow::array::TimestampMillisecondArray>()
                    .unwrap();
                for i in 0..ts_col.len() {
                    all_timestamps.push(ts_col.value(i));
                }
            }
        }
        // Timestamps 1 and 5 fall outside [2, 4].
        assert_eq!(3, total_rows);
        all_timestamps.sort();
        assert_eq!(vec![2, 3, 4], all_timestamps);
    }
1037}