1use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, Mutex, MutexGuard};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use datatypes::arrow;
26use datatypes::arrow::array::{
27 ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
28 TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
29};
30use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
31use datatypes::arrow::datatypes::{DataType, Int64Type, SchemaRef};
32use mito_codec::key_values::KeyValue;
33use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
34use smallvec::{SmallVec, smallvec};
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::RegionMetadataRef;
37
38use crate::error;
39use crate::error::{InvalidRequestSnafu, Result};
40use crate::memtable::bulk::part::{BulkPart, BulkPartConverter};
41use crate::memtable::version::SmallMemtableVec;
42use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
43use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
44
/// Initial time window (one day) used when no partition duration is provided.
///
/// Expressed via `Duration::from_secs` instead of `Duration::from_days(1)`
/// because `from_days` is gated behind the unstable `duration_constructors`
/// feature; this form is identical in value and builds on stable toolchains.
const INITIAL_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
47
/// A partition holds rows with timestamps between `[min, max)`.
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable of the partition.
    memtable: MemtableRef,
    /// Time range of the partition; `min` is inclusive, `max` is exclusive.
    time_range: PartTimeRange,
}
56
impl TimePartition {
    /// Returns whether `ts` belongs to this partition's time range.
    fn contains_timestamp(&self, ts: Timestamp) -> bool {
        self.time_range.contains_timestamp(ts)
    }

    /// Writes rows to the partition's memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        self.memtable.write(kvs)
    }

    /// Writes a whole bulk part to the memtable.
    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
        self.memtable.write_bulk(rb)
    }

    /// Writes only the rows of `part` whose timestamps fall inside this
    /// partition's range; rows outside the range are skipped here (the caller
    /// dispatches them to other partitions).
    fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
        let Some(filtered) = filter_record_batch(
            part,
            self.time_range.min_timestamp.value(),
            self.time_range.max_timestamp.value(),
        )?
        else {
            // No row in range — nothing to write, not an error.
            return Ok(());
        };
        self.write_record_batch(filtered)
    }
}
86
/// Builds a [BooleanArray] marking the rows of `$ts_array` whose value lies in
/// `[$min, $max)`, packing predicate results into 64-bit words.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        // One u64 word (8 bytes) per 64 rows, rounded up.
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        let f = |idx: usize| -> bool {
            // SAFETY: callers below only pass `idx < len`.
            unsafe {
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        // Full 64-bit words.
        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: capacity for all words was reserved above.
            unsafe { buffer.push_unchecked(packed) }
        }

        // Trailing partial word, if any.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: capacity for all words was reserved above.
            unsafe { buffer.push_unchecked(packed) }
        }

        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
126
/// Downcasts `$ts_array` to `$array_type`, filters it to `[$min, $max)` and
/// evaluates to `(filtered_array, filter, min_ts, max_ts)`.
///
/// NOTE: this macro `return`s `Ok(None)` from the *enclosing* function when
/// no row survives the filter, so it may only be used inside a function
/// returning `Result<Option<_>>`.
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            return Ok(None);
        }

        let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
        // NOTE(review): unwraps assume the filtered timestamp column contains
        // non-null values (max/min return None for all-null arrays) — confirm
        // the time index column is non-nullable.
        let max_ts = arrow::compute::max(i64array).unwrap();
        let min_ts = arrow::compute::min(i64array).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}
