mito2/memtable/
time_partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Partitions memtables by time.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, Mutex, MutexGuard};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_time::timestamp::TimeUnit;
23use common_time::timestamp_millis::BucketAligned;
24use common_time::Timestamp;
25use datatypes::arrow;
26use datatypes::arrow::array::{
27    ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
28    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
29};
30use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
31use datatypes::arrow::datatypes::{DataType, Int64Type};
32use mito_codec::key_values::KeyValue;
33use smallvec::{smallvec, SmallVec};
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::RegionMetadataRef;
36
37use crate::error;
38use crate::error::{InvalidRequestSnafu, Result};
39use crate::memtable::bulk::part::BulkPart;
40use crate::memtable::version::SmallMemtableVec;
41use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
42
/// Initial time window if not specified.
///
/// Spelled as `Duration::from_secs(60 * 60 * 24)` (one day) so the constant
/// compiles on stable Rust: `Duration::from_days` is gated behind the
/// unstable `duration_constructors` feature.
const INITIAL_TIME_WINDOW: Duration = Duration::from_secs(60 * 60 * 24);
45
/// A partition holds rows with timestamps between `[min, max)`.
///
/// `Clone` is shallow: it copies the memtable handle, so writes through a
/// cloned partition are visible to every other holder of the same partition.
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable of the partition.
    memtable: MemtableRef,
    /// Time range of the partition. `min` is inclusive and `max` is exclusive.
    time_range: PartTimeRange,
}
54
55impl TimePartition {
56    /// Returns whether the `ts` belongs to the partition.
57    fn contains_timestamp(&self, ts: Timestamp) -> bool {
58        self.time_range.contains_timestamp(ts)
59    }
60
61    /// Write rows to the part.
62    fn write(&self, kvs: &KeyValues) -> Result<()> {
63        self.memtable.write(kvs)
64    }
65
66    /// Writes a record batch to memtable.
67    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
68        self.memtable.write_bulk(rb)
69    }
70
71    /// Write a partial [BulkPart] according to [TimePartition::time_range].
72    fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
73        let Some(filtered) = filter_record_batch(
74            part,
75            self.time_range.min_timestamp.value(),
76            self.time_range.max_timestamp.value(),
77        )?
78        else {
79            return Ok(());
80        };
81        self.write_record_batch(filtered)
82    }
83}
84
/// Builds a `BooleanArray` marking, for each element of `$ts_array`, whether
/// its value lies in `[$min, $max)`.
///
/// The predicate results are packed 64 per `u64` word directly into a
/// `MutableBuffer`, avoiding a per-element boolean allocation.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        // ceil(len / 64) words of 8 bytes each.
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        let f = |idx: usize| -> bool {
            // SAFETY: we only iterate the array within index bound.
            unsafe {
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        // Full 64-bit words.
        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        // Trailing partial word (fewer than 64 remaining elements).
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        // `len` trims any unused bits in the final word.
        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
124
/// Downcasts `$ts_array` to the concrete `$array_type`, filters it to
/// `[$min, $max)` and evaluates to `(filtered_array, filter, min_ts, max_ts)`.
///
/// NOTE: this macro `return`s `Ok(None)` from the *enclosing function* when no
/// row survives the filter, so it may only be used inside a function returning
/// `Result<Option<_>>`.
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            return Ok(None);
        }

        let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
        // safety: we've checked res is not empty
        let max_ts = arrow::compute::max(i64array).unwrap();
        let min_ts = arrow::compute::min(i64array).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}
143
/// Filters the given part according to min (inclusive) and max (exclusive) timestamp range.
/// Returns [None] if no matching rows.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    // Dispatch on the concrete timestamp unit. Each macro arm early-returns
    // `Ok(None)` when nothing survives the filter.
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            // The time index column is validated upstream, so it must be a timestamp type.
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    // Apply the same filter to every column; the timestamp column was already
    // filtered above, so reuse it instead of filtering it twice.
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_ts,
        min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
    }))
}
197
/// Inline storage for the common case of at most two live partitions.
type PartitionVec = SmallVec<[TimePartition; 2]>;

