use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;

use common_telemetry::debug;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::{
    ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
};
use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
use datatypes::arrow::datatypes::{DataType, Int64Type};
use mito_codec::key_values::KeyValue;
use smallvec::{smallvec, SmallVec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;

use crate::error;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
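/// Initial time window used when the caller does not specify a partition duration.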
const INITIAL_TIME_WINDOW: Duration = Duration::from_days(1);
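/// A single time partition: one memtable that holds rows whose timestamps fall in the
/// half-open range `[min_timestamp, max_timestamp)` of its `PartTimeRange`.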
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable that stores rows of this partition.
    memtable: MemtableRef,
    /// Time range covered by this partition.
    time_range: PartTimeRange,
}

impl TimePartition {
    /// Returns whether `ts` belongs to this partition.
    fn contains_timestamp(&self, ts: Timestamp) -> bool {
        self.time_range.contains_timestamp(ts)
    }

    /// Writes the key values to the memtable of this partition.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        self.memtable.write(kvs)
    }

    /// Writes a bulk part to the memtable of this partition.
    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
        self.memtable.write_bulk(rb)
    }
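    /// Writes only the rows of `part` whose timestamps fall inside this partition's
    /// time range, skipping the write entirely when nothing matches.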
    fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
        let Some(filtered) = filter_record_batch(
            part,
            self.time_range.min_timestamp.value(),
            self.time_range.max_timestamp.value(),
        )?
        else {
            return Ok(());
        };
        self.write_record_batch(filtered)
    }
}
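/// Builds a `BooleanArray` that selects the rows of `$ts_array` whose values lie in
/// `[$min, $max)`, packing the predicate results into the buffer 64 bits at a time.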
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        let f = |idx: usize| -> bool {
            // SAFETY: the index is always smaller than the array length.
            unsafe {
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: the buffer was allocated with enough capacity for every 64-bit chunk.
            unsafe { buffer.push_unchecked(packed) }
        }

        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: the buffer still has capacity for the trailing partial chunk.
            unsafe { buffer.push_unchecked(packed) }
        }

        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
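/// Downcasts `$ts_array` to `$array_type`, filters it to `[$min, $max)` and evaluates to
/// `(filtered_timestamps, filter, min_ts, max_ts)`, returning `Ok(None)` from the enclosing
/// function when no row matches.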
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            return Ok(None);
        }

        let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
        // Safe to unwrap: the filtered array is not empty.
        let max_ts = arrow::compute::max(i64array).unwrap();
        let min_ts = arrow::compute::min(i64array).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}
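/// Filters `part` so that only rows with timestamps in `[min, max)` remain.
///
/// Returns `Ok(None)` if no row falls in the range.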
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    // Reuse the already filtered timestamp column and apply the same filter to the rest.
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_ts,
        min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
    }))
}

type PartitionVec = SmallVec<[TimePartition; 2]>;
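/// Memtables partitioned by time: each row is routed to the partition that covers its
/// timestamp, and new partitions are created on demand.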
#[derive(Debug)]
pub struct TimePartitions {
    /// Existing partitions and the next memtable id, guarded by a mutex.
    inner: Mutex<PartitionsInner>,
    /// Duration of a single partition.
    part_duration: Duration,
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Builder used to create memtables for new partitions.
    builder: MemtableBuilderRef,
}

pub type TimePartitionsRef = Arc<TimePartitions>;

impl TimePartitions {
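    /// Creates a new `TimePartitions` with the given partition duration, falling back to
    /// `INITIAL_TIME_WINDOW` when `part_duration` is `None`.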
    pub fn new(
        metadata: RegionMetadataRef,
        builder: MemtableBuilderRef,
        next_memtable_id: MemtableId,
        part_duration: Option<Duration>,
    ) -> Self {
        let inner = PartitionsInner::new(next_memtable_id);
        Self {
            inner: Mutex::new(inner),
            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
            metadata,
            builder,
        }
    }
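    /// Writes key values to the memtables, using a fast path when every row belongs to a
    /// single existing partition and splitting the rows across partitions otherwise.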
    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
        let parts = self.list_partitions();

        // Fast path: check whether all rows fall in one existing partition, newest first.
        for part in parts.iter().rev() {
            let mut all_in_partition = true;
            for kv in kvs.iter() {
                // The time index value is always a valid, non-null timestamp.
                let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
                if !part.contains_timestamp(ts) {
                    all_in_partition = false;
                    break;
                }
            }
            if !all_in_partition {
                continue;
            }

            return part.write(kvs);
        }

        // Slow path: rows span multiple partitions or need new partitions.
        self.write_multi_parts(kvs, &parts)
    }
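    /// Writes a bulk part, routing its rows to the partitions that cover their timestamps
    /// and creating any partitions that are missing.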
    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        let parts = self.list_partitions();
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_ts),
            time_type.create_timestamp(part.max_ts),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
            // Fast path: all rows fall in one existing partition.
            return matching_parts[0].write_record_batch(part);
        }

        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        for missing in missing_parts {
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }
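    /// Returns the partition that starts at `part_start`, creating the partition and its
    /// memtable if it does not exist yet. The caller must pass in the guard of `inner`.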
    fn get_or_create_time_partition(
        &self,
        part_start: Timestamp,
        inner: &mut MutexGuard<PartitionsInner>,
    ) -> Result<TimePartition> {
        let part_pos = match inner
            .parts
            .iter()
            .position(|part| part.time_range.min_timestamp == part_start)
        {
            Some(pos) => pos,
            None => {
                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: self.metadata.region_id,
                        reason: format!(
                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
                        ),
                    })?;
                let memtable = self
                    .builder
                    .build(inner.alloc_memtable_id(), &self.metadata);
                debug!(
                    "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
                    range,
                    self.metadata.region_id,
                    self.part_duration,
                    memtable.id(),
                    inner.parts.len() + 1
                );
                let pos = inner.parts.len();
                inner.parts.push(TimePartition {
                    memtable,
                    time_range: range,
                });
                pos
            }
        };
        Ok(inner.parts[part_pos].clone())
    }

    /// Appends memtables of all partitions to `memtables`.
    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the number of partitions.
    pub fn num_partitions(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner.parts.len()
    }

    /// Returns true if all memtables are empty.
    pub fn is_empty(&self) -> bool {
        let inner = self.inner.lock().unwrap();
        inner.parts.iter().all(|part| part.memtable.is_empty())
    }

    /// Freezes all memtables.
    pub fn freeze(&self) -> Result<()> {
        let inner = self.inner.lock().unwrap();
        for part in &*inner.parts {
            part.memtable.freeze()?;
        }
        Ok(())
    }
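    /// Forks the partitions for a new metadata version: only the latest partition's memtable
    /// is forked into the returned set, and the partition duration is replaced when
    /// `part_duration` is `Some`.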
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        let Some(old_part) = latest_part else {
            // There is no partition to fork, so simply build an empty set of partitions.
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Computes the time range of the new partition from the last timestamp of the old
        // memtable, then forks the old memtable into that partition.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
        }
    }

    /// Returns the partition duration.
    pub(crate) fn part_duration(&self) -> Duration {
        self.part_duration
    }

    /// Returns the estimated memory usage of all memtables.
    pub(crate) fn memory_usage(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().estimated_bytes)
            .sum()
    }

    /// Returns the total number of rows in all memtables.
    pub(crate) fn num_rows(&self) -> u64 {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().num_rows as u64)
            .sum()
    }

    /// Appends memtables of all partitions to the small vector `memtables`.
    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the id that the next created memtable will use.
    pub(crate) fn next_memtable_id(&self) -> MemtableId {
        let inner = self.inner.lock().unwrap();
        inner.next_memtable_id
    }
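    /// Returns a new empty `TimePartitions` that reuses this one's metadata, builder and next
    /// memtable id, applying `part_duration` when it is `Some`; the current set is expected
    /// to be empty.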
    pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
        debug_assert!(self.is_empty());

        Self::new(
            self.metadata.clone(),
            self.builder.clone(),
            self.next_memtable_id(),
            Some(part_duration.unwrap_or(self.part_duration)),
        )
    }

    /// Returns all partitions.
    fn list_partitions(&self) -> PartitionVec {
        let inner = self.inner.lock().unwrap();
        inner.parts.clone()
    }
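    /// Finds the existing partitions that overlap `[min, max]` and the start timestamps of
    /// the partitions that must be created to cover the timestamps in `ts_array`.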
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        // Collects the existing partitions that overlap [min, max].
        let mut present = HashSet::new();
        for part in existing_parts {
            let part_time_range = &part.time_range;
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        let part_duration_sec = part_duration.as_secs() as i64;
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        // Chooses the cheaper way to find the missing buckets: enumerate the buckets between
        // min and max when there are few, otherwise derive buckets from the timestamps.
        let num_timestamps = ts_array.len();
        let missing = if bucket_num <= num_timestamps {
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }

    /// Returns the partition duration (it is already defaulted at construction time).
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }
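    /// Writes rows that span multiple partitions, creating partitions for timestamps that no
    /// existing partition covers.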
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Groups rows by the partition that contains them, tracking separately the rows
        // whose partition does not exist yet.
        let mut parts_to_write = HashMap::new();
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // The time index value is always a valid, non-null timestamp.
            let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                // No existing partition covers the timestamp, compute the partition start.
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Writes rows to existing partitions.
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Creates the missing partitions and writes the remaining rows.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }

    /// Returns the number of series in all memtables.
    pub(crate) fn series_count(&self) -> usize {
        self.inner.lock().unwrap().series_count()
    }
}
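/// Computes the start timestamp of the partition (time bucket) that contains `ts`.
///
/// Returns `None` if the bucket size or the aligned timestamp is out of range.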
fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
    // Converting to a coarser unit (seconds) never overflows, so the unwrap is safe.
    let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
    let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
    let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
    start_sec.convert_to(ts.unit())
}
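/// Inner state of `TimePartitions`, protected by the mutex.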
#[derive(Debug)]
struct PartitionsInner {
    /// All existing partitions.
    parts: PartitionVec,
    /// Id of the next memtable to create.
    next_memtable_id: MemtableId,
}

impl PartitionsInner {
    fn new(next_memtable_id: MemtableId) -> Self {
        Self {
            parts: Default::default(),
            next_memtable_id,
        }
    }

    fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
        Self {
            parts: smallvec![part],
            next_memtable_id,
        }
    }

    fn alloc_memtable_id(&mut self) -> MemtableId {
        let id = self.next_memtable_id;
        self.next_memtable_id += 1;
        id
    }

    pub(crate) fn series_count(&self) -> usize {
        self.parts
            .iter()
            .map(|p| p.memtable.stats().series_count)
            .sum()
    }
}
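/// Half-open time range `[min_timestamp, max_timestamp)` covered by a partition.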
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive minimum timestamp of rows in the partition.
    min_timestamp: Timestamp,
    /// Exclusive maximum timestamp of rows in the partition.
    max_timestamp: Timestamp,
}

impl PartTimeRange {
    fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
        let start_sec = start.convert_to(TimeUnit::Second)?;
        let end_sec = start_sec.add_duration(duration).ok()?;
        let min_timestamp = start_sec.convert_to(start.unit())?;
        let max_timestamp = end_sec.convert_to(start.unit())?;

        Some(Self {
            min_timestamp,
            max_timestamp,
        })
    }

    /// Returns whether `ts` belongs to the time range.
    fn contains_timestamp(&self, ts: Timestamp) -> bool {
        self.min_timestamp <= ts && ts < self.max_timestamp
    }
}
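/// Rows grouped for a specific partition while routing a multi-partition write.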
struct PartitionToWrite<'a> {
    partition: TimePartition,
    key_values: Vec<KeyValue<'a>>,
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::SequenceNumber;

    use super::*;
    use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::memtable_util::{self, collect_iter_timestamps};

    #[test]
    fn test_no_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        assert_eq!(0, partitions.num_partitions());
        assert!(partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1000, 3000, 7000, 5000, 6000],
            0,
        );
        partitions.write(&kvs).unwrap();

        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());
        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        assert_eq!(0, memtables[0].id());

        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
    }

    #[test]
    fn test_write_single_part() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5000, 2000, 0],
            0,
        );
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000],
            3,
        );
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());

        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
        let parts = partitions.list_partitions();
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[0].time_range.max_timestamp
        );
    }

    #[cfg(test)]
    fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            metadata,
            "hello".to_string(),
            0,
            &[2000, 0],
            0,
        );
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000, 5000],
            2,
        );
        partitions.write(&kvs).unwrap();
        assert_eq!(2, partitions.num_partitions());

        partitions
    }

    #[test]
    fn test_write_multi_parts() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);

        let parts = partitions.list_partitions();
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(0, parts[0].memtable.id());
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[0].time_range.max_timestamp
        );
        assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        assert_eq!(1, parts[1].memtable.id());
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 7000], &timestamps[..]);
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[1].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[1].time_range.max_timestamp
        );
    }

    #[test]
    fn test_new_with_part_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);

        let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        let new_parts = new_parts.new_with_part_duration(None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let new_parts = partitions.new_with_part_duration(None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());
    }

    #[test]
    fn test_fork_empty() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        partitions.freeze().unwrap();
        let new_parts = partitions.fork(&metadata, None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());
    }

    #[test]
    fn test_fork_non_empty_none() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);
        partitions.freeze().unwrap();

        let new_parts = partitions.fork(&metadata, None);
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(3, new_parts.next_memtable_id());

        let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(4, new_parts.next_memtable_id());
    }

    #[test]
    fn test_find_partitions_by_time_range() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());

        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let parts = partitions.list_partitions();
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            )
            .unwrap();
        assert_eq!(matching.len(), 0);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], Timestamp::new_millisecond(0));

        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
        partitions.write(&kvs).unwrap();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
        partitions.write(&kvs).unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());

        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(2000),
                Timestamp::new_millisecond(4000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);

        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(3000),
                Timestamp::new_millisecond(8000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(12000),
                Timestamp::new_millisecond(13000),
            )
            .unwrap();
        assert!(matching.is_empty());
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].value(), 10000);

        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4000),
                Timestamp::new_millisecond(6000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4999),
                Timestamp::new_millisecond(5000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(9999),
                Timestamp::new_millisecond(10000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(1, missing.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
        assert_eq!(missing[0].value(), 10000);

        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-1000),
                Timestamp::new_millisecond(1000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(1, missing.len());
        assert_eq!(missing[0].value(), -5000);

        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from(vec![
                    -100000000000,
                    0,
                    100000000000,
                ])) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-100000000000),
                Timestamp::new_millisecond(100000000000),
            )
            .unwrap();
        assert_eq!(2, matching.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
        assert_eq!(2, missing.len());
        assert_eq!(missing[0].value(), -100000000000);
        assert_eq!(missing[1].value(), 100000000000);
    }

    fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("val", DataType::Utf8, true),
        ]));
        let ts_data = ts.to_vec();
        let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
        let val_array = Arc::new(StringArray::from_iter_values(
            ts.iter().map(|v| v.to_string()),
        ));
        let batch = RecordBatch::try_new(
            schema,
            vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
        )
        .unwrap();
        let max_ts = ts.iter().max().copied().unwrap();
        let min_ts = ts.iter().min().copied().unwrap();
        BulkPart {
            batch,
            max_ts,
            min_ts,
            sequence,
            timestamp_index: 0,
            raw_data: None,
        }
    }

    #[test]
    fn test_write_bulk() {
        let mut metadata_builder = RegionMetadataBuilder::new(0.into());
        metadata_builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![]);
        let metadata = Arc::new(metadata_builder.build().unwrap());

        let builder = Arc::new(TimeSeriesMemtableBuilder::default());
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        partitions
            .write_bulk(build_part(&[1000, 2000, 3000], 0))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000], &timestamps[..]);

        partitions
            .write_bulk(build_part(&[4000, 5000, 6000], 1))
            .unwrap();
        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 6000], &timestamps[..]);

        partitions
            .write_bulk(build_part(&[11000, 12000], 3))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(3, parts.len());

        let iter = parts[2].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[11000, 12000], &timestamps[..]);

        let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);

        partitions
            .write_bulk(build_part(&[1000, 5000, 9000], 4))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
    }

    #[test]
    fn test_split_record_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
                false,
            ),
            Field::new("val", DataType::Utf8, true),
        ]));

        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
            1000, 2000, 5000, 7000, 8000,
        ]));
        let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![ts_array as ArrayRef, val_array as ArrayRef],
        )
        .unwrap();

        let part = BulkPart {
            batch,
            max_ts: 8000,
            min_ts: 1000,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
        };

        let result = filter_record_batch(&part, 1000, 2000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_ts, 1000);
        assert_eq!(filtered.max_ts, 1000);

        let result = filter_record_batch(&part, 3000, 6000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_ts, 5000);
        assert_eq!(filtered.max_ts, 5000);

        let result = filter_record_batch(&part, 3000, 4000).unwrap();
        assert!(result.is_none());

        let result = filter_record_batch(&part, 0, 9000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 5);
        assert_eq!(filtered.min_ts, 1000);
        assert_eq!(filtered.max_ts, 8000);
    }
}