1use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, Mutex, MutexGuard};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_time::timestamp::TimeUnit;
23use common_time::timestamp_millis::BucketAligned;
24use common_time::Timestamp;
25use datatypes::arrow;
26use datatypes::arrow::array::{
27 ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
28 TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
29};
30use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
31use datatypes::arrow::datatypes::{DataType, Int64Type};
32use smallvec::{smallvec, SmallVec};
33use snafu::{OptionExt, ResultExt};
34use store_api::metadata::RegionMetadataRef;
35
36use crate::error;
37use crate::error::{InvalidRequestSnafu, Result};
38use crate::memtable::bulk::part::BulkPart;
39use crate::memtable::key_values::KeyValue;
40use crate::memtable::version::SmallMemtableVec;
41use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
42
/// A single partition: one memtable plus the time range it is responsible for.
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable that stores the rows written to this partition.
    memtable: MemtableRef,
    /// Time range of the partition.
    ///
    /// `None` means the partition accepts every timestamp.
    time_range: Option<PartTimeRange>,
}
53
54impl TimePartition {
55 fn contains_timestamp(&self, ts: Timestamp) -> bool {
57 let Some(range) = self.time_range else {
58 return true;
59 };
60
61 range.contains_timestamp(ts)
62 }
63
64 fn write(&self, kvs: &KeyValues) -> Result<()> {
66 self.memtable.write(kvs)
67 }
68
69 fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
71 self.memtable.write_bulk(rb)
72 }
73
74 fn write_record_batch_partial(&self, part: &BulkPart) -> error::Result<()> {
76 let Some(range) = self.time_range else {
77 unreachable!("TimePartition must have explicit time range when a bulk request involves multiple time partition")
78 };
79 let Some(filtered) = filter_record_batch(
80 part,
81 range.min_timestamp.value(),
82 range.max_timestamp.value(),
83 )?
84 else {
85 return Ok(());
86 };
87 self.write_record_batch(filtered)
88 }
89}
90
/// Builds a [`BooleanArray`] mask with one bit per element of `$ts_array`.
///
/// Bit `i` is set when `$min <= $ts_array[i] < $max`. Bits are packed 64 at a time
/// into `u64` words; a trailing partial word covers the remaining elements.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        // One u64 word (8 bytes) per 64 elements, rounded up.
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        let in_range = |idx: usize| -> bool {
            // SAFETY: every index passed below is smaller than `len`.
            let val = unsafe { $ts_array.value_unchecked(idx) };
            val >= $min && val < $max
        };

        // Packs `bits` consecutive predicate results, starting at element `base`,
        // into the low bits of one word.
        let pack_word = |base: usize, bits: usize| -> u64 {
            (0..bits).fold(0u64, |word, bit| {
                word | ((in_range(base + bit) as u64) << bit)
            })
        };

        let full_words = len / 64;
        let tail_bits = len % 64;

        for word_idx in 0..full_words {
            let word = pack_word(word_idx * 64, 64);
            // SAFETY: the buffer was created with capacity for all words.
            unsafe { buffer.push_unchecked(word) }
        }

        if tail_bits != 0 {
            let word = pack_word(full_words * 64, tail_bits);
            // SAFETY: the buffer was created with capacity for all words.
            unsafe { buffer.push_unchecked(word) }
        }

        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