/// Partitions, keyed by time range.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable data of partitions.
    inner: Mutex<PartitionsInner>,
    /// Duration of a partition.
    part_duration: Duration,
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Builder of memtables.
    builder: MemtableBuilderRef,
}

/// Shared reference to [TimePartitions].
pub type TimePartitionsRef = Arc<TimePartitions>;
214
impl TimePartitions {
    /// Returns a new empty partition list with optional duration.
    ///
    /// Falls back to [INITIAL_TIME_WINDOW] when `part_duration` is `None`.
    pub fn new(
        metadata: RegionMetadataRef,
        builder: MemtableBuilderRef,
        next_memtable_id: MemtableId,
        part_duration: Option<Duration>,
    ) -> Self {
        let inner = PartitionsInner::new(next_memtable_id);
        Self {
            inner: Mutex::new(inner),
            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
            metadata,
            builder,
        }
    }

    /// Write key values to memtables.
    ///
    /// It creates new partitions if necessary.
    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
        // Get all parts.
        let parts = self.list_partitions();

        // Checks whether all rows belong to a single part. Checks in reverse order as we usually
        // put to latest part.
        for part in parts.iter().rev() {
            let mut all_in_partition = true;
            for kv in kvs.iter() {
                // Safety: We checked the schema in the write request.
                let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
                if !part.contains_timestamp(ts) {
                    all_in_partition = false;
                    break;
                }
            }
            if !all_in_partition {
                continue;
            }

            // We can write all rows to this part.
            return part.write(kvs);
        }

