1use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, Mutex, MutexGuard};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_time::timestamp::TimeUnit;
23use common_time::timestamp_millis::BucketAligned;
24use common_time::Timestamp;
25use datatypes::arrow;
26use datatypes::arrow::array::{
27 ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
28 TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
29};
30use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
31use datatypes::arrow::datatypes::{DataType, Int64Type, SchemaRef};
32use mito_codec::key_values::KeyValue;
33use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
34use smallvec::{smallvec, SmallVec};
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::RegionMetadataRef;
37
38use crate::error;
39use crate::error::{InvalidRequestSnafu, Result};
40use crate::memtable::bulk::part::{BulkPart, BulkPartConverter};
41use crate::memtable::version::SmallMemtableVec;
42use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
43use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
44
/// Initial time window (one day) used when no partition duration is specified.
///
/// Written as `Duration::from_secs` so the constant compiles on stable Rust;
/// `Duration::from_days` requires the unstable `duration_constructors` feature.
const INITIAL_TIME_WINDOW: Duration = Duration::from_secs(60 * 60 * 24);
47
/// A memtable bound to a fixed time range.
///
/// Rows stored here are expected to fall inside `time_range`
/// (start inclusive, end exclusive).
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable that holds this partition's rows.
    memtable: MemtableRef,
    /// Time range of this partition; callers filter rows against it
    /// before writing (see `write_record_batch_partial`).
    time_range: PartTimeRange,
}
56
57impl TimePartition {
58 fn contains_timestamp(&self, ts: Timestamp) -> bool {
60 self.time_range.contains_timestamp(ts)
61 }
62
63 fn write(&self, kvs: &KeyValues) -> Result<()> {
65 self.memtable.write(kvs)
66 }
67
68 fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
70 self.memtable.write_bulk(rb)
71 }
72
73 fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
75 let Some(filtered) = filter_record_batch(
76 part,
77 self.time_range.min_timestamp.value(),
78 self.time_range.max_timestamp.value(),
79 )?
80 else {
81 return Ok(());
82 };
83 self.write_record_batch(filtered)
84 }
85}
86
/// Builds a [`BooleanArray`] mask selecting rows of `$ts_array` whose value
/// lies in `[$min, $max)`.
///
/// The mask is packed 64 rows per `u64` word into a pre-sized
/// `MutableBuffer`, avoiding per-row allocation. `$ts_array` must be a
/// primitive array (it is accessed via `value_unchecked`).
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        // One u64 (8 bytes) per 64 rows; div_ceil covers the tail word.
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        let f = |idx: usize| -> bool {
            // SAFETY: `idx` is always < `len` in the loops below.
            unsafe {
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        // Full 64-row chunks.
        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: capacity for all words was reserved above.
            unsafe { buffer.push_unchecked(packed) }
        }

        // Tail chunk with fewer than 64 rows; unused high bits stay zero.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: capacity reserved above also covers this tail word.
            unsafe { buffer.push_unchecked(packed) }
        }

        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
126
/// Downcasts `$ts_array` to `$array_type`, filters it to `[$min, $max)` and
/// evaluates to `(filtered_array, filter_mask, min_ts, max_ts)`.
///
/// NOTE: this macro `return`s `Ok(None)` from the *enclosing function* when
/// no row survives the filter, so it may only be expanded inside functions
/// returning `Result<Option<_>>` (see [`filter_record_batch`]).
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            // Everything was filtered out: the caller has nothing to write.
            return Ok(None);
        }

        // min/max cannot be None here because `res` is non-empty.
        let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
        let max_ts = arrow::compute::max(i64array).unwrap();
        let min_ts = arrow::compute::min(i64array).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}