145
/// Filters `part` down to the rows whose timestamp lies in `[min, max)`.
///
/// `min`/`max` are raw values in the same time unit as the part's timestamp
/// column. Returns `Ok(None)` when no row is in range. Every column is
/// filtered with the same boolean mask; the timestamp column reuses the
/// already-filtered array to avoid filtering it twice.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            // The timestamp column of a BulkPart is always a timestamp type.
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                // Reuse the timestamp array already filtered by the macro.
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_timestamp: max_ts,
        min_timestamp: min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
    }))
}
199
/// Partition list; inline storage for the common case of very few partitions.
type PartitionVec = SmallVec<[TimePartition; 2]>;
201
/// Structure to store memtables for different time ranges.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable data of partitions, guarded by a mutex.
    inner: Mutex<PartitionsInner>,
    /// Duration of a partition; buckets are aligned to multiples of it.
    part_duration: Duration,
    /// Metadata of the region this partition list belongs to.
    metadata: RegionMetadataRef,
    /// Builder used to create a memtable for each new partition.
    builder: MemtableBuilderRef,
    /// Codec for encoding primary keys on the bulk write path.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,

    /// Flat SST arrow schema for bulk inserts. `Some` only when the builder
    /// supports bulk insertion; also acts as the switch enabling the bulk
    /// write path in [TimePartitions::write].
    bulk_schema: Option<SchemaRef>,
}
220
/// Shared reference to [TimePartitions].
pub type TimePartitionsRef = Arc<TimePartitions>;
222
impl TimePartitions {
    /// Returns a new, empty partition list.
    ///
    /// Memtable ids allocated by this list start at `next_memtable_id`; when
    /// `part_duration` is `None`, [INITIAL_TIME_WINDOW] is used.
    pub fn new(
        metadata: RegionMetadataRef,
        builder: MemtableBuilderRef,
        next_memtable_id: MemtableId,
        part_duration: Option<Duration>,
    ) -> Self {
        let inner = PartitionsInner::new(next_memtable_id);
        let primary_key_codec = build_primary_key_codec(&metadata);
        // Only build the flat arrow schema when the builder supports bulk
        // insertion; `bulk_schema` doubles as the "use bulk path" flag.
        let bulk_schema = builder.use_bulk_insert(&metadata).then(|| {
            let opts = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
            to_flat_sst_arrow_schema(&metadata, &opts)
        });

        Self {
            inner: Mutex::new(inner),
            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
            metadata,
            builder,
            primary_key_codec,
            bulk_schema,
        }
    }

    /// Writes key values to the memtables, creating new partitions on demand.
    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
        // Bulk path: convert the rows into a BulkPart and let `write_bulk`
        // split them across time partitions.
        if let Some(bulk_schema) = &self.bulk_schema {
            let mut converter = BulkPartConverter::new(
                &self.metadata,
                bulk_schema.clone(),
                kvs.num_rows(),
                self.primary_key_codec.clone(),
                true,
            );
            converter.append_key_values(kvs)?;
            let part = converter.convert()?;

            return self.write_bulk(part);
        }

        let parts = self.list_partitions();

        // Fast path: if every row fits into one existing partition, write the
        // whole batch at once. Iterate in reverse since recent partitions are
        // the most likely match.
        for part in parts.iter().rev() {
            let mut all_in_partition = true;
            for kv in kvs.iter() {
                // NOTE(review): assumes each row carries a valid timestamp —
                // the same unwrap is relied on in write_multi_parts.
                let ts = kv.timestamp().try_into_timestamp().unwrap().unwrap();
                if !part.contains_timestamp(ts) {
                    all_in_partition = false;
                    break;
                }
            }
            if !all_in_partition {
                continue;
            }

            return part.write(kvs);
        }