130
/// Filters a timestamp array of concrete type `$array_type` to the values in `[$min, $max)`.
///
/// Evaluates to `(filtered_array, filter_mask, min_ts, max_ts)`. This macro contains
/// control flow of the enclosing function: it returns `Ok(None)` from the caller when no
/// value survives the filter, and propagates arrow compute errors with `?`.
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let typed = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let mask = create_filter_buffer!(typed, $min, $max);

        let kept = arrow::compute::filter(typed, &mask).context(error::ComputeArrowSnafu)?;
        if kept.is_empty() {
            return Ok(None);
        }

        // `kept` is not empty here; the unwraps assume the remaining values are non-null.
        let kept_typed = kept.as_any().downcast_ref::<$array_type>().unwrap();
        let max_ts = arrow::compute::max(kept_typed).unwrap();
        let min_ts = arrow::compute::min(kept_typed).unwrap();

        (kept, mask, min_ts, max_ts)
    }};
}
149
150pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
153 let ts_array = part.timestamps();
154 let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
155 DataType::Timestamp(unit, _) => match unit {
156 arrow::datatypes::TimeUnit::Second => {
157 handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
158 }
159 arrow::datatypes::TimeUnit::Millisecond => {
160 handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
161 }
162 arrow::datatypes::TimeUnit::Microsecond => {
163 handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
164 }
165 arrow::datatypes::TimeUnit::Nanosecond => {
166 handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
167 }
168 },
169 _ => {
170 unreachable!("Got data type: {:?}", ts_array.data_type());
171 }
172 };
173
174 let num_rows = ts_array.len();
175 let arrays = part
176 .batch
177 .columns()
178 .iter()
179 .enumerate()
180 .map(|(index, array)| {
181 if index == part.timestamp_index {
182 Ok(ts_array.clone())
183 } else {
184 arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
185 }
186 })
187 .collect::<Result<Vec<_>>>()?;
188 let batch = RecordBatch::try_new_with_options(
189 part.batch.schema(),
190 arrays,
191 &RecordBatchOptions::default().with_row_count(Some(num_rows)),
192 )
193 .context(error::NewRecordBatchSnafu)?;
194 Ok(Some(BulkPart {
195 batch,
196 num_rows,
197 max_ts,
198 min_ts,
199 sequence: part.sequence,
200 timestamp_index: part.timestamp_index,
201 }))
202}
203
/// Vector of partitions backed by a `SmallVec` with inline room for two elements.
type PartitionVec = SmallVec<[TimePartition; 2]>;
205
/// A set of time partitions, each backed by its own memtable.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable state (partition list and next memtable id) guarded by a mutex.
    inner: Mutex<PartitionsInner>,
    /// Duration of each partition.
    ///
    /// When it is `None`, a single partition without a time range is created up front
    /// and holds all timestamps.
    part_duration: Option<Duration>,
    /// Region metadata passed to the builder when creating memtables.
    metadata: RegionMetadataRef,
    /// Builder used to create the memtable of each new partition.
    builder: MemtableBuilderRef,
}
221
/// Shared, reference-counted handle to [`TimePartitions`].
pub type TimePartitionsRef = Arc<TimePartitions>;
223
224impl TimePartitions {
225 pub fn new(
227 metadata: RegionMetadataRef,
228 builder: MemtableBuilderRef,
229 next_memtable_id: MemtableId,
230 part_duration: Option<Duration>,
231 ) -> Self {
232 let mut inner = PartitionsInner::new(next_memtable_id);
233 if part_duration.is_none() {
234 let memtable = builder.build(inner.alloc_memtable_id(), &metadata);
237 debug!(
238 "Creates a time partition for all timestamps, region: {}, memtable_id: {}",
239 metadata.region_id,
240 memtable.id(),
241 );
242 let part = TimePartition {
243 memtable,
244 time_range: None,
245 };
246 inner.parts.push(part);
247 }
248
249 Self {
250 inner: Mutex::new(inner),
251 part_duration,
252 metadata,
253 builder,
254 }
255 }
256
257 pub fn write(&self, kvs: &KeyValues) -> Result<()> {
261 let parts = self.list_partitions();
263
264 for part in parts.iter().rev() {
267 let mut all_in_partition = true;
268 for kv in kvs.iter() {
269 let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
271 if !part.contains_timestamp(ts) {
272 all_in_partition = false;
273 break;
274 }
275 }
276 if !all_in_partition {
277 continue;
278 }
279
280 return part.write(kvs);
282 }
283
284 self.write_multi_parts(kvs, &parts)
286 }
287
288 pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
289 let time_type = self
290 .metadata
291 .time_index_column()
292 .column_schema
293 .data_type
294 .as_timestamp()
295 .unwrap();
296
297 let parts = self.list_partitions();
299 let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
300 part.timestamps(),
301 &parts,
302 time_type.create_timestamp(part.min_ts),
303 time_type.create_timestamp(part.max_ts),
304 )?;
305
306 if matching_parts.len() == 1 && missing_parts.is_empty() {
307 return matching_parts[0].write_record_batch(part);
309 }
310
311 for matching in matching_parts {
312 matching.write_record_batch_partial(&part)?
313 }
314
315 for missing in missing_parts {
316 let new_part = {
317 let mut inner = self.inner.lock().unwrap();
318 self.get_or_create_time_partition(missing, &mut inner)?
319 };
320 new_part.write_record_batch_partial(&part)?;
321 }
322 Ok(())
323 }
324
325 fn get_or_create_time_partition(
328 &self,
329 part_start: Timestamp,
330 inner: &mut MutexGuard<PartitionsInner>,
331 ) -> Result<TimePartition> {
332 let part_duration = self.part_duration.unwrap();
333 let part_pos = match inner
334 .parts
335 .iter()
336 .position(|part| part.time_range.unwrap().min_timestamp == part_start)
337 {
338 Some(pos) => pos,
339 None => {
340 let range = PartTimeRange::from_start_duration(part_start, part_duration)
341 .with_context(|| InvalidRequestSnafu {
342 region_id: self.metadata.region_id,
343 reason: format!(
344 "Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}",
345 ),
346 })?;
347 let memtable = self
348 .builder
349 .build(inner.alloc_memtable_id(), &self.metadata);
350 debug!(
351 "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
352 range,
353 self.metadata.region_id,
354 part_duration,
355 memtable.id(),
356 inner.parts.len() + 1
357 );
358 let pos = inner.parts.len();
359 inner.parts.push(TimePartition {
360 memtable,
361 time_range: Some(range),
362 });
363 pos
364 }
365 };
366 Ok(inner.parts[part_pos].clone())
367 }
368
369 pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
371 let inner = self.inner.lock().unwrap();
372 memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
373 }
374
375 pub fn num_partitions(&self) -> usize {
377 let inner = self.inner.lock().unwrap();
378 inner.parts.len()
379 }
380
381 pub fn is_empty(&self) -> bool {
383 let inner = self.inner.lock().unwrap();
384 inner.parts.iter().all(|part| part.memtable.is_empty())
385 }
386
387 pub fn freeze(&self) -> Result<()> {
389 let inner = self.inner.lock().unwrap();
390 for part in &*inner.parts {
391 part.memtable.freeze()?;
392 }
393 Ok(())
394 }
395
396 pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
398 let part_duration = part_duration.or(self.part_duration);
400
401 let mut inner = self.inner.lock().unwrap();
402 let latest_part = inner
403 .parts
404 .iter()
405 .max_by_key(|part| part.time_range.map(|range| range.min_timestamp))
406 .cloned();
407
408 let Some(old_part) = latest_part else {
409 return Self::new(
411 metadata.clone(),
412 self.builder.clone(),
413 inner.next_memtable_id,
414 part_duration,
415 );
416 };
417
418 let old_stats = old_part.memtable.stats();
419 let new_time_range =
422 old_stats
423 .time_range()
424 .zip(part_duration)
425 .and_then(|(range, bucket)| {
426 partition_start_timestamp(range.1, bucket)
427 .and_then(|start| PartTimeRange::from_start_duration(start, bucket))
428 });
429 let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
431 let new_part = TimePartition {
432 memtable,
433 time_range: new_time_range,
434 };
435
436 Self {
437 inner: Mutex::new(PartitionsInner::with_partition(
438 new_part,
439 inner.next_memtable_id,
440 )),
441 part_duration,
442 metadata: metadata.clone(),
443 builder: self.builder.clone(),
444 }
445 }
446
    /// Returns the partition duration, or `None` if no duration is configured.
    pub(crate) fn part_duration(&self) -> Option<Duration> {
        self.part_duration
    }