145
/// Filters rows of `part` by timestamp, keeping values in `[min, max)`.
///
/// `min`/`max` are raw timestamp values in the same unit as the part's time
/// index column. Returns `Ok(None)` when no row survives; otherwise a new
/// [`BulkPart`] with the surviving rows and recomputed `min_ts`/`max_ts`.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    // Dispatch on the concrete timestamp unit; each macro arm filters the
    // array and returns early with Ok(None) if the result is empty.
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            // The time index column is always one of the timestamp types above.
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    // Apply the same mask to every other column; the timestamp column is
    // already filtered, so it is reused as-is.
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        // Row count is passed explicitly so zero-column batches stay valid.
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_ts,
        min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
    }))
}
199
/// Partition list; stored inline (no heap allocation) for up to two partitions.
type PartitionVec = SmallVec<[TimePartition; 2]>;
201
/// Set of memtables partitioned by time, so all rows in one memtable belong
/// to the same time bucket.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable partition state, guarded by a mutex.
    inner: Mutex<PartitionsInner>,
    /// Duration of a single partition bucket.
    part_duration: Duration,
    /// Metadata of the region this instance serves.
    metadata: RegionMetadataRef,
    /// Builder used to create a memtable for each new partition.
    builder: MemtableBuilderRef,
    /// Codec used to encode primary keys when converting writes to bulk parts.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,

    /// Flat arrow schema for bulk inserts; `Some` only when the memtable
    /// builder supports bulk insert (see `new`).
    bulk_schema: Option<SchemaRef>,
}
220
/// Shared pointer to [`TimePartitions`].
pub type TimePartitionsRef = Arc<TimePartitions>;
222
223impl TimePartitions {
    /// Returns a new [`TimePartitions`] with partition duration
    /// `part_duration`, falling back to [`INITIAL_TIME_WINDOW`] when `None`.
    ///
    /// Memtable ids allocated by this instance start at `next_memtable_id`.
    pub fn new(
        metadata: RegionMetadataRef,
        builder: MemtableBuilderRef,
        next_memtable_id: MemtableId,
        part_duration: Option<Duration>,
    ) -> Self {
        let inner = PartitionsInner::new(next_memtable_id);
        let primary_key_codec = build_primary_key_codec(&metadata);
        // Pre-build the flat arrow schema only when the builder supports bulk
        // insert; `write()` uses its presence to pick the bulk path.
        let bulk_schema = builder.use_bulk_insert(&metadata).then(|| {
            let opts = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
            to_flat_sst_arrow_schema(&metadata, &opts)
        });

        Self {
            inner: Mutex::new(inner),
            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
            metadata,
            builder,
            primary_key_codec,
            bulk_schema,
        }
    }
247
    /// Writes key values to the appropriate memtables, creating new time
    /// partitions on demand.
    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
        // Bulk path: convert the key values into a single BulkPart and let
        // `write_bulk` route it to the right partitions.
        if let Some(bulk_schema) = &self.bulk_schema {
            let mut converter = BulkPartConverter::new(
                &self.metadata,
                bulk_schema.clone(),
                kvs.num_rows(),
                self.primary_key_codec.clone(),
                true,
            );
            converter.append_key_values(kvs)?;
            let part = converter.convert()?;

            return self.write_bulk(part);
        }

        let parts = self.list_partitions();

        // Fast path: if one existing partition (checked in reverse insertion
        // order, i.e. most recently created first) covers every row, write
        // the whole batch to it at once.
        for part in parts.iter().rev() {
            let mut all_in_partition = true;
            for kv in kvs.iter() {
                // Time index values are timestamps, so both unwraps rely on
                // the region schema invariant.
                let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
                if !part.contains_timestamp(ts) {
                    all_in_partition = false;
                    break;
                }
            }
            if !all_in_partition {
                continue;
            }

            return part.write(kvs);
        }

        // Slow path: rows span multiple (possibly not-yet-created) partitions.
        self.write_multi_parts(kvs, &parts)
    }
293
    /// Writes a bulk part, splitting it across time partitions when its rows
    /// span more than one partition.
    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Time index is always a timestamp type, so this unwrap holds.
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        let parts = self.list_partitions();
        // Existing partitions overlapping [min_ts, max_ts], plus the start
        // timestamps of partitions that still need to be created.
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_ts),
            time_type.create_timestamp(part.max_ts),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
            // Fast path: all rows fall into one existing partition.
            return matching_parts[0].write_record_batch(part);
        }

        // Each partition takes only the rows inside its own time range.
        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        for missing in missing_parts {
            // Create the absent partition under the lock, then write its
            // subset of rows outside the lock.
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }
330
    /// Returns the partition whose range starts at `part_start`, creating it
    /// (and its memtable) when absent. The caller must already hold the
    /// `inner` lock and pass its guard in.
    fn get_or_create_time_partition(
        &self,
        part_start: Timestamp,
        inner: &mut MutexGuard<PartitionsInner>,
    ) -> Result<TimePartition> {
        let part_pos = match inner
            .parts
            .iter()
            .position(|part| part.time_range.min_timestamp == part_start)
        {
            Some(pos) => pos,
            None => {
                // Derive [part_start, part_start + duration); fails if the
                // range cannot be represented in the timestamp's unit.
                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: self.metadata.region_id,
                        reason: format!(
                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
                        ),
                    })?;
                let memtable = self
                    .builder
                    .build(inner.alloc_memtable_id(), &self.metadata);
                debug!(
                    "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
                    range,
                    self.metadata.region_id,
                    self.part_duration,
                    memtable.id(),
                    inner.parts.len() + 1
                );
                let pos = inner.parts.len();
                inner.parts.push(TimePartition {
                    memtable,
                    time_range: range,
                });
                pos
            }
        };
        Ok(inner.parts[part_pos].clone())
    }