        // Slow path: rows span multiple (possibly missing) partitions.
        self.write_multi_parts(kvs, &parts)
    }

    /// Writes a bulk part, splitting its rows across time partitions and
    /// creating any partitions that do not exist yet.
    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // The time index column is always a timestamp type.
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        let parts = self.list_partitions();
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_timestamp),
            time_type.create_timestamp(part.max_timestamp),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
            // Fast path: all rows belong to a single existing partition.
            return matching_parts[0].write_record_batch(part);
        }

        // Each overlapping partition takes the subset of rows in its range.
        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        for missing in missing_parts {
            // Hold the lock only while creating the partition, not while
            // writing rows into it.
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }

    /// Returns the partition starting at `part_start`, creating it (and its
    /// memtable) if absent. The caller must hold the `inner` lock.
    fn get_or_create_time_partition(
        &self,
        part_start: Timestamp,
        inner: &mut MutexGuard<PartitionsInner>,
    ) -> Result<TimePartition> {
        let part_pos = match inner
            .parts
            .iter()
            .position(|part| part.time_range.min_timestamp == part_start)
        {
            Some(pos) => pos,
            None => {
                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: self.metadata.region_id,
                        reason: format!(
                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
                        ),
                    })?;
                let memtable = self
                    .builder
                    .build(inner.alloc_memtable_id(), &self.metadata);
                debug!(
                    "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}, metadata: {:?}",
                    range,
                    self.metadata.region_id,
                    self.part_duration,
                    memtable.id(),
                    inner.parts.len() + 1,
                    self.metadata,
                );
                let pos = inner.parts.len();
                inner.parts.push(TimePartition {
                    memtable,
                    time_range: range,
                });
                pos
            }
        };
        Ok(inner.parts[part_pos].clone())
    }

    /// Appends the memtables of all partitions to `memtables`.
    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the number of partitions.
    pub fn num_partitions(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner.parts.len()
    }

    /// Returns true if all partitions' memtables are empty.
    pub fn is_empty(&self) -> bool {
        let inner = self.inner.lock().unwrap();
        inner.parts.iter().all(|part| part.memtable.is_empty())
    }

    /// Freezes all memtables; stops at the first error.
    pub fn freeze(&self) -> Result<()> {
        let inner = self.inner.lock().unwrap();
        for part in &*inner.parts {
            part.memtable.freeze()?;
        }
        Ok(())
    }

    /// Forks the latest partition into a new, empty partition list, updating
    /// the partition duration when `part_duration` is `Some`.
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        let Some(old_part) = latest_part else {
            // Nothing to fork: start fresh, carrying over the id counter.
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Recompute the latest partition's time range under the (possibly
        // new) duration and fork its memtable into that range; fall back to
        // an empty list when the range cannot be computed.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
            primary_key_codec: self.primary_key_codec.clone(),
            bulk_schema: self.bulk_schema.clone(),
        }
    }

    /// Returns the partition duration.
    pub(crate) fn part_duration(&self) -> Duration {
        self.part_duration
    }

    /// Returns the memtable builder.
    pub(crate) fn memtable_builder(&self) -> &MemtableBuilderRef {
        &self.builder
    }

    /// Returns the estimated bytes of all memtables.
    pub(crate) fn memory_usage(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().estimated_bytes)
            .sum()
    }

    /// Returns the total number of rows across all memtables.
    pub(crate) fn num_rows(&self) -> u64 {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().num_rows as u64)
            .sum()
    }

    /// Appends the memtables of all partitions to the small vector.
    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the id the next created memtable will get.
    pub(crate) fn next_memtable_id(&self) -> MemtableId {
        let inner = self.inner.lock().unwrap();
        inner.next_memtable_id
    }

    /// Creates a new empty partition list from this one, keeping the id
    /// counter. Falls back to the current duration/builder when the
    /// corresponding argument is `None`. The list must be empty.
    pub(crate) fn new_with_part_duration(
        &self,
        part_duration: Option<Duration>,
        memtable_builder: Option<MemtableBuilderRef>,
    ) -> Self {
        debug_assert!(self.is_empty());

        Self::new(
            self.metadata.clone(),
            memtable_builder.unwrap_or_else(|| self.builder.clone()),
            self.next_memtable_id(),
            Some(part_duration.unwrap_or(self.part_duration)),
        )
    }

    /// Returns a snapshot of the partitions taken under the lock.
    fn list_partitions(&self) -> PartitionVec {
        let inner = self.inner.lock().unwrap();
        inner.parts.clone()
    }

    /// Finds the existing partitions overlapping `[min, max]` and the start
    /// timestamps of the partitions that would have to be created to cover
    /// the timestamps in `ts_array`.
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        // Raw start values of partitions already covered.
        let mut present = HashSet::new();
        for part in existing_parts {
            let part_time_range = &part.time_range;
            // Overlap test against the half-open partition range.
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        let part_duration_sec = part_duration.as_secs() as i64;
        // Bucket indices (in units of the partition duration) spanning
        // [min, max]; div_euclid keeps negative timestamps aligned correctly.
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        let num_timestamps = ts_array.len();
        // Choose the cheaper enumeration: iterate the candidate buckets, or
        // iterate the actual timestamps and align each to its bucket.
        let missing = if bucket_num <= num_timestamps {
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    // Only report buckets not covered by existing partitions.
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }

    /// Returns the partition duration (always present on this struct).
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }

    /// Writes rows that span multiple partitions, creating missing ones.
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Rows grouped by existing partition, and rows whose partition does
        // not exist yet (keyed by the bucket start timestamp).
        let mut parts_to_write = HashMap::new();
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // NOTE(review): assumes each row carries a valid timestamp —
            // same unwrap as on the fast path in `write`.
            let ts = kv.timestamp().try_into_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Write to existing partitions first, without holding the lock.
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Create missing partitions under the lock, then write their rows.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }

    /// Returns the total series count across all partitions.
    pub(crate) fn series_count(&self) -> usize {
        self.inner.lock().unwrap().series_count()
    }
}
708
709fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
713 let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
715 let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
716 let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
717 start_sec.convert_to(ts.unit())
718}
719
/// Mutable state of [TimePartitions]: the partition list plus the memtable
/// id counter.
#[derive(Debug)]
struct PartitionsInner {
    /// All partitions.
    parts: PartitionVec,
    /// Id of the next memtable to allocate.
    next_memtable_id: MemtableId,
}
727
728impl PartitionsInner {
729 fn new(next_memtable_id: MemtableId) -> Self {
730 Self {
731 parts: Default::default(),
732 next_memtable_id,
733 }
734 }
735
736 fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
737 Self {
738 parts: smallvec![part],
739 next_memtable_id,
740 }
741 }
742
743 fn alloc_memtable_id(&mut self) -> MemtableId {
744 let id = self.next_memtable_id;
745 self.next_memtable_id += 1;
746 id
747 }
748
749 pub(crate) fn series_count(&self) -> usize {
750 self.parts
751 .iter()
752 .map(|p| p.memtable.stats().series_count)
753 .sum()
754 }
755}
756
/// Time range of a partition: `[min_timestamp, max_timestamp)`.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive lower bound of rows in the partition.
    min_timestamp: Timestamp,
    /// Exclusive upper bound of rows in the partition.
    max_timestamp: Timestamp,
}
765
766impl PartTimeRange {
767 fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
768 let start_sec = start.convert_to(TimeUnit::Second)?;
769 let end_sec = start_sec.add_duration(duration).ok()?;
770 let min_timestamp = start_sec.convert_to(start.unit())?;
771 let max_timestamp = end_sec.convert_to(start.unit())?;
772
773 Some(Self {
774 min_timestamp,
775 max_timestamp,
776 })
777 }
778
779 fn contains_timestamp(&self, ts: Timestamp) -> bool {
781 self.min_timestamp <= ts && ts < self.max_timestamp
782 }
783}
784
/// Rows of one write request destined for a single existing partition.
struct PartitionToWrite<'a> {
    /// Target partition.
    partition: TimePartition,
    /// Rows (borrowed from the caller's key values) to write into it.
    key_values: Vec<KeyValue<'a>>,
}
789
790#[cfg(test)]
791mod tests {
792 use std::sync::Arc;
793
794 use api::v1::SemanticType;
795 use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
796 use datatypes::arrow::datatypes::{DataType, Field, Schema};
797 use datatypes::arrow::record_batch::RecordBatch;
798 use datatypes::prelude::ConcreteDataType;
799 use datatypes::schema::ColumnSchema;
800 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
801 use store_api::storage::SequenceNumber;
802
803 use super::*;
804 use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
805 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
806 use crate::test_util::memtable_util::{self, collect_iter_timestamps};
807
    /// Without an explicit duration, all rows land in one default-window
    /// partition and come back sorted by timestamp.
    #[test]
    fn test_no_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        assert_eq!(0, partitions.num_partitions());
        assert!(partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1000, 3000, 7000, 5000, 6000],
            0,
        );
        partitions.write(&kvs).unwrap();

        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());
        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        assert_eq!(0, memtables[0].id());

        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
    }
