use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;

use common_telemetry::debug;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::{
    ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
};
use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
use datatypes::arrow::datatypes::{DataType, Int64Type};
use mito_codec::key_values::KeyValue;
use smallvec::{smallvec, SmallVec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;

use crate::error;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};

/// Initial time window if none is provided.
const INITIAL_TIME_WINDOW: Duration = Duration::from_days(1);

/// A single time partition: a memtable that holds rows within one time range.
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable of the partition.
    memtable: MemtableRef,
    /// Time range of the partition, left-inclusive and right-exclusive.
    time_range: PartTimeRange,
}

impl TimePartition {
    /// Returns whether `ts` belongs to this partition.
    fn contains_timestamp(&self, ts: Timestamp) -> bool {
        self.time_range.contains_timestamp(ts)
    }

    /// Writes rows to the memtable. The caller must ensure all rows are within the
    /// partition's time range.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        self.memtable.write(kvs)
    }

    /// Writes a whole bulk part to the memtable.
    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
        self.memtable.write_bulk(rb)
    }

    /// Filters the bulk part by this partition's time range and writes the remaining
    /// rows, if any.
    fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
        let Some(filtered) = filter_record_batch(
            part,
            self.time_range.min_timestamp.value(),
            self.time_range.max_timestamp.value(),
        )?
        else {
            return Ok(());
        };
        self.write_record_batch(filtered)
    }
}

/// Builds a [BooleanArray] that is `true` for rows whose timestamp value lies in
/// `[$min, $max)`, packing the predicate results 64 rows at a time.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        let f = |idx: usize| -> bool {
            // Safety: `idx` is always within the bounds of the array.
            unsafe {
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // Safety: the buffer was allocated with enough capacity for all full chunks.
            unsafe { buffer.push_unchecked(packed) }
        }

        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // Safety: the buffer was allocated with enough capacity for the remainder chunk.
            unsafe { buffer.push_unchecked(packed) }
        }

        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}

/// Downcasts `$ts_array` to `$array_type`, filters it by `[$min, $max)` and evaluates to the
/// filtered array, the filter, and the min/max timestamps of the remaining rows. Returns
/// `Ok(None)` from the enclosing function if no row is left.
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            return Ok(None);
        }

        let ts_filtered = res.as_any().downcast_ref::<$array_type>().unwrap();
        // Safe to unwrap: the filtered array is not empty.
        let max_ts = arrow::compute::max(ts_filtered).unwrap();
        let min_ts = arrow::compute::min(ts_filtered).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}

/// Filters `part` to the rows whose timestamps fall in `[min, max)`.
///
/// Returns `None` if no rows are left after filtering.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                // Reuses the already filtered timestamp array.
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_ts,
        min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
    }))
}

type PartitionVec = SmallVec<[TimePartition; 2]>;

/// Collection of memtables partitioned by time.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable data protected by a mutex.
    inner: Mutex<PartitionsInner>,
    /// Duration of a single partition.
    part_duration: Duration,
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Builder of new memtables.
    builder: MemtableBuilderRef,
}

pub type TimePartitionsRef = Arc<TimePartitions>;

impl TimePartitions {
    /// Returns a new empty partition list with an optional partition duration.
    pub fn new(
        metadata: RegionMetadataRef,
        builder: MemtableBuilderRef,
        next_memtable_id: MemtableId,
        part_duration: Option<Duration>,
    ) -> Self {
        let inner = PartitionsInner::new(next_memtable_id);
        Self {
            inner: Mutex::new(inner),
            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
            metadata,
            builder,
        }
    }

    /// Writes key values to memtables, creating new partitions on demand.
    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
        // Gets all partitions.
        let parts = self.list_partitions();

        // Checks whether a single partition can hold all rows, starting from the most
        // recently created partition.
        for part in parts.iter().rev() {
            let mut all_in_partition = true;
            for kv in kvs.iter() {
                // Safe to unwrap: the time index column always stores valid timestamps.
                let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
                if !part.contains_timestamp(ts) {
                    all_in_partition = false;
                    break;
                }
            }
            if !all_in_partition {
                continue;
            }

            // Fast path: all rows belong to this partition.
            return part.write(kvs);
        }

        // Slow path: the rows span multiple partitions.
        self.write_multi_parts(kvs, &parts)
    }

    /// Writes a bulk part, splitting it when it spans multiple partitions.
    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        // Finds existing partitions that overlap with the part, and the partitions that
        // still need to be created.
        let parts = self.list_partitions();
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_ts),
            time_type.create_timestamp(part.max_ts),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
            // Fast path: all rows fall into one existing partition.
            return matching_parts[0].write_record_batch(part);
        }

        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        for missing in missing_parts {
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }

    /// Returns the partition that starts at `part_start`, creating the partition and its
    /// memtable if it does not exist. The caller must hold the lock of `inner`.
    fn get_or_create_time_partition(
        &self,
        part_start: Timestamp,
        inner: &mut MutexGuard<PartitionsInner>,
    ) -> Result<TimePartition> {
        let part_pos = match inner
            .parts
            .iter()
            .position(|part| part.time_range.min_timestamp == part_start)
        {
            Some(pos) => pos,
            None => {
                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: self.metadata.region_id,
                        reason: format!(
                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
                        ),
                    })?;
                let memtable = self
                    .builder
                    .build(inner.alloc_memtable_id(), &self.metadata);
                debug!(
                    "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
                    range,
                    self.metadata.region_id,
                    self.part_duration,
                    memtable.id(),
                    inner.parts.len() + 1
                );
                let pos = inner.parts.len();
                inner.parts.push(TimePartition {
                    memtable,
                    time_range: range,
                });
                pos
            }
        };
        Ok(inner.parts[part_pos].clone())
    }

    /// Appends the memtables of all partitions to `memtables`.
    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the number of partitions.
    pub fn num_partitions(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner.parts.len()
    }

    /// Returns true if all partitions are empty.
    pub fn is_empty(&self) -> bool {
        let inner = self.inner.lock().unwrap();
        inner.parts.iter().all(|part| part.memtable.is_empty())
    }

    /// Freezes all memtables.
    pub fn freeze(&self) -> Result<()> {
        let inner = self.inner.lock().unwrap();
        for part in &*inner.parts {
            part.memtable.freeze()?;
        }
        Ok(())
    }

    /// Forks the latest partition, optionally overriding the partition duration.
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        // Falls back to the current partition duration.
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        let Some(old_part) = latest_part else {
            // There is no partition to fork; returns an empty partition list.
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Computes the time range of the forked partition from the end timestamp of the
        // latest partition and the (possibly new) partition duration.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                // Forks the latest memtable into the new partition.
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
        }
    }

    /// Returns the partition duration.
    pub(crate) fn part_duration(&self) -> Duration {
        self.part_duration
    }

    /// Returns the estimated memory usage of all memtables.
    pub(crate) fn memory_usage(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().estimated_bytes)
            .sum()
    }

    /// Returns the total number of rows in all memtables.
    pub(crate) fn num_rows(&self) -> u64 {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().num_rows as u64)
            .sum()
    }

    /// Appends the memtables of all partitions to `memtables`.
    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the next memtable id to allocate.
    pub(crate) fn next_memtable_id(&self) -> MemtableId {
        let inner = self.inner.lock().unwrap();
        inner.next_memtable_id
    }

    /// Creates a new empty partition list from `self`, using `part_duration` if provided
    /// and the current duration otherwise. The current partition list must be empty.
    pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
        debug_assert!(self.is_empty());

        Self::new(
            self.metadata.clone(),
            self.builder.clone(),
            self.next_memtable_id(),
            Some(part_duration.unwrap_or(self.part_duration)),
        )
    }

    /// Returns a copy of all partitions.
    fn list_partitions(&self) -> PartitionVec {
        let inner = self.inner.lock().unwrap();
        inner.parts.clone()
    }

    /// Finds the existing partitions that overlap with the time range `[min, max]` and the
    /// start timestamps of the partitions that are missing and need to be created.
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        // Finds all existing partitions that overlap with the request time range.
        let mut present = HashSet::new();
        for part in existing_parts {
            let part_time_range = &part.time_range;
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        // Computes the bucket range covered by `[min, max]`.
        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        let part_duration_sec = part_duration.as_secs() as i64;
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        // Chooses the cheaper side to scan: iterates buckets when there are fewer buckets
        // than timestamps, otherwise iterates the timestamp values.
        let num_timestamps = ts_array.len();
        let missing = if bucket_num <= num_timestamps {
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }

    /// Returns the partition duration.
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }

    /// Writes rows that span multiple partitions, creating missing partitions as needed.
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Assigns each row to an existing partition, or records the start time of the
        // partition it needs.
        let mut parts_to_write = HashMap::new();
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // Safe to unwrap: the time index column always stores valid timestamps.
            let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                // The row does not belong to any existing partition; computes the start
                // time of the partition it requires.
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Writes rows that hit existing partitions.
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Creates the missing partitions and writes the remaining rows.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }
}

/// Computes the start timestamp of the partition (bucket) that `ts` belongs to.
///
/// Returns `None` if the timestamp is out of the representable range.
fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
    // Safety: converting a timestamp to seconds never fails.
    let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
    let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
    let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
    start_sec.convert_to(ts.unit())
}

#[derive(Debug)]
struct PartitionsInner {
    /// All partitions.
    parts: PartitionVec,
    /// Next memtable id to allocate.
    next_memtable_id: MemtableId,
}

impl PartitionsInner {
    fn new(next_memtable_id: MemtableId) -> Self {
        Self {
            parts: Default::default(),
            next_memtable_id,
        }
    }

    fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
        Self {
            parts: smallvec![part],
            next_memtable_id,
        }
    }

    fn alloc_memtable_id(&mut self) -> MemtableId {
        let id = self.next_memtable_id;
        self.next_memtable_id += 1;
        id
    }
}

/// Time range of a partition.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive min timestamp of rows in the partition.
    min_timestamp: Timestamp,
    /// Exclusive max timestamp of rows in the partition.
    max_timestamp: Timestamp,
}

impl PartTimeRange {
    fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
        let start_sec = start.convert_to(TimeUnit::Second)?;
        let end_sec = start_sec.add_duration(duration).ok()?;
        let min_timestamp = start_sec.convert_to(start.unit())?;
        let max_timestamp = end_sec.convert_to(start.unit())?;

        Some(Self {
            min_timestamp,
            max_timestamp,
        })
    }

    /// Returns whether `ts` belongs to the time range `[min_timestamp, max_timestamp)`.
    fn contains_timestamp(&self, ts: Timestamp) -> bool {
        self.min_timestamp <= ts && ts < self.max_timestamp
    }
}

/// Rows pending to be written to an existing partition.
struct PartitionToWrite<'a> {
    partition: TimePartition,
    key_values: Vec<KeyValue<'a>>,
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::SequenceNumber;

    use super::*;
    use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::memtable_util::{self, collect_iter_timestamps};

    #[test]
    fn test_no_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        assert_eq!(0, partitions.num_partitions());
        assert!(partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1000, 3000, 7000, 5000, 6000],
            0,
        );
        partitions.write(&kvs).unwrap();

        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());
        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        assert_eq!(0, memtables[0].id());

        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
    }

    #[test]
    fn test_write_single_part() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5000, 2000, 0],
            0,
        );
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000],
            3,
        );
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());

        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
        let parts = partitions.list_partitions();
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[0].time_range.max_timestamp
        );
    }

    #[cfg(test)]
    fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            metadata,
            "hello".to_string(),
            0,
            &[2000, 0],
            0,
        );
        // All rows fall into the first partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000, 5000],
            2,
        );
        // These rows span two partitions.
        partitions.write(&kvs).unwrap();
        assert_eq!(2, partitions.num_partitions());

        partitions
    }

    #[test]
    fn test_write_multi_parts() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);

        let parts = partitions.list_partitions();
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(0, parts[0].memtable.id());
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[0].time_range.max_timestamp
        );
        assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        assert_eq!(1, parts[1].memtable.id());
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 7000], &timestamps[..]);
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[1].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[1].time_range.max_timestamp
        );
    }

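    // Added sketch (not part of the original suite): `partition_start_timestamp`
    // aligns a timestamp down to the start of its bucket, including for
    // negative timestamps.
    #[test]
    fn test_partition_start_timestamp_sketch() {
        let bucket = Duration::from_secs(5);
        assert_eq!(
            Timestamp::new_millisecond(5000),
            partition_start_timestamp(Timestamp::new_millisecond(7000), bucket).unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(-5000),
            partition_start_timestamp(Timestamp::new_millisecond(-1000), bucket).unwrap()
        );
    }
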
    #[test]
    fn test_new_with_part_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);

        let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        // `None` keeps the previous duration.
        let new_parts = new_parts.new_with_part_duration(None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        // A new duration overrides the previous one.
        let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        // Without a duration, the initial time window is used.
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let new_parts = partitions.new_with_part_duration(None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());
    }

    #[test]
    fn test_fork_empty() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        partitions.freeze().unwrap();
        let new_parts = partitions.fork(&metadata, None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());
    }

    #[test]
    fn test_fork_non_empty_none() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);
        partitions.freeze().unwrap();

        // Forking with `None` keeps the previous partition duration.
        let new_parts = partitions.fork(&metadata, None);
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(3, new_parts.next_memtable_id());

        // Forking with a new duration updates it.
        let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(4, new_parts.next_memtable_id());
    }

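    // Added sketch (not part of the original suite): a partition time range is
    // left-inclusive and right-exclusive.
    #[test]
    fn test_part_time_range_bounds_sketch() {
        let range = PartTimeRange::from_start_duration(
            Timestamp::new_millisecond(0),
            Duration::from_secs(5),
        )
        .unwrap();
        assert!(range.contains_timestamp(Timestamp::new_millisecond(0)));
        assert!(range.contains_timestamp(Timestamp::new_millisecond(4999)));
        assert!(!range.contains_timestamp(Timestamp::new_millisecond(5000)));
    }
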
    #[test]
    fn test_find_partitions_by_time_range() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());

        // Case: no existing partitions and the default (1 day) window.
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let parts = partitions.list_partitions();
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            )
            .unwrap();
        assert_eq!(matching.len(), 0);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], Timestamp::new_millisecond(0));

        // Creates two partitions [0, 5000) and [5000, 10000) with a 5s window.
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
        partitions.write(&kvs).unwrap();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
        partitions.write(&kvs).unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());

        // Fully inside the first partition.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(2000),
                Timestamp::new_millisecond(4000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);

        // Overlaps both partitions.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(3000),
                Timestamp::new_millisecond(8000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Beyond all existing partitions.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(12000),
                Timestamp::new_millisecond(13000),
            )
            .unwrap();
        assert!(matching.is_empty());
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].value(), 10000);

        // Crosses the boundary between the two partitions.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4000),
                Timestamp::new_millisecond(6000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Exactly at the partition boundary.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4999),
                Timestamp::new_millisecond(5000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Ends in a partition that does not exist yet.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(9999),
                Timestamp::new_millisecond(10000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(1, missing.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
        assert_eq!(missing[0].value(), 10000);

        // Starts before the first partition.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-1000),
                Timestamp::new_millisecond(1000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(1, missing.len());
        assert_eq!(missing[0].value(), -5000);

        // Sparse timestamps: iterates the timestamps instead of the buckets.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from(vec![
                    -100000000000,
                    0,
                    100000000000,
                ])) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-100000000000),
                Timestamp::new_millisecond(100000000000),
            )
            .unwrap();
        assert_eq!(2, matching.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
        assert_eq!(2, missing.len());
        assert_eq!(missing[0].value(), -100000000000);
        assert_eq!(missing[1].value(), 100000000000);
    }

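    // Added sketch (not part of the original suite): every partition created by
    // `new_multi_partitions` allocates one memtable id, so the next id is 2.
    #[test]
    fn test_memtable_id_allocation_sketch() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);
        assert_eq!(2, partitions.num_partitions());
        assert_eq!(2, partitions.next_memtable_id());
    }
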
    fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("val", DataType::Utf8, true),
        ]));
        let ts_data = ts.to_vec();
        let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
        let val_array = Arc::new(StringArray::from_iter_values(
            ts.iter().map(|v| v.to_string()),
        ));
        let batch = RecordBatch::try_new(
            schema,
            vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
        )
        .unwrap();
        let max_ts = ts.iter().max().copied().unwrap();
        let min_ts = ts.iter().min().copied().unwrap();
        BulkPart {
            batch,
            max_ts,
            min_ts,
            sequence,
            timestamp_index: 0,
            raw_data: None,
        }
    }

    #[test]
    fn test_write_bulk() {
        let mut metadata_builder = RegionMetadataBuilder::new(0.into());
        metadata_builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![]);
        let metadata = Arc::new(metadata_builder.build().unwrap());

        let builder = Arc::new(TimeSeriesMemtableBuilder::default());
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Rows that fit in a single partition.
        partitions
            .write_bulk(build_part(&[1000, 2000, 3000], 0))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000], &timestamps[..]);

        // Rows that span two partitions.
        partitions
            .write_bulk(build_part(&[4000, 5000, 6000], 1))
            .unwrap();
        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 6000], &timestamps[..]);

        // Rows that require a new partition.
        partitions
            .write_bulk(build_part(&[11000, 12000], 3))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(3, parts.len());

        let iter = parts[2].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[11000, 12000], &timestamps[..]);

        // With the default (1 day) window, all rows fall into one partition.
        let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);

        partitions
            .write_bulk(build_part(&[1000, 5000, 9000], 4))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
    }

    #[test]
    fn test_split_record_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
                false,
            ),
            Field::new("val", DataType::Utf8, true),
        ]));

        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
            1000, 2000, 5000, 7000, 8000,
        ]));
        let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![ts_array as ArrayRef, val_array as ArrayRef],
        )
        .unwrap();

        let part = BulkPart {
            batch,
            max_ts: 8000,
            min_ts: 1000,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
        };

        // [1000, 2000) keeps only the first row.
        let result = filter_record_batch(&part, 1000, 2000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_ts, 1000);
        assert_eq!(filtered.max_ts, 1000);

        // [3000, 6000) keeps the row with timestamp 5000.
        let result = filter_record_batch(&part, 3000, 6000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_ts, 5000);
        assert_eq!(filtered.max_ts, 5000);

        // [3000, 4000) keeps nothing.
        let result = filter_record_batch(&part, 3000, 4000).unwrap();
        assert!(result.is_none());

        // [0, 9000) keeps all rows.
        let result = filter_record_batch(&part, 0, 9000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 5);
        assert_eq!(filtered.min_ts, 1000);
        assert_eq!(filtered.max_ts, 8000);
    }
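
    // Added sketch (not part of the original suite): an empty half-open range
    // `[min, max)` with `min == max` filters out every row, so `filter_record_batch`
    // returns `None`.
    #[test]
    fn test_split_record_batch_empty_range_sketch() {
        let part = build_part(&[1000, 2000], 0);
        assert!(filter_record_batch(&part, 1000, 1000).unwrap().is_none());

        let filtered = filter_record_batch(&part, 2000, 5000).unwrap().unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_ts, 2000);
        assert_eq!(filtered.max_ts, 2000);
    }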
}