1use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, Mutex, MutexGuard};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use datatypes::arrow;
26use datatypes::arrow::array::{
27 ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
28 TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
29};
30use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
31use datatypes::arrow::datatypes::{DataType, Int64Type, SchemaRef};
32use mito_codec::key_values::KeyValue;
33use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
34use smallvec::{SmallVec, smallvec};
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::RegionMetadataRef;
37
38use crate::error;
39use crate::error::{InvalidRequestSnafu, Result};
40use crate::memtable::bulk::part::{BulkPart, BulkPartConverter};
41use crate::memtable::version::SmallMemtableVec;
42use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
43use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
44
/// Initial time window (one day) used when no partition duration is specified.
/// Written as `from_secs` because `Duration::from_days` is a nightly-only API
/// (`duration_constructors` feature); the value is identical.
const INITIAL_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
47
/// A single time partition: one memtable holding rows whose timestamps fall
/// in the partition's `[min, max)` time range.
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable that stores this partition's rows.
    memtable: MemtableRef,
    /// Time range of the partition; `min_timestamp` is inclusive,
    /// `max_timestamp` is exclusive (see `PartTimeRange::contains_timestamp`).
    time_range: PartTimeRange,
}
56
impl TimePartition {
    /// Returns whether `ts` falls inside this partition's `[min, max)` range.
    fn contains_timestamp(&self, ts: Timestamp) -> bool {
        self.time_range.contains_timestamp(ts)
    }

    /// Writes all key values to this partition's memtable.
    /// Callers are responsible for ensuring every row belongs to this partition.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        self.memtable.write(kvs)
    }

    /// Writes a whole bulk part to this partition's memtable without filtering.
    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
        self.memtable.write_bulk(rb)
    }

    /// Writes only the rows of `part` whose timestamps lie inside this
    /// partition's time range; rows outside the range are dropped.
    fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
        // `filter_record_batch` yields `None` when no row survives the filter,
        // in which case there is nothing to write.
        let Some(filtered) = filter_record_batch(
            part,
            self.time_range.min_timestamp.value(),
            self.time_range.max_timestamp.value(),
        )?
        else {
            return Ok(());
        };
        self.write_record_batch(filtered)
    }
}
86
/// Builds a [`BooleanArray`] selecting the rows of `$ts_array` whose value is
/// in `[$min, $max)`. The predicate results are packed 64 bits at a time into
/// a [`MutableBuffer`] to avoid per-row boolean pushes.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        // One u64 word (8 bytes) per 64 rows, rounded up.
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        let f = |idx: usize| -> bool {
            unsafe {
                // SAFETY: `idx` is always < `len` — see the loop bounds below.
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        // Pack 64 predicate bits into each full word.
        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: capacity reserved above covers `len.div_ceil(64)` words.
            unsafe { buffer.push_unchecked(packed) }
        }

        // Pack the trailing partial word, if any; the unused high bits stay 0
        // and are ignored because the BooleanBuffer length is `len`.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: this is the final word covered by the reserved capacity.
            unsafe { buffer.push_unchecked(packed) }
        }

        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
126
/// Downcasts `$ts_array` to the concrete `$array_type`, filters it to
/// `[$min, $max)`, and evaluates to `(filtered_array, filter, min_ts, max_ts)`.
///
/// NOTE: this macro contains a `return Ok(None)` that early-returns from the
/// *enclosing function* (it is only sound inside a fn returning
/// `Result<Option<_>>`, such as `filter_record_batch`).
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            return Ok(None);
        }

        let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
        // The filtered array is non-empty here, so min/max are present and
        // the unwraps cannot fail.
        let max_ts = arrow::compute::max(i64array).unwrap();
        let min_ts = arrow::compute::min(i64array).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}