835
    /// Two writes within the same 10s window must share a single partition
    /// covering [0, 10000) in milliseconds.
    #[test]
    fn test_write_single_part() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5000, 2000, 0],
            0,
        );
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000],
            3,
        );
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());

        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
        let parts = partitions.list_partitions();
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[0].time_range.max_timestamp
        );
    }
882
    /// Builds a partition list with a 5s window and writes rows spanning two
    /// buckets: [0, 5000) and [5000, 10000) milliseconds.
    #[cfg(test)]
    fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            metadata,
            "hello".to_string(),
            0,
            &[2000, 0],
            0,
        );
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000, 5000],
            2,
        );
        partitions.write(&kvs).unwrap();
        assert_eq!(2, partitions.num_partitions());

        partitions
    }
915
    /// Rows spanning two buckets are split into the right partitions and
    /// stay sorted within each memtable.
    #[test]
    fn test_write_multi_parts() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);

        let parts = partitions.list_partitions();
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(0, parts[0].memtable.id());
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[0].time_range.max_timestamp
        );
        assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        assert_eq!(1, parts[1].memtable.id());
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 7000], &timestamps[..]);
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[1].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[1].time_range.max_timestamp
        );
    }
947
    /// `new_with_part_duration` keeps the id counter and falls back to the
    /// previous duration when given `None`.
    #[test]
    fn test_new_with_part_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);

        let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)), None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        // `None` keeps the previous duration.
        let new_parts = new_parts.new_with_part_duration(None, None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)), None);
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let new_parts = partitions.new_with_part_duration(None, None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());
    }