373
374 pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
376 let inner = self.inner.lock().unwrap();
377 memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
378 }
379
380 pub fn num_partitions(&self) -> usize {
382 let inner = self.inner.lock().unwrap();
383 inner.parts.len()
384 }
385
386 pub fn is_empty(&self) -> bool {
388 let inner = self.inner.lock().unwrap();
389 inner.parts.iter().all(|part| part.memtable.is_empty())
390 }
391
392 pub fn freeze(&self) -> Result<()> {
394 let inner = self.inner.lock().unwrap();
395 for part in &*inner.parts {
396 part.memtable.freeze()?;
397 }
398 Ok(())
399 }
400
    /// Forks this set for a new mutable version: only the latest partition's
    /// memtable is forked; older partitions are dropped. Uses
    /// `part_duration` when `Some`, otherwise keeps the current duration.
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        // The "latest" partition is the one with the largest start timestamp.
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        let Some(old_part) = latest_part else {
            // Nothing to fork: a fresh empty instance that keeps the
            // memtable id counter.
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Derive the forked partition's range from the end timestamp of the
        // existing data, re-bucketed under the (possibly new) duration.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            // If no range can be computed (e.g. the memtable reports no time
            // range, or conversion overflows), start with no partitions.
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
            primary_key_codec: self.primary_key_codec.clone(),
            bulk_schema: self.bulk_schema.clone(),
        }
    }
451
    /// Returns the duration of a single time partition bucket.
    pub(crate) fn part_duration(&self) -> Duration {
        self.part_duration
    }