145
/// Filters `part` to the rows whose timestamp lies in `[min, max)` (values in
/// the part's native time unit).
///
/// Returns `Ok(None)` when no row matches; otherwise returns a new [BulkPart]
/// with all columns filtered by the same mask and min/max timestamps
/// recomputed from the surviving rows. `raw_data` is dropped because it no
/// longer matches the filtered batch.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    // The macro early-returns `Ok(None)` when the filter leaves nothing.
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            // The time index column is always a timestamp type.
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    // Apply the same mask to every other column; reuse the already-filtered
    // timestamp array instead of filtering it twice.
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        // Row count is passed explicitly to support zero-column batches.
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_timestamp: max_ts,
        min_timestamp: min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
    }))
}
199
/// Partition list with inline storage for the common case of 1–2 partitions.
type PartitionVec = SmallVec<[TimePartition; 2]>;
201
/// Stores the memtables of a region, partitioned by time range.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable partition list and the next memtable id, guarded by a mutex.
    inner: Mutex<PartitionsInner>,
    /// Duration of each partition's time window.
    part_duration: Duration,
    /// Metadata of the region this structure belongs to.
    metadata: RegionMetadataRef,
    /// Builder used to create a memtable for each new partition.
    builder: MemtableBuilderRef,
    /// Codec for encoding primary keys in the bulk insert path.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Flat SST arrow schema for bulk inserts. `Some` only when the memtable
    /// builder enables bulk inserts; also acts as the switch for the bulk
    /// conversion path in `write`/`write_bulk`.
    bulk_schema: Option<SchemaRef>,
}
220
/// Shared, reference-counted pointer to [TimePartitions].
pub type TimePartitionsRef = Arc<TimePartitions>;
222
impl TimePartitions {
    /// Creates an empty partition set for a region.
    ///
    /// `part_duration` falls back to [INITIAL_TIME_WINDOW] when `None`.
    /// Memtable ids allocated by this set start from `next_memtable_id`.
    pub fn new(
        metadata: RegionMetadataRef,
        builder: MemtableBuilderRef,
        next_memtable_id: MemtableId,
        part_duration: Option<Duration>,
    ) -> Self {
        let inner = PartitionsInner::new(next_memtable_id);
        let primary_key_codec = build_primary_key_codec(&metadata);
        // Precompute the flat arrow schema only when the builder opts into
        // bulk inserts; a `Some` here enables the bulk write path below.
        let bulk_schema = builder.use_bulk_insert(&metadata).then(|| {
            let opts = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
            to_flat_sst_arrow_schema(&metadata, &opts)
        });

        Self {
            inner: Mutex::new(inner),
            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
            metadata,
            builder,
            primary_key_codec,
            bulk_schema,
        }
    }

    /// Writes key values into the matching partitions, creating new
    /// partitions on demand.
    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
        // Bulk insert path: convert the mutation into a BulkPart first and
        // dispatch through the same logic as `write_bulk`.
        if let Some(bulk_schema) = &self.bulk_schema {
            let mut converter = BulkPartConverter::new(
                &self.metadata,
                bulk_schema.clone(),
                kvs.num_rows(),
                self.primary_key_codec.clone(),
                true,
            );
            converter.append_key_values(kvs)?;
            let part = converter.convert()?;

            return self.write_bulk_inner(part);
        }

        let parts = self.list_partitions();

        // Fast path: if every row falls into one existing partition (checked
        // from the newest partition backwards, where new rows usually land),
        // write the whole mutation in a single call.
        for part in parts.iter().rev() {
            let mut all_in_partition = true;
            for kv in kvs.iter() {
                // Both unwraps assume the time index value is present and
                // convertible — presumably validated upstream by the write
                // path; NOTE(review): confirm against the request validation.
                let ts = kv.timestamp().try_into_timestamp().unwrap().unwrap();
                if !part.contains_timestamp(ts) {
                    all_in_partition = false;
                    break;
                }
            }
            if !all_in_partition {
                continue;
            }

            return part.write(kvs);
        }