451
452 pub(crate) fn memory_usage(&self) -> usize {
454 let inner = self.inner.lock().unwrap();
455 inner
456 .parts
457 .iter()
458 .map(|part| part.memtable.stats().estimated_bytes)
459 .sum()
460 }
461
462 pub(crate) fn num_rows(&self) -> u64 {
464 let inner = self.inner.lock().unwrap();
465 inner
466 .parts
467 .iter()
468 .map(|part| part.memtable.stats().num_rows as u64)
469 .sum()
470 }
471
472 pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
474 let inner = self.inner.lock().unwrap();
475 memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
476 }
477
478 pub(crate) fn next_memtable_id(&self) -> MemtableId {
480 let inner = self.inner.lock().unwrap();
481 inner.next_memtable_id
482 }
483
484 pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
487 debug_assert!(self.is_empty());
488
489 Self::new(
490 self.metadata.clone(),
491 self.builder.clone(),
492 self.next_memtable_id(),
493 part_duration.or(self.part_duration),
494 )
495 }
496
497 fn list_partitions(&self) -> PartitionVec {
499 let inner = self.inner.lock().unwrap();
500 inner.parts.clone()
501 }
502
503 fn find_partitions_by_time_range<'a>(
506 &self,
507 ts_array: &ArrayRef,
508 existing_parts: &'a [TimePartition],
509 min: Timestamp,
510 max: Timestamp,
511 ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
512 let mut matching = Vec::new();
513
514 let mut present = HashSet::new();
515 for part in existing_parts {
517 let Some(part_time_range) = part.time_range.as_ref() else {
518 matching.push(part);
519 return Ok((matching, Vec::new()));
520 };
521
522 if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
523 matching.push(part);
524 present.insert(part_time_range.min_timestamp.value());
525 }
526 }
527
528 let part_duration = self.part_duration.unwrap();
530 let timestamp_unit = self.metadata.time_index_type().unit();
531
532 let part_duration_sec = part_duration.as_secs() as i64;
533 let start_bucket = min
535 .convert_to(TimeUnit::Second)
536 .unwrap()
537 .value()
538 .div_euclid(part_duration_sec);
539 let end_bucket = max
540 .convert_to(TimeUnit::Second)
541 .unwrap()
542 .value()
543 .div_euclid(part_duration_sec);
544 let bucket_num = (end_bucket - start_bucket + 1) as usize;
545
546 let num_timestamps = ts_array.len();
547 let missing = if bucket_num <= num_timestamps {
548 (start_bucket..=end_bucket)
549 .filter_map(|start_sec| {
550 let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
551 .convert_to(timestamp_unit)
552 else {
553 return Some(
554 InvalidRequestSnafu {
555 region_id: self.metadata.region_id,
556 reason: format!("Timestamp out of range: {}", start_sec),
557 }
558 .fail(),
559 );
560 };
561 if present.insert(timestamp.value()) {
562 Some(Ok(timestamp))
563 } else {
564 None
565 }
566 })
567 .collect::<Result<Vec<_>>>()?
568 } else {
569 let ts_primitive = match ts_array.data_type() {
570 DataType::Timestamp(unit, _) => match unit {
571 arrow::datatypes::TimeUnit::Second => ts_array
572 .as_any()
573 .downcast_ref::<TimestampSecondArray>()
574 .unwrap()
575 .reinterpret_cast::<Int64Type>(),
576 arrow::datatypes::TimeUnit::Millisecond => ts_array
577 .as_any()
578 .downcast_ref::<TimestampMillisecondArray>()
579 .unwrap()
580 .reinterpret_cast::<Int64Type>(),
581 arrow::datatypes::TimeUnit::Microsecond => ts_array
582 .as_any()
583 .downcast_ref::<TimestampMicrosecondArray>()
584 .unwrap()
585 .reinterpret_cast::<Int64Type>(),
586 arrow::datatypes::TimeUnit::Nanosecond => ts_array
587 .as_any()
588 .downcast_ref::<TimestampNanosecondArray>()
589 .unwrap()
590 .reinterpret_cast::<Int64Type>(),
591 },
592 _ => unreachable!(),
593 };
594
595 ts_primitive
596 .values()
597 .iter()
598 .filter_map(|ts| {
599 let ts = self.metadata.time_index_type().create_timestamp(*ts);
600 let Some(bucket_start) = ts
601 .convert_to(TimeUnit::Second)
602 .and_then(|ts| ts.align_by_bucket(part_duration_sec))
603 .and_then(|ts| ts.convert_to(timestamp_unit))
604 else {
605 return Some(
606 InvalidRequestSnafu {
607 region_id: self.metadata.region_id,
608 reason: format!("Timestamp out of range: {:?}", ts),
609 }
610 .fail(),
611 );
612 };
613 if present.insert(bucket_start.value()) {
614 Some(Ok(bucket_start))
615 } else {
616 None
617 }
618 })
619 .collect::<Result<Vec<_>>>()?
620 };
621 Ok((matching, missing))
622 }
623
624 fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
626 debug_assert!(self.part_duration.is_some());
629
630 let mut parts_to_write = HashMap::new();
631 let mut missing_parts = HashMap::new();
632 for kv in kvs.iter() {
633 let mut part_found = false;
634 let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
636 for part in parts {
637 if part.contains_timestamp(ts) {
638 parts_to_write
640 .entry(part.time_range.unwrap().min_timestamp)
641 .or_insert_with(|| PartitionToWrite {
642 partition: part.clone(),
643 key_values: Vec::new(),
644 })
645 .key_values
646 .push(kv);
647 part_found = true;
648 break;
649 }
650 }
651
652 if !part_found {
653 let part_duration = self.part_duration.unwrap();
656 let part_start =
657 partition_start_timestamp(ts, part_duration).with_context(|| {
658 InvalidRequestSnafu {
659 region_id: self.metadata.region_id,
660 reason: format!(
661 "timestamp {ts:?} and bucket {part_duration:?} are out of range"
662 ),
663 }
664 })?;
665 missing_parts
666 .entry(part_start)
667 .or_insert_with(Vec::new)
668 .push(kv);
669 }
670 }
671
672 for part_to_write in parts_to_write.into_values() {
674 for kv in part_to_write.key_values {
675 part_to_write.partition.memtable.write_one(kv)?;
676 }
677 }
678
679 let mut inner = self.inner.lock().unwrap();
682 for (part_start, key_values) in missing_parts {
683 let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
684 for kv in key_values {
685 partition.memtable.write_one(kv)?;
686 }
687 }
688
689 Ok(())
690 }
691}
692
693fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
697 let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
699 let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
700 let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
701 start_sec.convert_to(ts.unit())
702}
703
/// Mutable state of [`TimePartitions`], kept behind its mutex.
#[derive(Debug)]
struct PartitionsInner {
    /// All partitions, in creation order (new partitions are pushed to the back).
    parts: PartitionVec,
    /// Id to hand out to the next memtable.
    next_memtable_id: MemtableId,
}
711
712impl PartitionsInner {
713 fn new(next_memtable_id: MemtableId) -> Self {
714 Self {
715 parts: Default::default(),
716 next_memtable_id,
717 }
718 }
719
720 fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
721 Self {
722 parts: smallvec![part],
723 next_memtable_id,
724 }
725 }
726
727 fn alloc_memtable_id(&mut self) -> MemtableId {
728 let id = self.next_memtable_id;
729 self.next_memtable_id += 1;
730 id
731 }
732}
733
/// Time range of a partition: `[min_timestamp, max_timestamp)`.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive lower bound.
    min_timestamp: Timestamp,
    /// Exclusive upper bound.
    max_timestamp: Timestamp,
}
742
743impl PartTimeRange {
744 fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
745 let start_sec = start.convert_to(TimeUnit::Second)?;
746 let end_sec = start_sec.add_duration(duration).ok()?;
747 let min_timestamp = start_sec.convert_to(start.unit())?;
748 let max_timestamp = end_sec.convert_to(start.unit())?;
749
750 Some(Self {
751 min_timestamp,
752 max_timestamp,
753 })
754 }
755
756 fn contains_timestamp(&self, ts: Timestamp) -> bool {
758 self.min_timestamp <= ts && ts < self.max_timestamp
759 }
760}
761
/// Rows grouped for one target partition, used when a write spans multiple partitions.
struct PartitionToWrite<'a> {
    /// Partition that receives the rows.
    partition: TimePartition,
    /// Rows to write into `partition`.
    key_values: Vec<KeyValue<'a>>,
}
766
767#[cfg(test)]
768mod tests {
769 use std::sync::Arc;
770
771 use api::v1::SemanticType;
772 use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
773 use datatypes::arrow::datatypes::{DataType, Field, Schema};
774 use datatypes::arrow::record_batch::RecordBatch;
775 use datatypes::prelude::ConcreteDataType;
776 use datatypes::schema::ColumnSchema;
777 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
778 use store_api::storage::SequenceNumber;
779
780 use super::*;
781 use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
782 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
783 use crate::test_util::memtable_util::{self, collect_iter_timestamps};
784
785 #[test]
786 fn test_no_duration() {
787 let metadata = memtable_util::metadata_for_test();
788 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
789 let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
790 assert_eq!(1, partitions.num_partitions());
791 assert!(partitions.is_empty());
792
793 let kvs = memtable_util::build_key_values(
794 &metadata,
795 "hello".to_string(),
796 0,
797 &[1000, 3000, 7000, 5000, 6000],
798 0, );
800 partitions.write(&kvs).unwrap();
801
802 assert_eq!(1, partitions.num_partitions());
803 assert!(!partitions.is_empty());
804 let mut memtables = Vec::new();
805 partitions.list_memtables(&mut memtables);
806 assert_eq!(0, memtables[0].id());
807
808 let iter = memtables[0].iter(None, None, None).unwrap();
809 let timestamps = collect_iter_timestamps(iter);
810 assert_eq!(&[1000, 3000, 5000, 6000, 7000], ×tamps[..]);
811 }
812
813 #[test]
814 fn test_write_single_part() {
815 let metadata = memtable_util::metadata_for_test();
816 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
817 let partitions =
818 TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
819 assert_eq!(0, partitions.num_partitions());
820
821 let kvs = memtable_util::build_key_values(
822 &metadata,
823 "hello".to_string(),
824 0,
825 &[5000, 2000, 0],
826 0, );
828 partitions.write(&kvs).unwrap();
830 assert_eq!(1, partitions.num_partitions());
831 assert!(!partitions.is_empty());
832
833 let kvs = memtable_util::build_key_values(
834 &metadata,
835 "hello".to_string(),
836 0,
837 &[3000, 7000, 4000],
838 3, );
840 partitions.write(&kvs).unwrap();
842 assert_eq!(1, partitions.num_partitions());
843
844 let mut memtables = Vec::new();
845 partitions.list_memtables(&mut memtables);
846 let iter = memtables[0].iter(None, None, None).unwrap();
847 let timestamps = collect_iter_timestamps(iter);
848 assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], ×tamps[..]);
849 let parts = partitions.list_partitions();
850 assert_eq!(
851 Timestamp::new_millisecond(0),
852 parts[0].time_range.unwrap().min_timestamp
853 );
854 assert_eq!(
855 Timestamp::new_millisecond(10000),
856 parts[0].time_range.unwrap().max_timestamp
857 );
858 }
859
860 fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
861 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
862 let partitions =
863 TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
864 assert_eq!(0, partitions.num_partitions());
865
866 let kvs = memtable_util::build_key_values(
867 metadata,
868 "hello".to_string(),
869 0,
870 &[2000, 0],
871 0, );
873 partitions.write(&kvs).unwrap();
875 assert_eq!(1, partitions.num_partitions());
876 assert!(!partitions.is_empty());
877
878 let kvs = memtable_util::build_key_values(
879 metadata,
880 "hello".to_string(),
881 0,
882 &[3000, 7000, 4000, 5000],
883 2, );
885 partitions.write(&kvs).unwrap();
887 assert_eq!(2, partitions.num_partitions());
888
889 partitions
890 }
891
892 #[test]
893 fn test_write_multi_parts() {
894 let metadata = memtable_util::metadata_for_test();
895 let partitions = new_multi_partitions(&metadata);
896
897 let parts = partitions.list_partitions();
898 let iter = parts[0].memtable.iter(None, None, None).unwrap();
899 let timestamps = collect_iter_timestamps(iter);
900 assert_eq!(0, parts[0].memtable.id());
901 assert_eq!(
902 Timestamp::new_millisecond(0),
903 parts[0].time_range.unwrap().min_timestamp
904 );
905 assert_eq!(
906 Timestamp::new_millisecond(5000),
907 parts[0].time_range.unwrap().max_timestamp
908 );
909 assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]);
910 let iter = parts[1].memtable.iter(None, None, None).unwrap();
911 assert_eq!(1, parts[1].memtable.id());
912 let timestamps = collect_iter_timestamps(iter);
913 assert_eq!(&[5000, 7000], ×tamps[..]);
914 assert_eq!(
915 Timestamp::new_millisecond(5000),
916 parts[1].time_range.unwrap().min_timestamp
917 );
918 assert_eq!(
919 Timestamp::new_millisecond(10000),
920 parts[1].time_range.unwrap().max_timestamp
921 );
922 }
923
924 #[test]
925 fn test_new_with_part_duration() {
926 let metadata = memtable_util::metadata_for_test();
927 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
928 let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
929
930 let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
931 assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
932 assert_eq!(1, new_parts.next_memtable_id());
933
934 let new_parts = new_parts.new_with_part_duration(None);
936 assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
937 assert_eq!(1, new_parts.next_memtable_id());
939
940 let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
941 assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
942 assert_eq!(1, new_parts.next_memtable_id());
944
945 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
946 let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
947 let new_parts = partitions.new_with_part_duration(None);
949 assert!(new_parts.part_duration().is_none());
950 assert_eq!(2, new_parts.next_memtable_id());
951 }
952
953 #[test]
954 fn test_fork_empty() {
955 let metadata = memtable_util::metadata_for_test();
956 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
957 let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
958 partitions.freeze().unwrap();
959 let new_parts = partitions.fork(&metadata, None);
960 assert!(new_parts.part_duration().is_none());
961 assert_eq!(1, new_parts.list_partitions()[0].memtable.id());
962 assert_eq!(2, new_parts.next_memtable_id());
963
964 new_parts.freeze().unwrap();
965 let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
966 assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
967 assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
968 assert_eq!(3, new_parts.next_memtable_id());
969
970 new_parts.freeze().unwrap();
971 let new_parts = new_parts.fork(&metadata, None);
972 assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
974 assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
975 assert_eq!(4, new_parts.next_memtable_id());
976
977 new_parts.freeze().unwrap();
978 let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
979 assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
980 assert_eq!(4, new_parts.list_partitions()[0].memtable.id());
981 assert_eq!(5, new_parts.next_memtable_id());
982 }
983
984 #[test]
985 fn test_fork_non_empty_none() {
986 let metadata = memtable_util::metadata_for_test();
987 let partitions = new_multi_partitions(&metadata);
988 partitions.freeze().unwrap();
989
990 let new_parts = partitions.fork(&metadata, None);
992 assert!(new_parts.is_empty());
993 assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
994 assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
995 assert_eq!(3, new_parts.next_memtable_id());
996
997 let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
999 assert!(new_parts.is_empty());
1000 assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
1001 assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
1002 assert_eq!(4, new_parts.next_memtable_id());
1003 }
1004
1005 #[test]
1006 fn test_find_partitions_by_time_range() {
1007 let metadata = memtable_util::metadata_for_test();
1008 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
1009
1010 let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
1012 let parts = partitions.list_partitions();
1013 let (matching, missing) = partitions
1014 .find_partitions_by_time_range(
1015 &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
1016 &parts,
1017 Timestamp::new_millisecond(1000),
1018 Timestamp::new_millisecond(2000),
1019 )
1020 .unwrap();
1021 assert_eq!(matching.len(), 1);
1022 assert!(missing.is_empty());
1023 assert!(matching[0].time_range.is_none());
1024
1025 let partitions = TimePartitions::new(
1027 metadata.clone(),
1028 builder.clone(),
1029 0,
1030 Some(Duration::from_secs(5)),
1031 );
1032
1033 let kvs =
1035 memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
1036 partitions.write(&kvs).unwrap();
1037 let kvs =
1038 memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
1039 partitions.write(&kvs).unwrap();
1040
1041 let parts = partitions.list_partitions();
1042 assert_eq!(2, parts.len());
1043
1044 let (matching, missing) = partitions
1046 .find_partitions_by_time_range(
1047 &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
1048 &parts,
1049 Timestamp::new_millisecond(2000),
1050 Timestamp::new_millisecond(4000),
1051 )
1052 .unwrap();
1053 assert_eq!(matching.len(), 1);
1054 assert!(missing.is_empty());
1055 assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1056
1057 let (matching, missing) = partitions
1059 .find_partitions_by_time_range(
1060 &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
1061 &parts,
1062 Timestamp::new_millisecond(3000),
1063 Timestamp::new_millisecond(8000),
1064 )
1065 .unwrap();
1066 assert_eq!(matching.len(), 2);
1067 assert!(missing.is_empty());
1068 assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1069 assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
1070
1071 let (matching, missing) = partitions
1073 .find_partitions_by_time_range(
1074 &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
1075 &parts,
1076 Timestamp::new_millisecond(12000),
1077 Timestamp::new_millisecond(13000),
1078 )
1079 .unwrap();
1080 assert!(matching.is_empty());
1081 assert_eq!(missing.len(), 1);
1082 assert_eq!(missing[0].value(), 10000);
1083
1084 let (matching, missing) = partitions
1086 .find_partitions_by_time_range(
1087 &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
1088 &parts,
1089 Timestamp::new_millisecond(4000),
1090 Timestamp::new_millisecond(6000),
1091 )
1092 .unwrap();
1093 assert_eq!(matching.len(), 2);
1094 assert!(missing.is_empty());
1095 assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1096 assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
1097
1098 let (matching, missing) = partitions
1100 .find_partitions_by_time_range(
1101 &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
1102 &parts,
1103 Timestamp::new_millisecond(4999),
1104 Timestamp::new_millisecond(5000),
1105 )
1106 .unwrap();
1107 assert_eq!(matching.len(), 2);
1108 assert!(missing.is_empty());
1109 assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1110 assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
1111
1112 let (matching, missing) = partitions
1114 .find_partitions_by_time_range(
1115 &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
1116 &parts,
1117 Timestamp::new_millisecond(9999),
1118 Timestamp::new_millisecond(10000),
1119 )
1120 .unwrap();
1121 assert_eq!(matching.len(), 1);
1122 assert_eq!(1, missing.len());
1123 assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 5000);
1124 assert_eq!(missing[0].value(), 10000);
1125
1126 let (matching, missing) = partitions
1128 .find_partitions_by_time_range(
1129 &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
1130 &parts,
1131 Timestamp::new_millisecond(-1000),
1132 Timestamp::new_millisecond(1000),
1133 )
1134 .unwrap();
1135 assert_eq!(matching.len(), 1);
1136 assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1137 assert_eq!(1, missing.len());
1138 assert_eq!(missing[0].value(), -5000);
1139
1140 let (matching, missing) = partitions
1142 .find_partitions_by_time_range(
1143 &(Arc::new(TimestampMillisecondArray::from(vec![
1144 -100000000000,
1145 0,
1146 100000000000,
1147 ])) as ArrayRef),
1148 &parts,
1149 Timestamp::new_millisecond(-100000000000),
1150 Timestamp::new_millisecond(100000000000),
1151 )
1152 .unwrap();
1153 assert_eq!(2, matching.len());
1154 assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1155 assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
1156 assert_eq!(2, missing.len());
1157 assert_eq!(missing[0].value(), -100000000000);
1158 assert_eq!(missing[1].value(), 100000000000);
1159 }
1160
1161 fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
1162 let schema = Arc::new(Schema::new(vec![
1163 Field::new(
1164 "ts",
1165 arrow::datatypes::DataType::Timestamp(
1166 arrow::datatypes::TimeUnit::Millisecond,
1167 None,
1168 ),
1169 false,
1170 ),
1171 Field::new("val", DataType::Utf8, true),
1172 ]));
1173 let ts_data = ts.to_vec();
1174 let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
1175 let val_array = Arc::new(StringArray::from_iter_values(
1176 ts.iter().map(|v| v.to_string()),
1177 ));
1178 let batch = RecordBatch::try_new(
1179 schema,
1180 vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
1181 )
1182 .unwrap();
1183 let max_ts = ts.iter().max().copied().unwrap();
1184 let min_ts = ts.iter().min().copied().unwrap();
1185 BulkPart {
1186 batch,
1187 num_rows: ts.len(),
1188 max_ts,
1189 min_ts,
1190 sequence,
1191 timestamp_index: 0,
1192 }
1193 }
1194
/// Checks how `TimePartitions::write_bulk` distributes the rows of a bulk part
/// across partitions, first with a 5 second partition duration and then with
/// no partition duration at all.
#[test]
fn test_write_bulk() {
    // Region schema: a millisecond time index `ts` and a string field `val`,
    // with an empty primary key.
    let mut metadata_builder = RegionMetadataBuilder::new(0.into());
    metadata_builder
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 0,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 1,
        })
        .primary_key(vec![]);
    let metadata = Arc::new(metadata_builder.build().unwrap());

    let builder = Arc::new(TimeSeriesMemtableBuilder::default());
    let partitions = TimePartitions::new(
        metadata.clone(),
        builder.clone(),
        0,
        Some(Duration::from_secs(5)),
    );

    // All rows fall below 5000 ms, so a single partition is expected.
    partitions
        .write_bulk(build_part(&[1000, 2000, 3000], 0))
        .unwrap();

    let parts = partitions.list_partitions();
    assert_eq!(1, parts.len());
    let iter = parts[0].memtable.iter(None, None, None).unwrap();
    let timestamps = collect_iter_timestamps(iter);
    assert_eq!(&[1000, 2000, 3000], &timestamps[..]);

    // This part straddles the 5000 ms boundary: 4000 is expected to join the
    // existing partition while 5000 and 6000 land in a second one.
    partitions
        .write_bulk(build_part(&[4000, 5000, 6000], 1))
        .unwrap();
    let parts = partitions.list_partitions();
    assert_eq!(2, parts.len());
    let iter = parts[0].memtable.iter(None, None, None).unwrap();
    let timestamps = collect_iter_timestamps(iter);
    assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
    let iter = parts[1].memtable.iter(None, None, None).unwrap();
    let timestamps = collect_iter_timestamps(iter);
    assert_eq!(&[5000, 6000], &timestamps[..]);

    // Rows beyond both existing partitions are expected to create a third one.
    partitions
        .write_bulk(build_part(&[11000, 12000], 3))
        .unwrap();

    let parts = partitions.list_partitions();
    assert_eq!(3, parts.len());

    let iter = parts[2].memtable.iter(None, None, None).unwrap();
    let timestamps = collect_iter_timestamps(iter);
    assert_eq!(&[11000, 12000], &timestamps[..]);

    // Without a partition duration, every row is expected in one partition
    // regardless of how far apart the timestamps are.
    let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);

    partitions
        .write_bulk(build_part(&[1000, 5000, 9000], 4))
        .unwrap();

    let parts = partitions.list_partitions();
    assert_eq!(1, parts.len());
    let iter = parts[0].memtable.iter(None, None, None).unwrap();
    let timestamps = collect_iter_timestamps(iter);
    assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
}
1276
/// Exercises `filter_record_batch` on a five-row part with several timestamp
/// ranges, checking the surviving row count and the min/max timestamps of the
/// filtered part (or that nothing is returned when no row matches).
#[test]
fn test_split_record_batch() {
    let fields = vec![
        Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
            false,
        ),
        Field::new("val", DataType::Utf8, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    let ts_column = Arc::new(TimestampMillisecondArray::from(vec![
        1000, 2000, 5000, 7000, 8000,
    ]));
    let val_column = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
    let columns = vec![ts_column as ArrayRef, val_column as ArrayRef];
    let batch = RecordBatch::try_new(schema.clone(), columns).unwrap();

    let part = BulkPart {
        batch,
        num_rows: 5,
        max_ts: 8000,
        min_ts: 1000,
        sequence: 0,
        timestamp_index: 0,
    };

    // (lower bound, upper bound, expected (num_rows, min_ts, max_ts) or None).
    // The expectations treat the upper bound as exclusive: 1000..2000 keeps only 1000.
    let cases: [(i64, i64, Option<(usize, i64, i64)>); 4] = [
        (1000, 2000, Some((1, 1000, 1000))),
        (3000, 6000, Some((1, 5000, 5000))),
        (3000, 4000, None),
        (0, 9000, Some((5, 1000, 8000))),
    ];

    for (lower, upper, expected) in cases {
        let actual = filter_record_batch(&part, lower, upper)
            .unwrap()
            .map(|filtered| (filtered.num_rows, filtered.min_ts, filtered.max_ts));
        assert_eq!(expected, actual, "range {lower}..{upper}");
    }
}
1334}