        // Slow path: We have to split kvs by partitions.
        self.write_multi_parts(kvs, &parts)
    }

    /// Writes a bulk part, splitting it across time partitions when needed and
    /// creating any missing partitions on the fly.
    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        // Get all parts.
        let parts = self.list_partitions();
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_ts),
            time_type.create_timestamp(part.max_ts),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
            // fast path: all timestamps fall in one time partition.
            return matching_parts[0].write_record_batch(part);
        }

        // Slow path: each partition takes only the rows in its own range.
        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        for missing in missing_parts {
            // Hold the lock only while creating the partition, not while writing.
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }

    // Creates or gets parts with given start timestamp.
    // Acquires the lock to avoid others create the same partition.
    fn get_or_create_time_partition(
        &self,
        part_start: Timestamp,
        inner: &mut MutexGuard<PartitionsInner>,
    ) -> Result<TimePartition> {
        let part_pos = match inner
            .parts
            .iter()
            .position(|part| part.time_range.min_timestamp == part_start)
        {
            Some(pos) => pos,
            None => {
                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: self.metadata.region_id,
                        reason: format!(
                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
                        ),
                    })?;
                let memtable = self
                    .builder
                    .build(inner.alloc_memtable_id(), &self.metadata);
                debug!(
                        "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
                        range,
                        self.metadata.region_id,
                        self.part_duration,
                        memtable.id(),
                        inner.parts.len() + 1
                    );
                let pos = inner.parts.len();
                inner.parts.push(TimePartition {
                    memtable,
                    time_range: range,
                });
                pos
            }
        };
        Ok(inner.parts[part_pos].clone())
    }

    /// Append memtables in partitions to `memtables`.
    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the number of partitions.
    pub fn num_partitions(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner.parts.len()
    }

    /// Returns true if all memtables are empty.
    pub fn is_empty(&self) -> bool {
        let inner = self.inner.lock().unwrap();
        inner.parts.iter().all(|part| part.memtable.is_empty())
    }

    /// Freezes all memtables.
    pub fn freeze(&self) -> Result<()> {
        let inner = self.inner.lock().unwrap();
        for part in &*inner.parts {
            part.memtable.freeze()?;
        }
        Ok(())
    }

    /// Forks latest partition and updates the partition duration if `part_duration` is Some.
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        // Fall back to the existing partition duration.
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        let Some(old_part) = latest_part else {
            // If there is no partition, then we create a new partition with the new duration.
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Use the max timestamp to compute the new time range for the memtable.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                // Forks the latest partition, but compute the time range based on the new duration.
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
        }
    }

    /// Returns partition duration.
    pub(crate) fn part_duration(&self) -> Duration {
        self.part_duration
    }

    /// Returns memory usage.
    pub(crate) fn memory_usage(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().estimated_bytes)
            .sum()
    }

    /// Returns the number of rows.
    pub(crate) fn num_rows(&self) -> u64 {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().num_rows as u64)
            .sum()
    }

    /// Append memtables in partitions to small vec.
    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the next memtable id.
    pub(crate) fn next_memtable_id(&self) -> MemtableId {
        let inner = self.inner.lock().unwrap();
        inner.next_memtable_id
    }

    /// Creates a new empty partition list from this list and a `part_duration`.
    /// It falls back to the old partition duration if `part_duration` is `None`.
    pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
        debug_assert!(self.is_empty());

        Self::new(
            self.metadata.clone(),
            self.builder.clone(),
            self.next_memtable_id(),
            Some(part_duration.unwrap_or(self.part_duration)),
        )
    }

    /// Returns all partitions.
    fn list_partitions(&self) -> PartitionVec {
        let inner = self.inner.lock().unwrap();
        inner.parts.clone()
    }

    /// Find existing partitions that match the bulk data's time range and identify
    /// any new partitions that need to be created.
    ///
    /// Returns `(matching, missing)`: partitions overlapping `[min, max]` and the
    /// start timestamps of partitions that do not exist yet.
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        // Partition start values (in the time index unit) already covered.
        let mut present = HashSet::new();
        // First find any existing partitions that overlap
        for part in existing_parts {
            let part_time_range = &part.time_range;
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        // `part_duration` is always set (`new` falls back to a default), so we can
        // bucket by it directly.
        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        let part_duration_sec = part_duration.as_secs() as i64;
        // SAFETY: Timestamps won't overflow when converting to Second.
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        let num_timestamps = ts_array.len();
        // Choose the cheaper strategy: enumerate candidate buckets when there are
        // fewer buckets than rows, otherwise bucket each timestamp individually.
        let missing = if bucket_num <= num_timestamps {
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }

    /// Returns the partition duration.
    ///
    /// The duration is always present because `new` substitutes a default
    /// (1 day) when none is given.
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }

    /// Write to multiple partitions.
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Group rows by destination: existing partitions vs partitions to create.
        let mut parts_to_write = HashMap::new();
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // Safety: We used the timestamp before.
            let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                // We need to write it to a new part.
                // The partition duration is always set (`new` provides a default).
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Writes rows to existing parts.
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Creates new parts and writes to them. Acquires the lock to avoid others create
        // the same partition.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }

    /// Timeseries count in all time partitions.
    pub(crate) fn series_count(&self) -> usize {
        self.inner.lock().unwrap().series_count()
    }
}
665
666/// Computes the start timestamp of the partition for `ts`.
667///
668/// It always use bucket size in seconds which should fit all timestamp resolution.
669fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
670    // Safety: We convert it to seconds so it never returns `None`.
671    let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
672    let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
673    let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
674    start_sec.convert_to(ts.unit())
675}
676
/// Mutable partition state guarded by [TimePartitions]'s mutex.
#[derive(Debug)]
struct PartitionsInner {
    /// All partitions.
    parts: PartitionVec,
    /// Next memtable id.
    next_memtable_id: MemtableId,
}
684
685impl PartitionsInner {
686    fn new(next_memtable_id: MemtableId) -> Self {
687        Self {
688            parts: Default::default(),
689            next_memtable_id,
690        }
691    }
692
693    fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
694        Self {
695            parts: smallvec![part],
696            next_memtable_id,
697        }
698    }
699
700    fn alloc_memtable_id(&mut self) -> MemtableId {
701        let id = self.next_memtable_id;
702        self.next_memtable_id += 1;
703        id
704    }
705
706    pub(crate) fn series_count(&self) -> usize {
707        self.parts
708            .iter()
709            .map(|p| p.memtable.stats().series_count)
710            .sum()
711    }
712}
713
/// Time range of a partition.
///
/// Both bounds are stored in the same time unit (see
/// `PartTimeRange::from_start_duration`).
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive min timestamp of rows in the partition.
    min_timestamp: Timestamp,
    /// Exclusive max timestamp of rows in the partition.
    max_timestamp: Timestamp,
}
722
723impl PartTimeRange {
724    fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
725        let start_sec = start.convert_to(TimeUnit::Second)?;
726        let end_sec = start_sec.add_duration(duration).ok()?;
727        let min_timestamp = start_sec.convert_to(start.unit())?;
728        let max_timestamp = end_sec.convert_to(start.unit())?;
729
730        Some(Self {
731            min_timestamp,
732            max_timestamp,
733        })
734    }
735
736    /// Returns whether the `ts` belongs to the partition.
737    fn contains_timestamp(&self, ts: Timestamp) -> bool {
738        self.min_timestamp <= ts && ts < self.max_timestamp
739    }
740}
741
/// Rows of a write request grouped with the partition that should receive them.
struct PartitionToWrite<'a> {
    /// Target partition.
    partition: TimePartition,
    /// Rows whose timestamps fall into `partition`'s time range.
    key_values: Vec<KeyValue<'a>>,
}
746
747#[cfg(test)]
748mod tests {
749    use std::sync::Arc;
750
751    use api::v1::SemanticType;
752    use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
753    use datatypes::arrow::datatypes::{DataType, Field, Schema};
754    use datatypes::arrow::record_batch::RecordBatch;
755    use datatypes::prelude::ConcreteDataType;
756    use datatypes::schema::ColumnSchema;
757    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
758    use store_api::storage::SequenceNumber;
759
760    use super::*;
761    use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
762    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
763    use crate::test_util::memtable_util::{self, collect_iter_timestamps};
764
765    #[test]
766    fn test_no_duration() {
767        let metadata = memtable_util::metadata_for_test();
768        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
769        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
770        assert_eq!(0, partitions.num_partitions());
771        assert!(partitions.is_empty());
772
773        let kvs = memtable_util::build_key_values(
774            &metadata,
775            "hello".to_string(),
776            0,
777            &[1000, 3000, 7000, 5000, 6000],
778            0, // sequence 0, 1, 2, 3, 4
779        );
780        partitions.write(&kvs).unwrap();
781
782        assert_eq!(1, partitions.num_partitions());
783        assert!(!partitions.is_empty());
784        let mut memtables = Vec::new();
785        partitions.list_memtables(&mut memtables);
786        assert_eq!(0, memtables[0].id());
787
788        let iter = memtables[0].iter(None, None, None).unwrap();
789        let timestamps = collect_iter_timestamps(iter);
790        assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
791    }
792
793    #[test]
794    fn test_write_single_part() {
795        let metadata = memtable_util::metadata_for_test();
796        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
797        let partitions =
798            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
799        assert_eq!(0, partitions.num_partitions());
800
801        let kvs = memtable_util::build_key_values(
802            &metadata,
803            "hello".to_string(),
804            0,
805            &[5000, 2000, 0],
806            0, // sequence 0, 1, 2
807        );
808        // It should creates a new partition.
809        partitions.write(&kvs).unwrap();
810        assert_eq!(1, partitions.num_partitions());
811        assert!(!partitions.is_empty());
812
813        let kvs = memtable_util::build_key_values(
814            &metadata,
815            "hello".to_string(),
816            0,
817            &[3000, 7000, 4000],
818            3, // sequence 3, 4, 5
819        );
820        // Still writes to the same partition.
821        partitions.write(&kvs).unwrap();
822        assert_eq!(1, partitions.num_partitions());
823
824        let mut memtables = Vec::new();
825        partitions.list_memtables(&mut memtables);
826        let iter = memtables[0].iter(None, None, None).unwrap();
827        let timestamps = collect_iter_timestamps(iter);
828        assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
829        let parts = partitions.list_partitions();
830        assert_eq!(
831            Timestamp::new_millisecond(0),
832            parts[0].time_range.min_timestamp
833        );
834        assert_eq!(
835            Timestamp::new_millisecond(10000),
836            parts[0].time_range.max_timestamp
837        );
838    }
839
840    #[cfg(test)]
841    fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
842        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
843        let partitions =
844            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
845        assert_eq!(0, partitions.num_partitions());
846
847        let kvs = memtable_util::build_key_values(
848            metadata,
849            "hello".to_string(),
850            0,
851            &[2000, 0],
852            0, // sequence 0, 1
853        );
854        // It should creates a new partition.
855        partitions.write(&kvs).unwrap();
856        assert_eq!(1, partitions.num_partitions());
857        assert!(!partitions.is_empty());
858
859        let kvs = memtable_util::build_key_values(
860            metadata,
861            "hello".to_string(),
862            0,
863            &[3000, 7000, 4000, 5000],
864            2, // sequence 2, 3, 4, 5
865        );
866        // Writes 2 rows to the old partition and 1 row to a new partition.
867        partitions.write(&kvs).unwrap();
868        assert_eq!(2, partitions.num_partitions());
869
870        partitions
871    }
872
873    #[test]
874    fn test_write_multi_parts() {
875        let metadata = memtable_util::metadata_for_test();
876        let partitions = new_multi_partitions(&metadata);
877
878        let parts = partitions.list_partitions();
879        let iter = parts[0].memtable.iter(None, None, None).unwrap();
880        let timestamps = collect_iter_timestamps(iter);
881        assert_eq!(0, parts[0].memtable.id());
882        assert_eq!(
883            Timestamp::new_millisecond(0),
884            parts[0].time_range.min_timestamp
885        );
886        assert_eq!(
887            Timestamp::new_millisecond(5000),
888            parts[0].time_range.max_timestamp
889        );
890        assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
891        let iter = parts[1].memtable.iter(None, None, None).unwrap();
892        assert_eq!(1, parts[1].memtable.id());
893        let timestamps = collect_iter_timestamps(iter);
894        assert_eq!(&[5000, 7000], &timestamps[..]);
895        assert_eq!(
896            Timestamp::new_millisecond(5000),
897            parts[1].time_range.min_timestamp
898        );
899        assert_eq!(
900            Timestamp::new_millisecond(10000),
901            parts[1].time_range.max_timestamp
902        );
903    }
904
905    #[test]
906    fn test_new_with_part_duration() {
907        let metadata = memtable_util::metadata_for_test();
908        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
909        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
910
911        let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
912        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
913        assert_eq!(0, new_parts.next_memtable_id());
914
915        // Won't update the duration if it's None.
916        let new_parts = new_parts.new_with_part_duration(None);
917        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
918        // Don't need to create new memtables.
919        assert_eq!(0, new_parts.next_memtable_id());
920
921        let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
922        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
923        // Don't need to create new memtables.
924        assert_eq!(0, new_parts.next_memtable_id());
925
926        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
927        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
928        // Need to build a new memtable as duration is still None.
929        let new_parts = partitions.new_with_part_duration(None);
930        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
931        assert_eq!(0, new_parts.next_memtable_id());
932    }
933
934    #[test]
935    fn test_fork_empty() {
936        let metadata = memtable_util::metadata_for_test();
937        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
938        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
939        partitions.freeze().unwrap();
940        let new_parts = partitions.fork(&metadata, None);
941        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
942        assert!(new_parts.list_partitions().is_empty());
943        assert_eq!(0, new_parts.next_memtable_id());
944
945        new_parts.freeze().unwrap();
946        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
947        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
948        assert!(new_parts.list_partitions().is_empty());
949        assert_eq!(0, new_parts.next_memtable_id());
950
951        new_parts.freeze().unwrap();
952        let new_parts = new_parts.fork(&metadata, None);
953        // Won't update the duration.
954        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
955        assert!(new_parts.list_partitions().is_empty());
956        assert_eq!(0, new_parts.next_memtable_id());
957
958        new_parts.freeze().unwrap();
959        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
960        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
961        assert!(new_parts.list_partitions().is_empty());
962        assert_eq!(0, new_parts.next_memtable_id());
963    }
964
965    #[test]
966    fn test_fork_non_empty_none() {
967        let metadata = memtable_util::metadata_for_test();
968        let partitions = new_multi_partitions(&metadata);
969        partitions.freeze().unwrap();
970
971        // Won't update the duration.
972        let new_parts = partitions.fork(&metadata, None);
973        assert!(new_parts.is_empty());
974        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
975        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
976        assert_eq!(3, new_parts.next_memtable_id());
977
978        // Although we don't fork a memtable multiple times, we still add a test for it.
979        let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
980        assert!(new_parts.is_empty());
981        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
982        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
983        assert_eq!(4, new_parts.next_memtable_id());
984    }
985
986    #[test]
987    fn test_find_partitions_by_time_range() {
988        let metadata = memtable_util::metadata_for_test();
989        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
990
991        // Case 1: No time range partitioning
992        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
993        let parts = partitions.list_partitions();
994        let (matching, missing) = partitions
995            .find_partitions_by_time_range(
996                &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
997                &parts,
998                Timestamp::new_millisecond(1000),
999                Timestamp::new_millisecond(2000),
1000            )
1001            .unwrap();
1002        assert_eq!(matching.len(), 0);
1003        assert_eq!(missing.len(), 1);
1004        assert_eq!(missing[0], Timestamp::new_millisecond(0));
1005
1006        // Case 2: With time range partitioning
1007        let partitions = TimePartitions::new(
1008            metadata.clone(),
1009            builder.clone(),
1010            0,
1011            Some(Duration::from_secs(5)),
1012        );
1013
1014        // Create two existing partitions: [0, 5000) and [5000, 10000)
1015        let kvs =
1016            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
1017        partitions.write(&kvs).unwrap();
1018        let kvs =
1019            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
1020        partitions.write(&kvs).unwrap();
1021
1022        let parts = partitions.list_partitions();
1023        assert_eq!(2, parts.len());
1024
1025        // Test case 2a: Query fully within existing partition
1026        let (matching, missing) = partitions
1027            .find_partitions_by_time_range(
1028                &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
1029                &parts,
1030                Timestamp::new_millisecond(2000),
1031                Timestamp::new_millisecond(4000),
1032            )
1033            .unwrap();
1034        assert_eq!(matching.len(), 1);
1035        assert!(missing.is_empty());
1036        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1037
1038        // Test case 2b: Query spanning multiple existing partitions
1039        let (matching, missing) = partitions
1040            .find_partitions_by_time_range(
1041                &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
1042                &parts,
1043                Timestamp::new_millisecond(3000),
1044                Timestamp::new_millisecond(8000),
1045            )
1046            .unwrap();
1047        assert_eq!(matching.len(), 2);
1048        assert!(missing.is_empty());
1049        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1050        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1051
1052        // Test case 2c: Query requiring new partition
1053        let (matching, missing) = partitions
1054            .find_partitions_by_time_range(
1055                &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
1056                &parts,
1057                Timestamp::new_millisecond(12000),
1058                Timestamp::new_millisecond(13000),
1059            )
1060            .unwrap();
1061        assert!(matching.is_empty());
1062        assert_eq!(missing.len(), 1);
1063        assert_eq!(missing[0].value(), 10000);
1064
1065        // Test case 2d: Query partially overlapping existing partition
1066        let (matching, missing) = partitions
1067            .find_partitions_by_time_range(
1068                &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
1069                &parts,
1070                Timestamp::new_millisecond(4000),
1071                Timestamp::new_millisecond(6000),
1072            )
1073            .unwrap();
1074        assert_eq!(matching.len(), 2);
1075        assert!(missing.is_empty());
1076        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1077        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1078
1079        // Test case 2e: Corner case
1080        let (matching, missing) = partitions
1081            .find_partitions_by_time_range(
1082                &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
1083                &parts,
1084                Timestamp::new_millisecond(4999),
1085                Timestamp::new_millisecond(5000),
1086            )
1087            .unwrap();
1088        assert_eq!(matching.len(), 2);
1089        assert!(missing.is_empty());
1090        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1091        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1092
1093        // Test case 2f: Corner case with
1094        let (matching, missing) = partitions
1095            .find_partitions_by_time_range(
1096                &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
1097                &parts,
1098                Timestamp::new_millisecond(9999),
1099                Timestamp::new_millisecond(10000),
1100            )
1101            .unwrap();
1102        assert_eq!(matching.len(), 1);
1103        assert_eq!(1, missing.len());
1104        assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
1105        assert_eq!(missing[0].value(), 10000);
1106
1107        // Test case 2g: Cross 0
1108        let (matching, missing) = partitions
1109            .find_partitions_by_time_range(
1110                &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
1111                &parts,
1112                Timestamp::new_millisecond(-1000),
1113                Timestamp::new_millisecond(1000),
1114            )
1115            .unwrap();
1116        assert_eq!(matching.len(), 1);
1117        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1118        assert_eq!(1, missing.len());
1119        assert_eq!(missing[0].value(), -5000);
1120
1121        // Test case 3: sparse data
1122        let (matching, missing) = partitions
1123            .find_partitions_by_time_range(
1124                &(Arc::new(TimestampMillisecondArray::from(vec![
1125                    -100000000000,
1126                    0,
1127                    100000000000,
1128                ])) as ArrayRef),
1129                &parts,
1130                Timestamp::new_millisecond(-100000000000),
1131                Timestamp::new_millisecond(100000000000),
1132            )
1133            .unwrap();
1134        assert_eq!(2, matching.len());
1135        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
1136        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
1137        assert_eq!(2, missing.len());
1138        assert_eq!(missing[0].value(), -100000000000);
1139        assert_eq!(missing[1].value(), 100000000000);
1140    }
1141
1142    fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
1143        let schema = Arc::new(Schema::new(vec![
1144            Field::new(
1145                "ts",
1146                DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1147                false,
1148            ),
1149            Field::new("val", DataType::Utf8, true),
1150        ]));
1151        let ts_data = ts.to_vec();
1152        let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
1153        let val_array = Arc::new(StringArray::from_iter_values(
1154            ts.iter().map(|v| v.to_string()),
1155        ));
1156        let batch = RecordBatch::try_new(
1157            schema,
1158            vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
1159        )
1160        .unwrap();
1161        let max_ts = ts.iter().max().copied().unwrap();
1162        let min_ts = ts.iter().min().copied().unwrap();
1163        BulkPart {
1164            batch,
1165            max_ts,
1166            min_ts,
1167            sequence,
1168            timestamp_index: 0,
1169            raw_data: None,
1170        }
1171    }
1172
1173    #[test]
1174    fn test_write_bulk() {
1175        let mut metadata_builder = RegionMetadataBuilder::new(0.into());
1176        metadata_builder
1177            .push_column_metadata(ColumnMetadata {
1178                column_schema: ColumnSchema::new(
1179                    "ts",
1180                    ConcreteDataType::timestamp_millisecond_datatype(),
1181                    false,
1182                ),
1183                semantic_type: SemanticType::Timestamp,
1184                column_id: 0,
1185            })
1186            .push_column_metadata(ColumnMetadata {
1187                column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
1188                semantic_type: SemanticType::Field,
1189                column_id: 1,
1190            })
1191            .primary_key(vec![]);
1192        let metadata = Arc::new(metadata_builder.build().unwrap());
1193
1194        let builder = Arc::new(TimeSeriesMemtableBuilder::default());
1195        let partitions = TimePartitions::new(
1196            metadata.clone(),
1197            builder.clone(),
1198            0,
1199            Some(Duration::from_secs(5)),
1200        );
1201
1202        // Test case 1: Write to single partition
1203        partitions
1204            .write_bulk(build_part(&[1000, 2000, 3000], 0))
1205            .unwrap();
1206
1207        let parts = partitions.list_partitions();
1208        assert_eq!(1, parts.len());
1209        let iter = parts[0].memtable.iter(None, None, None).unwrap();
1210        let timestamps = collect_iter_timestamps(iter);
1211        assert_eq!(&[1000, 2000, 3000], &timestamps[..]);
1212
1213        // Test case 2: Write across multiple existing partitions
1214        partitions
1215            .write_bulk(build_part(&[4000, 5000, 6000], 1))
1216            .unwrap();
1217        let parts = partitions.list_partitions();
1218        assert_eq!(2, parts.len());
1219        // Check first partition [0, 5000)
1220        let iter = parts[0].memtable.iter(None, None, None).unwrap();
1221        let timestamps = collect_iter_timestamps(iter);
1222        assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
1223        // Check second partition [5000, 10000)
1224        let iter = parts[1].memtable.iter(None, None, None).unwrap();
1225        let timestamps = collect_iter_timestamps(iter);
1226        assert_eq!(&[5000, 6000], &timestamps[..]);
1227
1228        // Test case 3: Write requiring new partition
1229        partitions
1230            .write_bulk(build_part(&[11000, 12000], 3))
1231            .unwrap();
1232
1233        let parts = partitions.list_partitions();
1234        assert_eq!(3, parts.len());
1235
1236        // Check new partition [10000, 15000)
1237        let iter = parts[2].memtable.iter(None, None, None).unwrap();
1238        let timestamps = collect_iter_timestamps(iter);
1239        assert_eq!(&[11000, 12000], &timestamps[..]);
1240
1241        // Test case 4: Write with no time range partitioning
1242        let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);
1243
1244        partitions
1245            .write_bulk(build_part(&[1000, 5000, 9000], 4))
1246            .unwrap();
1247
1248        let parts = partitions.list_partitions();
1249        assert_eq!(1, parts.len());
1250        let iter = parts[0].memtable.iter(None, None, None).unwrap();
1251        let timestamps = collect_iter_timestamps(iter);
1252        assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
1253    }
1254
1255    #[test]
1256    fn test_split_record_batch() {
1257        let schema = Arc::new(Schema::new(vec![
1258            Field::new(
1259                "ts",
1260                DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
1261                false,
1262            ),
1263            Field::new("val", DataType::Utf8, true),
1264        ]));
1265
1266        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
1267            1000, 2000, 5000, 7000, 8000,
1268        ]));
1269        let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
1270        let batch = RecordBatch::try_new(
1271            schema.clone(),
1272            vec![ts_array as ArrayRef, val_array as ArrayRef],
1273        )
1274        .unwrap();
1275
1276        let part = BulkPart {
1277            batch,
1278            max_ts: 8000,
1279            min_ts: 1000,
1280            sequence: 0,
1281            timestamp_index: 0,
1282            raw_data: None,
1283        };
1284
1285        let result = filter_record_batch(&part, 1000, 2000).unwrap();
1286        assert!(result.is_some());
1287        let filtered = result.unwrap();
1288        assert_eq!(filtered.num_rows(), 1);
1289        assert_eq!(filtered.min_ts, 1000);
1290        assert_eq!(filtered.max_ts, 1000);
1291
1292        // Test splitting with range [3000, 6000)
1293        let result = filter_record_batch(&part, 3000, 6000).unwrap();
1294        assert!(result.is_some());
1295        let filtered = result.unwrap();
1296        assert_eq!(filtered.num_rows(), 1);
1297        assert_eq!(filtered.min_ts, 5000);
1298        assert_eq!(filtered.max_ts, 5000);
1299
1300        // Test splitting with range that includes no points
1301        let result = filter_record_batch(&part, 3000, 4000).unwrap();
1302        assert!(result.is_none());
1303
1304        // Test splitting with range that includes all points
1305        let result = filter_record_batch(&part, 0, 9000).unwrap();
1306        assert!(result.is_some());
1307        let filtered = result.unwrap();
1308        assert_eq!(filtered.num_rows(), 5);
1309        assert_eq!(filtered.min_ts, 1000);
1310        assert_eq!(filtered.max_ts, 8000);
1311    }
1312}