        // Slow path: rows span several partitions or need new ones.
        self.write_multi_parts(kvs, &parts)
    }

    /// Writes a bulk part, converting it to the flat schema first when the
    /// bulk insert path is enabled. A conversion yielding `None` means there
    /// is nothing to write.
    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let part = if let Some(bulk_schema) = &self.bulk_schema {
            let converted = crate::memtable::bulk::part::convert_bulk_part(
                part,
                &self.metadata,
                self.primary_key_codec.clone(),
                bulk_schema.clone(),
                true,
            )?;
            match converted {
                Some(p) => p,
                None => return Ok(()),
            }
        } else {
            part
        };

        self.write_bulk_inner(part)
    }

    /// Routes a bulk part to all partitions its time range overlaps,
    /// creating missing partitions as needed.
    fn write_bulk_inner(&self, part: BulkPart) -> Result<()> {
        // The time index column is always a timestamp type, so the unwrap
        // cannot fail.
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        let parts = self.list_partitions();
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_timestamp),
            time_type.create_timestamp(part.max_timestamp),
        )?;

        // Fast path: the whole part fits in exactly one existing partition,
        // so no row filtering is necessary.
        if matching_parts.len() == 1 && missing_parts.is_empty() {
            return matching_parts[0].write_record_batch(part);
        }

        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        for missing in missing_parts {
            // Hold the lock only while creating/looking up the partition;
            // the actual write happens outside the critical section.
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }

    /// Returns the partition starting at `part_start`, creating it (and its
    /// memtable) if absent. Must be called with the inner lock held, which
    /// makes the check-then-insert race-free.
    fn get_or_create_time_partition(
        &self,
        part_start: Timestamp,
        inner: &mut MutexGuard<PartitionsInner>,
    ) -> Result<TimePartition> {
        let part_pos = match inner
            .parts
            .iter()
            .position(|part| part.time_range.min_timestamp == part_start)
        {
            Some(pos) => pos,
            None => {
                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: self.metadata.region_id,
                        reason: format!(
                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
                        ),
                    })?;
                let memtable = self
                    .builder
                    .build(inner.alloc_memtable_id(), &self.metadata);
                debug!(
                    "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}, metadata: {:?}",
                    range,
                    self.metadata.region_id,
                    self.part_duration,
                    memtable.id(),
                    inner.parts.len() + 1,
                    self.metadata,
                );
                let pos = inner.parts.len();
                inner.parts.push(TimePartition {
                    memtable,
                    time_range: range,
                });
                pos
            }
        };
        Ok(inner.parts[part_pos].clone())
    }

    /// Appends the memtables of all partitions to `memtables`.
    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the number of partitions.
    pub fn num_partitions(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner.parts.len()
    }

    /// Returns true if every partition's memtable is empty (an empty
    /// partition list is also considered empty).
    pub fn is_empty(&self) -> bool {
        let inner = self.inner.lock().unwrap();
        inner.parts.iter().all(|part| part.memtable.is_empty())
    }

    /// Freezes all memtables; stops at the first error.
    pub fn freeze(&self) -> Result<()> {
        let inner = self.inner.lock().unwrap();
        for part in &*inner.parts {
            part.memtable.freeze()?;
        }
        Ok(())
    }

    /// Forks the latest partition into a fresh partition set, carrying over
    /// only the most recent partition's memtable (forked for the new
    /// metadata). Memtable id allocation continues from the current counter.
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        // Fall back to the current duration so forking never loses the window size.
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        // No partitions yet: return an empty set with the chosen duration.
        let Some(old_part) = latest_part else {
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Recompute the forked partition's range from the end timestamp of
        // the old memtable's data (the duration may have changed); fall back
        // to an empty set when the range cannot be derived.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
            primary_key_codec: self.primary_key_codec.clone(),
            bulk_schema: self.bulk_schema.clone(),
        }
    }

    /// Returns the partition duration.
    pub(crate) fn part_duration(&self) -> Duration {
        self.part_duration
    }

    /// Returns the memtable builder.
    pub(crate) fn memtable_builder(&self) -> &MemtableBuilderRef {
        &self.builder
    }

    /// Returns the estimated memory usage of all partitions, in bytes.
    pub(crate) fn memory_usage(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().estimated_bytes)
            .sum()
    }

    /// Returns the total number of rows across all partitions.
    pub(crate) fn num_rows(&self) -> u64 {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().num_rows as u64)
            .sum()
    }

    /// Appends the memtables of all partitions to the small vector.
    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the next memtable id to be allocated.
    pub(crate) fn next_memtable_id(&self) -> MemtableId {
        let inner = self.inner.lock().unwrap();
        inner.next_memtable_id
    }

    /// Creates an empty partition set with a new duration, reusing this set's
    /// metadata/builder and continuing memtable id allocation.
    /// The caller must ensure this set is empty (only checked in debug builds).
    pub(crate) fn new_with_part_duration(
        &self,
        part_duration: Option<Duration>,
        memtable_builder: Option<MemtableBuilderRef>,
    ) -> Self {
        debug_assert!(self.is_empty());

        Self::new(
            self.metadata.clone(),
            memtable_builder.unwrap_or_else(|| self.builder.clone()),
            self.next_memtable_id(),
            Some(part_duration.unwrap_or(self.part_duration)),
        )
    }

    /// Returns a snapshot (clone) of the current partition list.
    fn list_partitions(&self) -> PartitionVec {
        let inner = self.inner.lock().unwrap();
        inner.parts.clone()
    }

    /// Splits `[min, max]` into the existing partitions it overlaps and the
    /// start timestamps of partitions that would have to be created.
    ///
    /// Two strategies compute the missing buckets: enumerating the bucket
    /// range when it is small, or scanning the actual timestamps when the
    /// range would exceed the number of rows (e.g. sparse extreme values).
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        // Start values (native unit) of buckets already covered by a partition.
        let mut present = HashSet::new();
        for part in existing_parts {
            let part_time_range = &part.time_range;
            // Overlap test: NOT (entirely before or entirely after).
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        // NOTE(review): assumes `part_duration` is at least one second;
        // a sub-second duration would make this 0 and the div_euclid panic.
        let part_duration_sec = part_duration.as_secs() as i64;
        // div_euclid keeps bucket boundaries correct for negative timestamps.
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        let num_timestamps = ts_array.len();
        let missing = if bucket_num <= num_timestamps {
            // Few buckets: enumerate every bucket in the range.
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            // Many buckets relative to rows: derive buckets from the actual
            // timestamp values instead.
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }

    /// Returns the partition duration. The name is historical — the duration
    /// is always set at construction time, so this simply returns it.
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }

    /// Slow write path: groups rows by partition, writing rows that fall into
    /// existing partitions first and then creating the missing partitions.
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Rows grouped by existing partition (keyed by the partition start).
        let mut parts_to_write = HashMap::new();
        // Rows grouped by the start timestamp of a partition to create.
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // See `write` — the unwraps assume a valid time index value.
            let ts = kv.timestamp().try_into_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Write to existing partitions without holding the inner lock.
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Create missing partitions and write their rows under one lock.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }

    /// Returns the total series count across all partitions.
    pub(crate) fn series_count(&self) -> usize {
        self.inner.lock().unwrap().series_count()
    }
}
732
733fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
737 let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
739 let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
740 let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
741 start_sec.convert_to(ts.unit())
742}
743
/// Mutable state of [TimePartitions], guarded by its mutex.
#[derive(Debug)]
struct PartitionsInner {
    /// All current partitions.
    parts: PartitionVec,
    /// Next memtable id to allocate.
    next_memtable_id: MemtableId,
}
751
impl PartitionsInner {
    /// Creates an empty state whose id allocation starts at `next_memtable_id`.
    fn new(next_memtable_id: MemtableId) -> Self {
        Self {
            parts: Default::default(),
            next_memtable_id,
        }
    }