976
    /// Forking an empty list yields another empty list; the duration is kept
    /// unless overridden, and no memtable ids are consumed.
    #[test]
    fn test_fork_empty() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        partitions.freeze().unwrap();
        let new_parts = partitions.fork(&metadata, None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, None);
        // Forking keeps the duration when not overridden.
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());
    }
1007
    /// Forking a non-empty list forks the latest partition's memtable and
    /// advances the memtable id counter.
    #[test]
    fn test_fork_non_empty_none() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);
        partitions.freeze().unwrap();

        // `None` keeps the previous duration.
        let new_parts = partitions.fork(&metadata, None);
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(3, new_parts.next_memtable_id());

        // Forking the same source again is allowed and consumes another id.
        let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(4, new_parts.next_memtable_id());
    }
1028
    /// Exercises `find_partitions_by_time_range` over empty lists, fully
    /// contained ranges, boundary values, negative timestamps, and sparse
    /// timestamp arrays (which take the per-value alignment path).
    #[test]
    fn test_find_partitions_by_time_range() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());

        // Case 1: no existing partitions — everything is missing.
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let parts = partitions.list_partitions();
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            )
            .unwrap();
        assert_eq!(matching.len(), 0);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], Timestamp::new_millisecond(0));

        // Case 2: partitions [0, 5000) and [5000, 10000) exist.
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
        partitions.write(&kvs).unwrap();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
        partitions.write(&kvs).unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());

        // Fully inside the first partition.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(2000),
                Timestamp::new_millisecond(4000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);

        // Spans both partitions.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(3000),
                Timestamp::new_millisecond(8000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Entirely outside: needs a new partition at 10000.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(12000),
                Timestamp::new_millisecond(13000),
            )
            .unwrap();
        assert!(matching.is_empty());
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].value(), 10000);

        // Crosses the 5000 boundary.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4000),
                Timestamp::new_millisecond(6000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Boundary values 4999 / 5000 touch both partitions.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4999),
                Timestamp::new_millisecond(5000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // 9999 is in [5000, 10000); 10000 starts a missing bucket.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(9999),
                Timestamp::new_millisecond(10000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(1, missing.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
        assert_eq!(missing[0].value(), 10000);

        // Negative timestamps align down to the -5000 bucket.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-1000),
                Timestamp::new_millisecond(1000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(1, missing.len());
        assert_eq!(missing[0].value(), -5000);

        // Sparse array: fewer timestamps than candidate buckets, so the
        // per-timestamp alignment path is used.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from(vec![
                    -100000000000,
                    0,
                    100000000000,
                ])) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-100000000000),
                Timestamp::new_millisecond(100000000000),
            )
            .unwrap();
        assert_eq!(2, matching.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
        assert_eq!(2, missing.len());
        assert_eq!(missing[0].value(), -100000000000);
        assert_eq!(missing[1].value(), 100000000000);
    }