456
457 pub(crate) fn memory_usage(&self) -> usize {
459 let inner = self.inner.lock().unwrap();
460 inner
461 .parts
462 .iter()
463 .map(|part| part.memtable.stats().estimated_bytes)
464 .sum()
465 }
466
467 pub(crate) fn num_rows(&self) -> u64 {
469 let inner = self.inner.lock().unwrap();
470 inner
471 .parts
472 .iter()
473 .map(|part| part.memtable.stats().num_rows as u64)
474 .sum()
475 }
476
477 pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
479 let inner = self.inner.lock().unwrap();
480 memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
481 }
482
483 pub(crate) fn next_memtable_id(&self) -> MemtableId {
485 let inner = self.inner.lock().unwrap();
486 inner.next_memtable_id
487 }
488
489 pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
492 debug_assert!(self.is_empty());
493
494 Self::new(
495 self.metadata.clone(),
496 self.builder.clone(),
497 self.next_memtable_id(),
498 Some(part_duration.unwrap_or(self.part_duration)),
499 )
500 }
501
502 fn list_partitions(&self) -> PartitionVec {
504 let inner = self.inner.lock().unwrap();
505 inner.parts.clone()
506 }
507
    /// Finds the existing partitions overlapping `[min, max]` and the start
    /// timestamps of partitions that must be created to cover every value in
    /// `ts_array`.
    ///
    /// Returns `(matching_existing_partitions, missing_partition_starts)`.
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        // Start values (in the index's unit) already covered by a partition.
        let mut present = HashSet::new();
        for part in existing_parts {
            // Overlap test between [min, max] and [range.min, range.max).
            let part_time_range = &part.time_range;
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        // Bucket indices (whole partition durations) spanning [min, max];
        // div_euclid keeps negative timestamps in the correct bucket.
        let part_duration_sec = part_duration.as_secs() as i64;
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        // Pick the cheaper enumeration: iterate candidate buckets when they
        // are fewer than the timestamps, otherwise bucket-align each value.
        let num_timestamps = ts_array.len();
        let missing = if bucket_num <= num_timestamps {
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    // Only report buckets no existing partition covers.
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            // Reinterpret the timestamp array as raw i64 values.
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    // Align each value down to its bucket start, converted
                    // back to the index's unit.
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }
624
    /// Returns the partition duration.
    ///
    /// NOTE(review): now identical to [`Self::part_duration`]; the name
    /// presumably predates `part_duration` becoming non-optional, and the two
    /// could be consolidated.
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }
629
    /// Routes each row of `kvs` to the partition containing its timestamp,
    /// creating missing partitions as needed.
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Rows grouped by existing partition, and by the start timestamp of
        // partitions that must be created.
        let mut parts_to_write = HashMap::new();
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // Time index values are timestamps, so both unwraps rely on the
            // region schema invariant.
            let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                // Compute the start of the bucket this row belongs to.
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Write rows destined for existing partitions (no lock required).
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Create missing partitions under the lock, then write their rows.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }
692
693 pub(crate) fn series_count(&self) -> usize {
695 self.inner.lock().unwrap().series_count()
696 }
697}
698
699fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
703 let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
705 let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
706 let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
707 start_sec.convert_to(ts.unit())
708}
709
/// Lock-protected state of [`TimePartitions`].
#[derive(Debug)]
struct PartitionsInner {
    /// All current partitions.
    parts: PartitionVec,
    /// Id to assign to the next memtable created.
    next_memtable_id: MemtableId,
}
717
718impl PartitionsInner {
719 fn new(next_memtable_id: MemtableId) -> Self {
720 Self {
721 parts: Default::default(),
722 next_memtable_id,
723 }
724 }
725
726 fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
727 Self {
728 parts: smallvec![part],
729 next_memtable_id,
730 }
731 }
732
733 fn alloc_memtable_id(&mut self) -> MemtableId {
734 let id = self.next_memtable_id;
735 self.next_memtable_id += 1;
736 id
737 }
738
739 pub(crate) fn series_count(&self) -> usize {
740 self.parts
741 .iter()
742 .map(|p| p.memtable.stats().series_count)
743 .sum()
744 }
745}
746
/// Half-open time range `[min_timestamp, max_timestamp)` of a partition.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive start of the range.
    min_timestamp: Timestamp,
    /// Exclusive end of the range.
    max_timestamp: Timestamp,
}
755
756impl PartTimeRange {
757 fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
758 let start_sec = start.convert_to(TimeUnit::Second)?;
759 let end_sec = start_sec.add_duration(duration).ok()?;
760 let min_timestamp = start_sec.convert_to(start.unit())?;
761 let max_timestamp = end_sec.convert_to(start.unit())?;
762
763 Some(Self {
764 min_timestamp,
765 max_timestamp,
766 })
767 }
768
769 fn contains_timestamp(&self, ts: Timestamp) -> bool {
771 self.min_timestamp <= ts && ts < self.max_timestamp
772 }
773}
774
/// An existing partition paired with the key values destined for it.
struct PartitionToWrite<'a> {
    /// Target partition.
    partition: TimePartition,
    /// Rows to write into the partition.
    key_values: Vec<KeyValue<'a>>,
}
779
780#[cfg(test)]
781mod tests {
782 use std::sync::Arc;
783
784 use api::v1::SemanticType;
785 use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
786 use datatypes::arrow::datatypes::{DataType, Field, Schema};
787 use datatypes::arrow::record_batch::RecordBatch;
788 use datatypes::prelude::ConcreteDataType;
789 use datatypes::schema::ColumnSchema;
790 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
791 use store_api::storage::SequenceNumber;
792
793 use super::*;
794 use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
795 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
796 use crate::test_util::memtable_util::{self, collect_iter_timestamps};
797
798 #[test]
799 fn test_no_duration() {
800 let metadata = memtable_util::metadata_for_test();
801 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
802 let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
803 assert_eq!(0, partitions.num_partitions());
804 assert!(partitions.is_empty());
805
806 let kvs = memtable_util::build_key_values(
807 &metadata,
808 "hello".to_string(),
809 0,
810 &[1000, 3000, 7000, 5000, 6000],
811 0, );
813 partitions.write(&kvs).unwrap();
814
815 assert_eq!(1, partitions.num_partitions());
816 assert!(!partitions.is_empty());
817 let mut memtables = Vec::new();
818 partitions.list_memtables(&mut memtables);
819 assert_eq!(0, memtables[0].id());
820
821 let iter = memtables[0].iter(None, None, None).unwrap();
822 let timestamps = collect_iter_timestamps(iter);
823 assert_eq!(&[1000, 3000, 5000, 6000, 7000], ×tamps[..]);
824 }
825
826 #[test]
827 fn test_write_single_part() {
828 let metadata = memtable_util::metadata_for_test();
829 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
830 let partitions =
831 TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
832 assert_eq!(0, partitions.num_partitions());
833
834 let kvs = memtable_util::build_key_values(
835 &metadata,
836 "hello".to_string(),
837 0,
838 &[5000, 2000, 0],
839 0, );
841 partitions.write(&kvs).unwrap();
843 assert_eq!(1, partitions.num_partitions());
844 assert!(!partitions.is_empty());
845
846 let kvs = memtable_util::build_key_values(
847 &metadata,
848 "hello".to_string(),
849 0,
850 &[3000, 7000, 4000],
851 3, );
853 partitions.write(&kvs).unwrap();
855 assert_eq!(1, partitions.num_partitions());
856
857 let mut memtables = Vec::new();
858 partitions.list_memtables(&mut memtables);
859 let iter = memtables[0].iter(None, None, None).unwrap();
860 let timestamps = collect_iter_timestamps(iter);
861 assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], ×tamps[..]);
862 let parts = partitions.list_partitions();
863 assert_eq!(
864 Timestamp::new_millisecond(0),
865 parts[0].time_range.min_timestamp
866 );
867 assert_eq!(
868 Timestamp::new_millisecond(10000),
869 parts[0].time_range.max_timestamp
870 );
871 }
872
873 #[cfg(test)]
874 fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
875 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
876 let partitions =
877 TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
878 assert_eq!(0, partitions.num_partitions());
879
880 let kvs = memtable_util::build_key_values(
881 metadata,
882 "hello".to_string(),
883 0,
884 &[2000, 0],
885 0, );
887 partitions.write(&kvs).unwrap();
889 assert_eq!(1, partitions.num_partitions());
890 assert!(!partitions.is_empty());
891
892 let kvs = memtable_util::build_key_values(
893 metadata,
894 "hello".to_string(),
895 0,
896 &[3000, 7000, 4000, 5000],
897 2, );
899 partitions.write(&kvs).unwrap();
901 assert_eq!(2, partitions.num_partitions());
902
903 partitions
904 }
905
906 #[test]
907 fn test_write_multi_parts() {
908 let metadata = memtable_util::metadata_for_test();
909 let partitions = new_multi_partitions(&metadata);
910
911 let parts = partitions.list_partitions();
912 let iter = parts[0].memtable.iter(None, None, None).unwrap();
913 let timestamps = collect_iter_timestamps(iter);
914 assert_eq!(0, parts[0].memtable.id());
915 assert_eq!(
916 Timestamp::new_millisecond(0),
917 parts[0].time_range.min_timestamp
918 );
919 assert_eq!(
920 Timestamp::new_millisecond(5000),
921 parts[0].time_range.max_timestamp
922 );
923 assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]);
924 let iter = parts[1].memtable.iter(None, None, None).unwrap();
925 assert_eq!(1, parts[1].memtable.id());
926 let timestamps = collect_iter_timestamps(iter);
927 assert_eq!(&[5000, 7000], ×tamps[..]);
928 assert_eq!(
929 Timestamp::new_millisecond(5000),
930 parts[1].time_range.min_timestamp
931 );
932 assert_eq!(
933 Timestamp::new_millisecond(10000),
934 parts[1].time_range.max_timestamp
935 );
936 }
937
938 #[test]
939 fn test_new_with_part_duration() {
940 let metadata = memtable_util::metadata_for_test();
941 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
942 let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
943
944 let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
945 assert_eq!(Duration::from_secs(5), new_parts.part_duration());
946 assert_eq!(0, new_parts.next_memtable_id());
947
948 let new_parts = new_parts.new_with_part_duration(None);
950 assert_eq!(Duration::from_secs(5), new_parts.part_duration());
951 assert_eq!(0, new_parts.next_memtable_id());
953
954 let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
955 assert_eq!(Duration::from_secs(10), new_parts.part_duration());
956 assert_eq!(0, new_parts.next_memtable_id());
958
959 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
960 let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
961 let new_parts = partitions.new_with_part_duration(None);
963 assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
964 assert_eq!(0, new_parts.next_memtable_id());
965 }
966
967 #[test]
968 fn test_fork_empty() {
969 let metadata = memtable_util::metadata_for_test();
970 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
971 let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
972 partitions.freeze().unwrap();
973 let new_parts = partitions.fork(&metadata, None);
974 assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
975 assert!(new_parts.list_partitions().is_empty());
976 assert_eq!(0, new_parts.next_memtable_id());
977
978 new_parts.freeze().unwrap();
979 let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
980 assert_eq!(Duration::from_secs(5), new_parts.part_duration());
981 assert!(new_parts.list_partitions().is_empty());
982 assert_eq!(0, new_parts.next_memtable_id());
983
984 new_parts.freeze().unwrap();
985 let new_parts = new_parts.fork(&metadata, None);
986 assert_eq!(Duration::from_secs(5), new_parts.part_duration());
988 assert!(new_parts.list_partitions().is_empty());
989 assert_eq!(0, new_parts.next_memtable_id());
990
991 new_parts.freeze().unwrap();
992 let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
993 assert_eq!(Duration::from_secs(10), new_parts.part_duration());
994 assert!(new_parts.list_partitions().is_empty());
995 assert_eq!(0, new_parts.next_memtable_id());
996 }
997
998 #[test]
999 fn test_fork_non_empty_none() {
1000 let metadata = memtable_util::metadata_for_test();
1001 let partitions = new_multi_partitions(&metadata);
1002 partitions.freeze().unwrap();
1003
1004 let new_parts = partitions.fork(&metadata, None);
1006 assert!(new_parts.is_empty());
1007 assert_eq!(Duration::from_secs(5), new_parts.part_duration());
1008 assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
1009 assert_eq!(3, new_parts.next_memtable_id());
1010
1011 let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
1013 assert!(new_parts.is_empty());
1014 assert_eq!(Duration::from_secs(10), new_parts.part_duration());
1015 assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
1016 assert_eq!(4, new_parts.next_memtable_id());
1017 }
1018
1019 #[test]
1020 fn test_find_partitions_by_time_range() {
1021 let metadata = memtable_util::metadata_for_test();
1022 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
1023
1024 let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
1026 let parts = partitions.list_partitions();
1027 let (matching, missing) = partitions
1028 .find_partitions_by_time_range(
1029 &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
1030 &parts,
1031 Timestamp::new_millisecond(1000),
1032 Timestamp::new_millisecond(2000),
1033 )
1034 .unwrap();
1035 assert_eq!(matching.len(), 0);
1036 assert_eq!(missing.len(), 1);
1037 assert_eq!(missing[0], Timestamp::new_millisecond(0));
1038
1039 let partitions = TimePartitions::new(
1041 metadata.clone(),
1042 builder.clone(),
1043 0,
1044 Some(Duration::from_secs(5)),
1045 );
1046
1047 let kvs =
1049 memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
1050 partitions.write(&kvs).unwrap();
1051 let kvs =
1052 memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
1053 partitions.write(&kvs).unwrap();
1054
1055 let parts = partitions.list_partitions();
1056 assert_eq!(2, parts.len());
1057
1058 let (matching, missing) = partitions
1060 .find_partitions_by_time_range(
1061 &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
1062 &parts,
1063 Timestamp::new_millisecond(2000),
1064 Timestamp::new_millisecond(4000),
1065 )
1066 .unwrap();
1067 assert_eq!(matching.len(), 1);
1068 assert!(missing.is_empty());
1069 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1070
1071 let (matching, missing) = partitions
1073 .find_partitions_by_time_range(
1074 &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
1075 &parts,
1076 Timestamp::new_millisecond(3000),
1077 Timestamp::new_millisecond(8000),
1078 )
1079 .unwrap();
1080 assert_eq!(matching.len(), 2);
1081 assert!(missing.is_empty());
1082 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1083 assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1084
1085 let (matching, missing) = partitions
1087 .find_partitions_by_time_range(
1088 &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
1089 &parts,
1090 Timestamp::new_millisecond(12000),
1091 Timestamp::new_millisecond(13000),
1092 )
1093 .unwrap();
1094 assert!(matching.is_empty());
1095 assert_eq!(missing.len(), 1);
1096 assert_eq!(missing[0].value(), 10000);
1097
1098 let (matching, missing) = partitions
1100 .find_partitions_by_time_range(
1101 &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
1102 &parts,
1103 Timestamp::new_millisecond(4000),
1104 Timestamp::new_millisecond(6000),
1105 )
1106 .unwrap();
1107 assert_eq!(matching.len(), 2);
1108 assert!(missing.is_empty());
1109 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1110 assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1111
1112 let (matching, missing) = partitions
1114 .find_partitions_by_time_range(
1115 &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
1116 &parts,
1117 Timestamp::new_millisecond(4999),
1118 Timestamp::new_millisecond(5000),
1119 )
1120 .unwrap();
1121 assert_eq!(matching.len(), 2);
1122 assert!(missing.is_empty());
1123 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1124 assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1125
1126 let (matching, missing) = partitions
1128 .find_partitions_by_time_range(
1129 &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
1130 &parts,
1131 Timestamp::new_millisecond(9999),
1132 Timestamp::new_millisecond(10000),
1133 )
1134 .unwrap();
1135 assert_eq!(matching.len(), 1);
1136 assert_eq!(1, missing.len());
1137 assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
1138 assert_eq!(missing[0].value(), 10000);
1139
1140 let (matching, missing) = partitions
1142 .find_partitions_by_time_range(
1143 &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
1144 &parts,
1145 Timestamp::new_millisecond(-1000),
1146 Timestamp::new_millisecond(1000),
1147 )
1148 .unwrap();
1149 assert_eq!(matching.len(), 1);
1150 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1151 assert_eq!(1, missing.len());
1152 assert_eq!(missing[0].value(), -5000);
1153
1154 let (matching, missing) = partitions
1156 .find_partitions_by_time_range(
1157 &(Arc::new(TimestampMillisecondArray::from(vec![
1158 -100000000000,
1159 0,
1160 100000000000,
1161 ])) as ArrayRef),
1162 &parts,
1163 Timestamp::new_millisecond(-100000000000),
1164 Timestamp::new_millisecond(100000000000),
1165 )
1166 .unwrap();
1167 assert_eq!(2, matching.len());
1168 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1169 assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1170 assert_eq!(2, missing.len());
1171 assert_eq!(missing[0].value(), -100000000000);
1172 assert_eq!(missing[1].value(), 100000000000);
1173 }
1174
1175 fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
1176 let schema = Arc::new(Schema::new(vec![
1177 Field::new(
1178 "ts",
1179 DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1180 false,
1181 ),
1182 Field::new("val", DataType::Utf8, true),
1183 ]));
1184 let ts_data = ts.to_vec();
1185 let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
1186 let val_array = Arc::new(StringArray::from_iter_values(
1187 ts.iter().map(|v| v.to_string()),
1188 ));
1189 let batch = RecordBatch::try_new(
1190 schema,
1191 vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
1192 )
1193 .unwrap();
1194 let max_ts = ts.iter().max().copied().unwrap();
1195 let min_ts = ts.iter().min().copied().unwrap();
1196 BulkPart {
1197 batch,
1198 max_ts,
1199 min_ts,
1200 sequence,
1201 timestamp_index: 0,
1202 raw_data: None,
1203 }
1204 }
1205
1206 #[test]
1207 fn test_write_bulk() {
1208 let mut metadata_builder = RegionMetadataBuilder::new(0.into());
1209 metadata_builder
1210 .push_column_metadata(ColumnMetadata {
1211 column_schema: ColumnSchema::new(
1212 "ts",
1213 ConcreteDataType::timestamp_millisecond_datatype(),
1214 false,
1215 ),
1216 semantic_type: SemanticType::Timestamp,
1217 column_id: 0,
1218 })
1219 .push_column_metadata(ColumnMetadata {
1220 column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
1221 semantic_type: SemanticType::Field,
1222 column_id: 1,
1223 })
1224 .primary_key(vec![]);
1225 let metadata = Arc::new(metadata_builder.build().unwrap());
1226
1227 let builder = Arc::new(TimeSeriesMemtableBuilder::default());
1228 let partitions = TimePartitions::new(
1229 metadata.clone(),
1230 builder.clone(),
1231 0,
1232 Some(Duration::from_secs(5)),
1233 );
1234
1235 partitions
1237 .write_bulk(build_part(&[1000, 2000, 3000], 0))
1238 .unwrap();
1239
1240 let parts = partitions.list_partitions();
1241 assert_eq!(1, parts.len());
1242 let iter = parts[0].memtable.iter(None, None, None).unwrap();
1243 let timestamps = collect_iter_timestamps(iter);
1244 assert_eq!(&[1000, 2000, 3000], ×tamps[..]);
1245
1246 partitions
1248 .write_bulk(build_part(&[4000, 5000, 6000], 1))
1249 .unwrap();
1250 let parts = partitions.list_partitions();
1251 assert_eq!(2, parts.len());
1252 let iter = parts[0].memtable.iter(None, None, None).unwrap();
1254 let timestamps = collect_iter_timestamps(iter);
1255 assert_eq!(&[1000, 2000, 3000, 4000], ×tamps[..]);
1256 let iter = parts[1].memtable.iter(None, None, None).unwrap();
1258 let timestamps = collect_iter_timestamps(iter);
1259 assert_eq!(&[5000, 6000], ×tamps[..]);
1260
1261 partitions
1263 .write_bulk(build_part(&[11000, 12000], 3))
1264 .unwrap();
1265
1266 let parts = partitions.list_partitions();
1267 assert_eq!(3, parts.len());
1268
1269 let iter = parts[2].memtable.iter(None, None, None).unwrap();
1271 let timestamps = collect_iter_timestamps(iter);
1272 assert_eq!(&[11000, 12000], ×tamps[..]);
1273
1274 let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);
1276
1277 partitions
1278 .write_bulk(build_part(&[1000, 5000, 9000], 4))
1279 .unwrap();
1280
1281 let parts = partitions.list_partitions();
1282 assert_eq!(1, parts.len());
1283 let iter = parts[0].memtable.iter(None, None, None).unwrap();
1284 let timestamps = collect_iter_timestamps(iter);
1285 assert_eq!(&[1000, 5000, 9000], ×tamps[..]);
1286 }
1287
1288 #[test]
1289 fn test_split_record_batch() {
1290 let schema = Arc::new(Schema::new(vec![
1291 Field::new(
1292 "ts",
1293 DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
1294 false,
1295 ),
1296 Field::new("val", DataType::Utf8, true),
1297 ]));
1298
1299 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
1300 1000, 2000, 5000, 7000, 8000,
1301 ]));
1302 let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
1303 let batch = RecordBatch::try_new(
1304 schema.clone(),
1305 vec![ts_array as ArrayRef, val_array as ArrayRef],
1306 )
1307 .unwrap();
1308
1309 let part = BulkPart {
1310 batch,
1311 max_ts: 8000,
1312 min_ts: 1000,
1313 sequence: 0,
1314 timestamp_index: 0,
1315 raw_data: None,
1316 };
1317
1318 let result = filter_record_batch(&part, 1000, 2000).unwrap();
1319 assert!(result.is_some());
1320 let filtered = result.unwrap();
1321 assert_eq!(filtered.num_rows(), 1);
1322 assert_eq!(filtered.min_ts, 1000);
1323 assert_eq!(filtered.max_ts, 1000);
1324
1325 let result = filter_record_batch(&part, 3000, 6000).unwrap();
1327 assert!(result.is_some());
1328 let filtered = result.unwrap();
1329 assert_eq!(filtered.num_rows(), 1);
1330 assert_eq!(filtered.min_ts, 5000);
1331 assert_eq!(filtered.max_ts, 5000);
1332
1333 let result = filter_record_batch(&part, 3000, 4000).unwrap();
1335 assert!(result.is_none());
1336
1337 let result = filter_record_batch(&part, 0, 9000).unwrap();
1339 assert!(result.is_some());
1340 let filtered = result.unwrap();
1341 assert_eq!(filtered.num_rows(), 5);
1342 assert_eq!(filtered.min_ts, 1000);
1343 assert_eq!(filtered.max_ts, 8000);
1344 }
1345}