    /// Creates a state seeded with a single partition.
    fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
        Self {
            parts: smallvec![part],
            next_memtable_id,
        }
    }

    /// Returns the current id and advances the counter.
    fn alloc_memtable_id(&mut self) -> MemtableId {
        let id = self.next_memtable_id;
        self.next_memtable_id += 1;
        id
    }

    /// Returns the total series count summed over all partitions.
    pub(crate) fn series_count(&self) -> usize {
        self.parts
            .iter()
            .map(|p| p.memtable.stats().series_count)
            .sum()
    }
}
780
/// Half-open time range of a partition.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive minimum timestamp of the range.
    min_timestamp: Timestamp,
    /// Exclusive maximum timestamp of the range.
    max_timestamp: Timestamp,
}
789
790impl PartTimeRange {
791 fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
792 let start_sec = start.convert_to(TimeUnit::Second)?;
793 let end_sec = start_sec.add_duration(duration).ok()?;
794 let min_timestamp = start_sec.convert_to(start.unit())?;
795 let max_timestamp = end_sec.convert_to(start.unit())?;
796
797 Some(Self {
798 min_timestamp,
799 max_timestamp,
800 })
801 }
802
803 fn contains_timestamp(&self, ts: Timestamp) -> bool {
805 self.min_timestamp <= ts && ts < self.max_timestamp
806 }
807}
808
/// Pairs an existing partition with the key values destined for it, used by
/// the multi-partition write path.
struct PartitionToWrite<'a> {
    partition: TimePartition,
    key_values: Vec<KeyValue<'a>>,
}
813
814#[cfg(test)]
815mod tests {
816 use std::sync::Arc;
817
818 use api::v1::SemanticType;
819 use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
820 use datatypes::arrow::datatypes::{DataType, Field, Schema};
821 use datatypes::arrow::record_batch::RecordBatch;
822 use datatypes::prelude::ConcreteDataType;
823 use datatypes::schema::ColumnSchema;
824 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
825 use store_api::storage::SequenceNumber;
826
827 use super::*;
828 use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
829 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
830 use crate::memtable::{IterBuilder, RangesOptions};
831 use crate::test_util::memtable_util::{self, collect_iter_timestamps};
832
833 #[test]
834 fn test_no_duration() {
835 let metadata = memtable_util::metadata_for_test();
836 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
837 let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
838 assert_eq!(0, partitions.num_partitions());
839 assert!(partitions.is_empty());
840
841 let kvs = memtable_util::build_key_values(
842 &metadata,
843 "hello".to_string(),
844 0,
845 &[1000, 3000, 7000, 5000, 6000],
846 0, );
848 partitions.write(&kvs).unwrap();
849
850 assert_eq!(1, partitions.num_partitions());
851 assert!(!partitions.is_empty());
852 let mut memtables = Vec::new();
853 partitions.list_memtables(&mut memtables);
854 assert_eq!(0, memtables[0].id());
855
856 let iter = memtables[0]
857 .ranges(None, RangesOptions::default())
858 .unwrap()
859 .build(None)
860 .unwrap();
861 let timestamps = collect_iter_timestamps(iter);
862 assert_eq!(&[1000, 3000, 5000, 6000, 7000], ×tamps[..]);
863 }
864
865 #[test]
866 fn test_write_single_part() {
867 let metadata = memtable_util::metadata_for_test();
868 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
869 let partitions =
870 TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
871 assert_eq!(0, partitions.num_partitions());
872
873 let kvs = memtable_util::build_key_values(
874 &metadata,
875 "hello".to_string(),
876 0,
877 &[5000, 2000, 0],
878 0, );
880 partitions.write(&kvs).unwrap();
882 assert_eq!(1, partitions.num_partitions());
883 assert!(!partitions.is_empty());
884
885 let kvs = memtable_util::build_key_values(
886 &metadata,
887 "hello".to_string(),
888 0,
889 &[3000, 7000, 4000],
890 3, );
892 partitions.write(&kvs).unwrap();
894 assert_eq!(1, partitions.num_partitions());
895
896 let mut memtables = Vec::new();
897 partitions.list_memtables(&mut memtables);
898 let iter = memtables[0]
899 .ranges(None, RangesOptions::default())
900 .unwrap()
901 .build(None)
902 .unwrap();
903 let timestamps = collect_iter_timestamps(iter);
904 assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], ×tamps[..]);
905 let parts = partitions.list_partitions();
906 assert_eq!(
907 Timestamp::new_millisecond(0),
908 parts[0].time_range.min_timestamp
909 );
910 assert_eq!(
911 Timestamp::new_millisecond(10000),
912 parts[0].time_range.max_timestamp
913 );
914 }
915
916 #[cfg(test)]
917 fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
918 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
919 let partitions =
920 TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
921 assert_eq!(0, partitions.num_partitions());
922
923 let kvs = memtable_util::build_key_values(
924 metadata,
925 "hello".to_string(),
926 0,
927 &[2000, 0],
928 0, );
930 partitions.write(&kvs).unwrap();
932 assert_eq!(1, partitions.num_partitions());
933 assert!(!partitions.is_empty());
934
935 let kvs = memtable_util::build_key_values(
936 metadata,
937 "hello".to_string(),
938 0,
939 &[3000, 7000, 4000, 5000],
940 2, );
942 partitions.write(&kvs).unwrap();
944 assert_eq!(2, partitions.num_partitions());
945
946 partitions
947 }
948
949 #[test]
950 fn test_write_multi_parts() {
951 let metadata = memtable_util::metadata_for_test();
952 let partitions = new_multi_partitions(&metadata);
953
954 let parts = partitions.list_partitions();
955 let iter = parts[0]
956 .memtable
957 .ranges(None, RangesOptions::default())
958 .unwrap()
959 .build(None)
960 .unwrap();
961 let timestamps = collect_iter_timestamps(iter);
962 assert_eq!(0, parts[0].memtable.id());
963 assert_eq!(
964 Timestamp::new_millisecond(0),
965 parts[0].time_range.min_timestamp
966 );
967 assert_eq!(
968 Timestamp::new_millisecond(5000),
969 parts[0].time_range.max_timestamp
970 );
971 assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]);
972 let iter = parts[1]
973 .memtable
974 .ranges(None, RangesOptions::default())
975 .unwrap()
976 .build(None)
977 .unwrap();
978 assert_eq!(1, parts[1].memtable.id());
979 let timestamps = collect_iter_timestamps(iter);
980 assert_eq!(&[5000, 7000], ×tamps[..]);
981 assert_eq!(
982 Timestamp::new_millisecond(5000),
983 parts[1].time_range.min_timestamp
984 );
985 assert_eq!(
986 Timestamp::new_millisecond(10000),
987 parts[1].time_range.max_timestamp
988 );
989 }
990
991 #[test]
992 fn test_new_with_part_duration() {
993 let metadata = memtable_util::metadata_for_test();
994 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
995 let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
996
997 let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)), None);
998 assert_eq!(Duration::from_secs(5), new_parts.part_duration());
999 assert_eq!(0, new_parts.next_memtable_id());
1000
1001 let new_parts = new_parts.new_with_part_duration(None, None);
1003 assert_eq!(Duration::from_secs(5), new_parts.part_duration());
1004 assert_eq!(0, new_parts.next_memtable_id());
1006
1007 let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)), None);
1008 assert_eq!(Duration::from_secs(10), new_parts.part_duration());
1009 assert_eq!(0, new_parts.next_memtable_id());
1011
1012 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
1013 let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
1014 let new_parts = partitions.new_with_part_duration(None, None);
1016 assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
1017 assert_eq!(0, new_parts.next_memtable_id());
1018 }
1019
1020 #[test]
1021 fn test_fork_empty() {
1022 let metadata = memtable_util::metadata_for_test();
1023 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
1024 let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
1025 partitions.freeze().unwrap();
1026 let new_parts = partitions.fork(&metadata, None);
1027 assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
1028 assert!(new_parts.list_partitions().is_empty());
1029 assert_eq!(0, new_parts.next_memtable_id());
1030
1031 new_parts.freeze().unwrap();
1032 let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
1033 assert_eq!(Duration::from_secs(5), new_parts.part_duration());
1034 assert!(new_parts.list_partitions().is_empty());
1035 assert_eq!(0, new_parts.next_memtable_id());
1036
1037 new_parts.freeze().unwrap();
1038 let new_parts = new_parts.fork(&metadata, None);
1039 assert_eq!(Duration::from_secs(5), new_parts.part_duration());
1041 assert!(new_parts.list_partitions().is_empty());
1042 assert_eq!(0, new_parts.next_memtable_id());
1043
1044 new_parts.freeze().unwrap();
1045 let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
1046 assert_eq!(Duration::from_secs(10), new_parts.part_duration());
1047 assert!(new_parts.list_partitions().is_empty());
1048 assert_eq!(0, new_parts.next_memtable_id());
1049 }
1050
1051 #[test]
1052 fn test_fork_non_empty_none() {
1053 let metadata = memtable_util::metadata_for_test();
1054 let partitions = new_multi_partitions(&metadata);
1055 partitions.freeze().unwrap();
1056
1057 let new_parts = partitions.fork(&metadata, None);
1059 assert!(new_parts.is_empty());
1060 assert_eq!(Duration::from_secs(5), new_parts.part_duration());
1061 assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
1062 assert_eq!(3, new_parts.next_memtable_id());
1063
1064 let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
1066 assert!(new_parts.is_empty());
1067 assert_eq!(Duration::from_secs(10), new_parts.part_duration());
1068 assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
1069 assert_eq!(4, new_parts.next_memtable_id());
1070 }
1071
1072 #[test]
1073 fn test_find_partitions_by_time_range() {
1074 let metadata = memtable_util::metadata_for_test();
1075 let builder = Arc::new(PartitionTreeMemtableBuilder::default());
1076
1077 let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
1079 let parts = partitions.list_partitions();
1080 let (matching, missing) = partitions
1081 .find_partitions_by_time_range(
1082 &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
1083 &parts,
1084 Timestamp::new_millisecond(1000),
1085 Timestamp::new_millisecond(2000),
1086 )
1087 .unwrap();
1088 assert_eq!(matching.len(), 0);
1089 assert_eq!(missing.len(), 1);
1090 assert_eq!(missing[0], Timestamp::new_millisecond(0));
1091
1092 let partitions = TimePartitions::new(
1094 metadata.clone(),
1095 builder.clone(),
1096 0,
1097 Some(Duration::from_secs(5)),
1098 );
1099
1100 let kvs =
1102 memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
1103 partitions.write(&kvs).unwrap();
1104 let kvs =
1105 memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
1106 partitions.write(&kvs).unwrap();
1107
1108 let parts = partitions.list_partitions();
1109 assert_eq!(2, parts.len());
1110
1111 let (matching, missing) = partitions
1113 .find_partitions_by_time_range(
1114 &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
1115 &parts,
1116 Timestamp::new_millisecond(2000),
1117 Timestamp::new_millisecond(4000),
1118 )
1119 .unwrap();
1120 assert_eq!(matching.len(), 1);
1121 assert!(missing.is_empty());
1122 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1123
1124 let (matching, missing) = partitions
1126 .find_partitions_by_time_range(
1127 &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
1128 &parts,
1129 Timestamp::new_millisecond(3000),
1130 Timestamp::new_millisecond(8000),
1131 )
1132 .unwrap();
1133 assert_eq!(matching.len(), 2);
1134 assert!(missing.is_empty());
1135 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1136 assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1137
1138 let (matching, missing) = partitions
1140 .find_partitions_by_time_range(
1141 &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
1142 &parts,
1143 Timestamp::new_millisecond(12000),
1144 Timestamp::new_millisecond(13000),
1145 )
1146 .unwrap();
1147 assert!(matching.is_empty());
1148 assert_eq!(missing.len(), 1);
1149 assert_eq!(missing[0].value(), 10000);
1150
1151 let (matching, missing) = partitions
1153 .find_partitions_by_time_range(
1154 &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
1155 &parts,
1156 Timestamp::new_millisecond(4000),
1157 Timestamp::new_millisecond(6000),
1158 )
1159 .unwrap();
1160 assert_eq!(matching.len(), 2);
1161 assert!(missing.is_empty());
1162 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1163 assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1164
1165 let (matching, missing) = partitions
1167 .find_partitions_by_time_range(
1168 &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
1169 &parts,
1170 Timestamp::new_millisecond(4999),
1171 Timestamp::new_millisecond(5000),
1172 )
1173 .unwrap();
1174 assert_eq!(matching.len(), 2);
1175 assert!(missing.is_empty());
1176 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1177 assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1178
1179 let (matching, missing) = partitions
1181 .find_partitions_by_time_range(
1182 &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
1183 &parts,
1184 Timestamp::new_millisecond(9999),
1185 Timestamp::new_millisecond(10000),
1186 )
1187 .unwrap();
1188 assert_eq!(matching.len(), 1);
1189 assert_eq!(1, missing.len());
1190 assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
1191 assert_eq!(missing[0].value(), 10000);
1192
1193 let (matching, missing) = partitions
1195 .find_partitions_by_time_range(
1196 &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
1197 &parts,
1198 Timestamp::new_millisecond(-1000),
1199 Timestamp::new_millisecond(1000),
1200 )
1201 .unwrap();
1202 assert_eq!(matching.len(), 1);
1203 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1204 assert_eq!(1, missing.len());
1205 assert_eq!(missing[0].value(), -5000);
1206
1207 let (matching, missing) = partitions
1209 .find_partitions_by_time_range(
1210 &(Arc::new(TimestampMillisecondArray::from(vec![
1211 -100000000000,
1212 0,
1213 100000000000,
1214 ])) as ArrayRef),
1215 &parts,
1216 Timestamp::new_millisecond(-100000000000),
1217 Timestamp::new_millisecond(100000000000),
1218 )
1219 .unwrap();
1220 assert_eq!(2, matching.len());
1221 assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1222 assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1223 assert_eq!(2, missing.len());
1224 assert_eq!(missing[0].value(), -100000000000);
1225 assert_eq!(missing[1].value(), 100000000000);
1226 }
1227
1228 fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
1229 let schema = Arc::new(Schema::new(vec![
1230 Field::new(
1231 "ts",
1232 DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1233 false,
1234 ),
1235 Field::new("val", DataType::Utf8, true),
1236 ]));
1237 let ts_data = ts.to_vec();
1238 let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
1239 let val_array = Arc::new(StringArray::from_iter_values(
1240 ts.iter().map(|v| v.to_string()),
1241 ));
1242 let batch = RecordBatch::try_new(
1243 schema,
1244 vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
1245 )
1246 .unwrap();
1247 let max_ts = ts.iter().max().copied().unwrap();
1248 let min_ts = ts.iter().min().copied().unwrap();
1249 BulkPart {
1250 batch,
1251 max_timestamp: max_ts,
1252 min_timestamp: min_ts,
1253 sequence,
1254 timestamp_index: 0,
1255 raw_data: None,
1256 }
1257 }
1258
1259 #[test]
1260 fn test_write_bulk() {
1261 let mut metadata_builder = RegionMetadataBuilder::new(0.into());
1262 metadata_builder
1263 .push_column_metadata(ColumnMetadata {
1264 column_schema: ColumnSchema::new(
1265 "ts",
1266 ConcreteDataType::timestamp_millisecond_datatype(),
1267 false,
1268 ),
1269 semantic_type: SemanticType::Timestamp,
1270 column_id: 0,
1271 })
1272 .push_column_metadata(ColumnMetadata {
1273 column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
1274 semantic_type: SemanticType::Field,
1275 column_id: 1,
1276 })
1277 .primary_key(vec![]);
1278 let metadata = Arc::new(metadata_builder.build().unwrap());
1279
1280 let builder = Arc::new(TimeSeriesMemtableBuilder::default());
1281 let partitions = TimePartitions::new(
1282 metadata.clone(),
1283 builder.clone(),
1284 0,
1285 Some(Duration::from_secs(5)),
1286 );
1287
1288 partitions
1290 .write_bulk(build_part(&[1000, 2000, 3000], 0))
1291 .unwrap();
1292
1293 let parts = partitions.list_partitions();
1294 assert_eq!(1, parts.len());
1295 let iter = parts[0]
1296 .memtable
1297 .ranges(None, RangesOptions::default())
1298 .unwrap()
1299 .build(None)
1300 .unwrap();
1301 let timestamps = collect_iter_timestamps(iter);
1302 assert_eq!(&[1000, 2000, 3000], ×tamps[..]);
1303
1304 partitions
1306 .write_bulk(build_part(&[4000, 5000, 6000], 1))
1307 .unwrap();
1308 let parts = partitions.list_partitions();
1309 assert_eq!(2, parts.len());
1310 let iter = parts[0]
1312 .memtable
1313 .ranges(None, RangesOptions::default())
1314 .unwrap()
1315 .build(None)
1316 .unwrap();
1317 let timestamps = collect_iter_timestamps(iter);
1318 assert_eq!(&[1000, 2000, 3000, 4000], ×tamps[..]);
1319 let iter = parts[1]
1321 .memtable
1322 .ranges(None, RangesOptions::default())
1323 .unwrap()
1324 .build(None)
1325 .unwrap();
1326 let timestamps = collect_iter_timestamps(iter);
1327 assert_eq!(&[5000, 6000], ×tamps[..]);
1328
1329 partitions
1331 .write_bulk(build_part(&[11000, 12000], 3))
1332 .unwrap();
1333
1334 let parts = partitions.list_partitions();
1335 assert_eq!(3, parts.len());
1336
1337 let iter = parts[2]
1339 .memtable
1340 .ranges(None, RangesOptions::default())
1341 .unwrap()
1342 .build(None)
1343 .unwrap();
1344 let timestamps = collect_iter_timestamps(iter);
1345 assert_eq!(&[11000, 12000], ×tamps[..]);
1346
1347 let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);
1349
1350 partitions
1351 .write_bulk(build_part(&[1000, 5000, 9000], 4))
1352 .unwrap();
1353
1354 let parts = partitions.list_partitions();
1355 assert_eq!(1, parts.len());
1356 let iter = parts[0]
1357 .memtable
1358 .ranges(None, RangesOptions::default())
1359 .unwrap()
1360 .build(None)
1361 .unwrap();
1362 let timestamps = collect_iter_timestamps(iter);
1363 assert_eq!(&[1000, 5000, 9000], ×tamps[..]);
1364 }
1365
1366 #[test]
1367 fn test_split_record_batch() {
1368 let schema = Arc::new(Schema::new(vec![
1369 Field::new(
1370 "ts",
1371 DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
1372 false,
1373 ),
1374 Field::new("val", DataType::Utf8, true),
1375 ]));
1376
1377 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
1378 1000, 2000, 5000, 7000, 8000,
1379 ]));
1380 let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
1381 let batch = RecordBatch::try_new(
1382 schema.clone(),
1383 vec![ts_array as ArrayRef, val_array as ArrayRef],
1384 )
1385 .unwrap();
1386
1387 let part = BulkPart {
1388 batch,
1389 max_timestamp: 8000,
1390 min_timestamp: 1000,
1391 sequence: 0,
1392 timestamp_index: 0,
1393 raw_data: None,
1394 };
1395
1396 let result = filter_record_batch(&part, 1000, 2000).unwrap();
1397 assert!(result.is_some());
1398 let filtered = result.unwrap();
1399 assert_eq!(filtered.num_rows(), 1);
1400 assert_eq!(filtered.min_timestamp, 1000);
1401 assert_eq!(filtered.max_timestamp, 1000);
1402
1403 let result = filter_record_batch(&part, 3000, 6000).unwrap();
1405 assert!(result.is_some());
1406 let filtered = result.unwrap();
1407 assert_eq!(filtered.num_rows(), 1);
1408 assert_eq!(filtered.min_timestamp, 5000);
1409 assert_eq!(filtered.max_timestamp, 5000);
1410
1411 let result = filter_record_batch(&part, 3000, 4000).unwrap();
1413 assert!(result.is_none());
1414
1415 let result = filter_record_batch(&part, 0, 9000).unwrap();
1417 assert!(result.is_some());
1418 let filtered = result.unwrap();
1419 assert_eq!(filtered.num_rows(), 5);
1420 assert_eq!(filtered.min_timestamp, 1000);
1421 assert_eq!(filtered.max_timestamp, 8000);
1422 }
1423}