1184
1185 fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
1186 let schema = Arc::new(Schema::new(vec![
1187 Field::new(
1188 "ts",
1189 DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1190 false,
1191 ),
1192 Field::new("val", DataType::Utf8, true),
1193 ]));
1194 let ts_data = ts.to_vec();
1195 let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
1196 let val_array = Arc::new(StringArray::from_iter_values(
1197 ts.iter().map(|v| v.to_string()),
1198 ));
1199 let batch = RecordBatch::try_new(
1200 schema,
1201 vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
1202 )
1203 .unwrap();
1204 let max_ts = ts.iter().max().copied().unwrap();
1205 let min_ts = ts.iter().min().copied().unwrap();
1206 BulkPart {
1207 batch,
1208 max_timestamp: max_ts,
1209 min_timestamp: min_ts,
1210 sequence,
1211 timestamp_index: 0,
1212 raw_data: None,
1213 }
1214 }
1215
1216 #[test]
1217 fn test_write_bulk() {
1218 let mut metadata_builder = RegionMetadataBuilder::new(0.into());
1219 metadata_builder
1220 .push_column_metadata(ColumnMetadata {
1221 column_schema: ColumnSchema::new(
1222 "ts",
1223 ConcreteDataType::timestamp_millisecond_datatype(),
1224 false,
1225 ),
1226 semantic_type: SemanticType::Timestamp,
1227 column_id: 0,
1228 })
1229 .push_column_metadata(ColumnMetadata {
1230 column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
1231 semantic_type: SemanticType::Field,
1232 column_id: 1,
1233 })
1234 .primary_key(vec![]);
1235 let metadata = Arc::new(metadata_builder.build().unwrap());
1236
1237 let builder = Arc::new(TimeSeriesMemtableBuilder::default());
1238 let partitions = TimePartitions::new(
1239 metadata.clone(),
1240 builder.clone(),
1241 0,
1242 Some(Duration::from_secs(5)),
1243 );
1244
1245 partitions
1247 .write_bulk(build_part(&[1000, 2000, 3000], 0))
1248 .unwrap();
1249
1250 let parts = partitions.list_partitions();
1251 assert_eq!(1, parts.len());
1252 let iter = parts[0].memtable.iter(None, None, None).unwrap();
1253 let timestamps = collect_iter_timestamps(iter);
1254 assert_eq!(&[1000, 2000, 3000], ×tamps[..]);
1255
1256 partitions
1258 .write_bulk(build_part(&[4000, 5000, 6000], 1))
1259 .unwrap();
1260 let parts = partitions.list_partitions();
1261 assert_eq!(2, parts.len());
1262 let iter = parts[0].memtable.iter(None, None, None).unwrap();
1264 let timestamps = collect_iter_timestamps(iter);
1265 assert_eq!(&[1000, 2000, 3000, 4000], ×tamps[..]);
1266 let iter = parts[1].memtable.iter(None, None, None).unwrap();
1268 let timestamps = collect_iter_timestamps(iter);
1269 assert_eq!(&[5000, 6000], ×tamps[..]);
1270
1271 partitions
1273 .write_bulk(build_part(&[11000, 12000], 3))
1274 .unwrap();
1275
1276 let parts = partitions.list_partitions();
1277 assert_eq!(3, parts.len());
1278
1279 let iter = parts[2].memtable.iter(None, None, None).unwrap();
1281 let timestamps = collect_iter_timestamps(iter);
1282 assert_eq!(&[11000, 12000], ×tamps[..]);
1283
1284 let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);
1286
1287 partitions
1288 .write_bulk(build_part(&[1000, 5000, 9000], 4))
1289 .unwrap();
1290
1291 let parts = partitions.list_partitions();
1292 assert_eq!(1, parts.len());
1293 let iter = parts[0].memtable.iter(None, None, None).unwrap();
1294 let timestamps = collect_iter_timestamps(iter);
1295 assert_eq!(&[1000, 5000, 9000], ×tamps[..]);
1296 }
1297
1298 #[test]
1299 fn test_split_record_batch() {
1300 let schema = Arc::new(Schema::new(vec![
1301 Field::new(
1302 "ts",
1303 DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
1304 false,
1305 ),
1306 Field::new("val", DataType::Utf8, true),
1307 ]));
1308
1309 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
1310 1000, 2000, 5000, 7000, 8000,
1311 ]));
1312 let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
1313 let batch = RecordBatch::try_new(
1314 schema.clone(),
1315 vec![ts_array as ArrayRef, val_array as ArrayRef],
1316 )
1317 .unwrap();
1318
1319 let part = BulkPart {
1320 batch,
1321 max_timestamp: 8000,
1322 min_timestamp: 1000,
1323 sequence: 0,
1324 timestamp_index: 0,
1325 raw_data: None,
1326 };
1327
1328 let result = filter_record_batch(&part, 1000, 2000).unwrap();
1329 assert!(result.is_some());
1330 let filtered = result.unwrap();
1331 assert_eq!(filtered.num_rows(), 1);
1332 assert_eq!(filtered.min_timestamp, 1000);
1333 assert_eq!(filtered.max_timestamp, 1000);
1334
1335 let result = filter_record_batch(&part, 3000, 6000).unwrap();
1337 assert!(result.is_some());
1338 let filtered = result.unwrap();
1339 assert_eq!(filtered.num_rows(), 1);
1340 assert_eq!(filtered.min_timestamp, 5000);
1341 assert_eq!(filtered.max_timestamp, 5000);
1342
1343 let result = filter_record_batch(&part, 3000, 4000).unwrap();
1345 assert!(result.is_none());
1346
1347 let result = filter_record_batch(&part, 0, 9000).unwrap();
1349 assert!(result.is_some());
1350 let filtered = result.unwrap();
1351 assert_eq!(filtered.num_rows(), 5);
1352 assert_eq!(filtered.min_timestamp, 1000);
1353 assert_eq!(filtered.max_timestamp, 8000);
1354 }
1355}