mito2/memtable/time_partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Partitions memtables by time.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, Mutex, MutexGuard};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_time::timestamp::TimeUnit;
23use common_time::timestamp_millis::BucketAligned;
24use common_time::Timestamp;
25use datatypes::arrow;
26use datatypes::arrow::array::{
27    ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
28    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
29};
30use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
31use datatypes::arrow::datatypes::{DataType, Int64Type};
32use mito_codec::key_values::KeyValue;
33use smallvec::{smallvec, SmallVec};
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::RegionMetadataRef;
36
37use crate::error;
38use crate::error::{InvalidRequestSnafu, Result};
39use crate::memtable::bulk::part::BulkPart;
40use crate::memtable::version::SmallMemtableVec;
41use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
42
/// Initial time window if not specified.
///
/// One day, spelled with `Duration::from_secs` because `Duration::from_days`
/// is an unstable std API (`duration_constructors`) and this constant should
/// build on stable toolchains.
const INITIAL_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
45
/// A partition holds rows with timestamps between `[min, max)`.
///
/// NOTE(review): `Clone` is derived; `memtable` looks like a shared handle
/// (`MemtableRef`), so clones presumably share the same memtable — confirm.
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable of the partition.
    memtable: MemtableRef,
    /// Time range of the partition. `min` is inclusive and `max` is exclusive.
    time_range: PartTimeRange,
}
54
55impl TimePartition {
56    /// Returns whether the `ts` belongs to the partition.
57    fn contains_timestamp(&self, ts: Timestamp) -> bool {
58        self.time_range.contains_timestamp(ts)
59    }
60
61    /// Write rows to the part.
62    fn write(&self, kvs: &KeyValues) -> Result<()> {
63        self.memtable.write(kvs)
64    }
65
66    /// Writes a record batch to memtable.
67    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
68        self.memtable.write_bulk(rb)
69    }
70
71    /// Write a partial [BulkPart] according to [TimePartition::time_range].
72    fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
73        let Some(filtered) = filter_record_batch(
74            part,
75            self.time_range.min_timestamp.value(),
76            self.time_range.max_timestamp.value(),
77        )?
78        else {
79            return Ok(());
80        };
81        self.write_record_batch(filtered)
82    }
83}
84
/// Builds a [BooleanArray] mask over `$ts_array` selecting values in
/// `[$min, $max)` (min inclusive, max exclusive).
///
/// The mask bits are packed 64 at a time into `u64` words pushed into a
/// [MutableBuffer], avoiding per-element allocation.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        // One u64 word (8 bytes) per 64 elements, rounded up.
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        // Predicate: element at `idx` lies in `[min, max)`.
        let f = |idx: usize| -> bool {
            // SAFETY: we only iterate the array within index bound.
            unsafe {
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        // Pack the full 64-bit words.
        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        // Pack the trailing partial word, if any.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        // The buffer holds exactly `len` valid bits; no null bitmap.
        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
124
/// Downcasts `$ts_array` to the concrete `$array_type`, filters it to
/// `[$min, $max)` and computes the min/max of the surviving values.
///
/// Expands to a `(filtered_array, filter_mask, min_ts, max_ts)` tuple.
///
/// NOTE: contains an early `return Ok(None)` when no row survives the filter,
/// so it may only be used inside a function returning `Result<Option<_>>`
/// (see [filter_record_batch]).
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            return Ok(None);
        }

        let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
        // safety: we've checked res is not empty
        let max_ts = arrow::compute::max(i64array).unwrap();
        let min_ts = arrow::compute::min(i64array).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}
143
/// Filters the given part according to min (inclusive) and max (exclusive) timestamp range.
/// Returns [None] if no matching rows.
///
/// `min` and `max` are raw values expressed in the same time unit as the
/// part's timestamp column.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    // Filter the timestamp column and compute the new min/max in one pass.
    // The macro early-returns `Ok(None)` when nothing survives.
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            // The time index column is always a timestamp type.
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    // Apply the same mask to every other column; the timestamp column is
    // replaced by the already-filtered array to avoid filtering it twice.
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    // Explicit row count keeps the batch valid even with zero columns.
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_ts,
        min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
    }))
}
197
/// Vector of partitions with inline capacity for two entries.
type PartitionVec = SmallVec<[TimePartition; 2]>;

/// Partitions.
///
/// Groups memtables by time so each partition covers one time window of
/// `part_duration`.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable data of partitions.
    inner: Mutex<PartitionsInner>,
    /// Duration of a partition.
    part_duration: Duration,
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Builder of memtables.
    builder: MemtableBuilderRef,
}

/// Shared reference to [TimePartitions].
pub type TimePartitionsRef = Arc<TimePartitions>;
214
impl TimePartitions {
    /// Returns a new empty partition list with optional duration.
    ///
    /// Falls back to [INITIAL_TIME_WINDOW] when `part_duration` is `None`.
    pub fn new(
        metadata: RegionMetadataRef,
        builder: MemtableBuilderRef,
        next_memtable_id: MemtableId,
        part_duration: Option<Duration>,
    ) -> Self {
        let inner = PartitionsInner::new(next_memtable_id);
        Self {
            inner: Mutex::new(inner),
            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
            metadata,
            builder,
        }
    }

    /// Write key values to memtables.
    ///
    /// It creates new partitions if necessary.
    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
        // Get all parts.
        let parts = self.list_partitions();

        // Checks whether all rows belongs to a single part. Checks in reverse order as we usually
        // put to latest part.
        for part in parts.iter().rev() {
            let mut all_in_partition = true;
            for kv in kvs.iter() {
                // Safety: We checked the schema in the write request.
                let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
                if !part.contains_timestamp(ts) {
                    all_in_partition = false;
                    break;
                }
            }
            if !all_in_partition {
                continue;
            }

            // We can write all rows to this part.
            return part.write(kvs);
        }

        // Slow path: We have to split kvs by partitions.
        self.write_multi_parts(kvs, &parts)
    }

    /// Writes a bulk `part`, splitting it across the partitions its time range
    /// overlaps and creating partitions for uncovered time windows.
    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Time index type, used to interpret the part's raw i64 min/max.
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        // Get all parts.
        let parts = self.list_partitions();
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_ts),
            time_type.create_timestamp(part.max_ts),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
            // fast path: all timestamps fall in one time partition.
            return matching_parts[0].write_record_batch(part);
        }

        // Each overlapping partition receives only the rows in its range.
        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        // Create absent partitions under the lock, then write their subsets.
        for missing in missing_parts {
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }

    // Creates or gets parts with given start timestamp.
    // Acquires the lock to avoid others create the same partition.
    fn get_or_create_time_partition(
        &self,
        part_start: Timestamp,
        inner: &mut MutexGuard<PartitionsInner>,
    ) -> Result<TimePartition> {
        // Look up an existing partition by its (unique) start timestamp.
        let part_pos = match inner
            .parts
            .iter()
            .position(|part| part.time_range.min_timestamp == part_start)
        {
            Some(pos) => pos,
            None => {
                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: self.metadata.region_id,
                        reason: format!(
                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
                        ),
                    })?;
                let memtable = self
                    .builder
                    .build(inner.alloc_memtable_id(), &self.metadata);
                debug!(
                        "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
                        range,
                        self.metadata.region_id,
                        self.part_duration,
                        memtable.id(),
                        inner.parts.len() + 1
                    );
                let pos = inner.parts.len();
                inner.parts.push(TimePartition {
                    memtable,
                    time_range: range,
                });
                pos
            }
        };
        Ok(inner.parts[part_pos].clone())
    }

    /// Append memtables in partitions to `memtables`.
    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the number of partitions.
    pub fn num_partitions(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner.parts.len()
    }

    /// Returns true if all memtables are empty.
    pub fn is_empty(&self) -> bool {
        let inner = self.inner.lock().unwrap();
        inner.parts.iter().all(|part| part.memtable.is_empty())
    }

    /// Freezes all memtables.
    ///
    /// Stops at the first memtable that fails to freeze.
    pub fn freeze(&self) -> Result<()> {
        let inner = self.inner.lock().unwrap();
        for part in &*inner.parts {
            part.memtable.freeze()?;
        }
        Ok(())
    }

    /// Forks latest partition and updates the partition duration if `part_duration` is Some.
    ///
    /// Only the most recent partition (largest start timestamp) is forked; if
    /// its time range cannot be recomputed under the new duration, the result
    /// starts with no partitions at all.
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        // Fall back to the existing partition duration.
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        let Some(old_part) = latest_part else {
            // If there is no partition, then we create a new partition with the new duration.
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Use the max timestamp to compute the new time range for the memtable.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                // Forks the latest partition, but compute the time range based on the new duration.
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
        }
    }

    /// Returns partition duration.
    pub(crate) fn part_duration(&self) -> Duration {
        self.part_duration
    }

    /// Returns memory usage.
    pub(crate) fn memory_usage(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().estimated_bytes)
            .sum()
    }

    /// Returns the number of rows.
    pub(crate) fn num_rows(&self) -> u64 {
        let inner = self.inner.lock().unwrap();
        inner
            .parts
            .iter()
            .map(|part| part.memtable.stats().num_rows as u64)
            .sum()
    }

    /// Append memtables in partitions to small vec.
    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
        let inner = self.inner.lock().unwrap();
        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
    }

    /// Returns the next memtable id.
    pub(crate) fn next_memtable_id(&self) -> MemtableId {
        let inner = self.inner.lock().unwrap();
        inner.next_memtable_id
    }

    /// Creates a new empty partition list from this list and a `part_duration`.
    /// It falls back to the old partition duration if `part_duration` is `None`.
    pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
        debug_assert!(self.is_empty());

        Self::new(
            self.metadata.clone(),
            self.builder.clone(),
            self.next_memtable_id(),
            Some(part_duration.unwrap_or(self.part_duration)),
        )
    }

    /// Returns all partitions.
    fn list_partitions(&self) -> PartitionVec {
        let inner = self.inner.lock().unwrap();
        inner.parts.clone()
    }

    /// Find existing partitions that match the bulk data's time range and identify
    /// any new partitions that need to be created.
    ///
    /// Returns the overlapping existing partitions and the start timestamps
    /// (in the time index unit) of partitions that must be created.
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        let mut present = HashSet::new();
        // First find any existing partitions that overlap
        for part in existing_parts {
            let part_time_range = &part.time_range;
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        // `part_duration` always has a value: `new()` falls back to a default.
        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        let part_duration_sec = part_duration.as_secs() as i64;
        // SAFETY: Timestamps won't overflow when converting to Second.
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        // Choose the cheaper enumeration: iterate candidate buckets when there
        // are fewer buckets than rows, otherwise scan the timestamps directly.
        let num_timestamps = ts_array.len();
        let missing = if bucket_num <= num_timestamps {
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    // `present` dedups against existing and already-seen buckets.
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            // View the timestamp values as plain i64 regardless of unit.
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            // Align each timestamp down to its bucket start and dedup.
            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }

    /// Returns the partition duration.
    ///
    /// NOTE(review): the name suggests a 1-day fallback, but `new()` already
    /// defaults `part_duration`, so this simply returns the stored value.
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }

    /// Write to multiple partitions.
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Group rows by the existing partition they belong to, and collect
        // rows whose partition does not exist yet keyed by partition start.
        let mut parts_to_write = HashMap::new();
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // Safety: We used the timestamp before.
            let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                // We need to write it to a new part.
                // `part_duration` always has a value (see `new()`).
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Writes rows to existing parts.
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Creates new parts and writes to them. Acquires the lock to avoid others create
        // the same partition.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }
}
660
661/// Computes the start timestamp of the partition for `ts`.
662///
663/// It always use bucket size in seconds which should fit all timestamp resolution.
664fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
665    // Safety: We convert it to seconds so it never returns `None`.
666    let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
667    let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
668    let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
669    start_sec.convert_to(ts.unit())
670}
671
/// Mutable partition state guarded by the [TimePartitions] mutex.
#[derive(Debug)]
struct PartitionsInner {
    /// All partitions.
    parts: PartitionVec,
    /// Next memtable id.
    next_memtable_id: MemtableId,
}
679
680impl PartitionsInner {
681    fn new(next_memtable_id: MemtableId) -> Self {
682        Self {
683            parts: Default::default(),
684            next_memtable_id,
685        }
686    }
687
688    fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
689        Self {
690            parts: smallvec![part],
691            next_memtable_id,
692        }
693    }
694
695    fn alloc_memtable_id(&mut self) -> MemtableId {
696        let id = self.next_memtable_id;
697        self.next_memtable_id += 1;
698        id
699    }
700}
701
/// Time range of a partition, i.e. the half-open interval `[min, max)`.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive min timestamp of rows in the partition.
    min_timestamp: Timestamp,
    /// Exclusive max timestamp of rows in the partition.
    max_timestamp: Timestamp,
}
710
711impl PartTimeRange {
712    fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
713        let start_sec = start.convert_to(TimeUnit::Second)?;
714        let end_sec = start_sec.add_duration(duration).ok()?;
715        let min_timestamp = start_sec.convert_to(start.unit())?;
716        let max_timestamp = end_sec.convert_to(start.unit())?;
717
718        Some(Self {
719            min_timestamp,
720            max_timestamp,
721        })
722    }
723
724    /// Returns whether the `ts` belongs to the partition.
725    fn contains_timestamp(&self, ts: Timestamp) -> bool {
726        self.min_timestamp <= ts && ts < self.max_timestamp
727    }
728}
729
/// Rows grouped for one existing partition during a multi-partition write
/// (see `TimePartitions::write_multi_parts`).
struct PartitionToWrite<'a> {
    /// Target partition.
    partition: TimePartition,
    /// Rows destined for `partition`.
    key_values: Vec<KeyValue<'a>>,
}
734
735#[cfg(test)]
736mod tests {
737    use std::sync::Arc;
738
739    use api::v1::SemanticType;
740    use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
741    use datatypes::arrow::datatypes::{DataType, Field, Schema};
742    use datatypes::arrow::record_batch::RecordBatch;
743    use datatypes::prelude::ConcreteDataType;
744    use datatypes::schema::ColumnSchema;
745    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
746    use store_api::storage::SequenceNumber;
747
748    use super::*;
749    use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
750    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
751    use crate::test_util::memtable_util::{self, collect_iter_timestamps};
752
753    #[test]
754    fn test_no_duration() {
755        let metadata = memtable_util::metadata_for_test();
756        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
757        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
758        assert_eq!(0, partitions.num_partitions());
759        assert!(partitions.is_empty());
760
761        let kvs = memtable_util::build_key_values(
762            &metadata,
763            "hello".to_string(),
764            0,
765            &[1000, 3000, 7000, 5000, 6000],
766            0, // sequence 0, 1, 2, 3, 4
767        );
768        partitions.write(&kvs).unwrap();
769
770        assert_eq!(1, partitions.num_partitions());
771        assert!(!partitions.is_empty());
772        let mut memtables = Vec::new();
773        partitions.list_memtables(&mut memtables);
774        assert_eq!(0, memtables[0].id());
775
776        let iter = memtables[0].iter(None, None, None).unwrap();
777        let timestamps = collect_iter_timestamps(iter);
778        assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
779    }
780
781    #[test]
782    fn test_write_single_part() {
783        let metadata = memtable_util::metadata_for_test();
784        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
785        let partitions =
786            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
787        assert_eq!(0, partitions.num_partitions());
788
789        let kvs = memtable_util::build_key_values(
790            &metadata,
791            "hello".to_string(),
792            0,
793            &[5000, 2000, 0],
794            0, // sequence 0, 1, 2
795        );
796        // It should creates a new partition.
797        partitions.write(&kvs).unwrap();
798        assert_eq!(1, partitions.num_partitions());
799        assert!(!partitions.is_empty());
800
801        let kvs = memtable_util::build_key_values(
802            &metadata,
803            "hello".to_string(),
804            0,
805            &[3000, 7000, 4000],
806            3, // sequence 3, 4, 5
807        );
808        // Still writes to the same partition.
809        partitions.write(&kvs).unwrap();
810        assert_eq!(1, partitions.num_partitions());
811
812        let mut memtables = Vec::new();
813        partitions.list_memtables(&mut memtables);
814        let iter = memtables[0].iter(None, None, None).unwrap();
815        let timestamps = collect_iter_timestamps(iter);
816        assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
817        let parts = partitions.list_partitions();
818        assert_eq!(
819            Timestamp::new_millisecond(0),
820            parts[0].time_range.min_timestamp
821        );
822        assert_eq!(
823            Timestamp::new_millisecond(10000),
824            parts[0].time_range.max_timestamp
825        );
826    }
827
828    #[cfg(test)]
829    fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
830        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
831        let partitions =
832            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
833        assert_eq!(0, partitions.num_partitions());
834
835        let kvs = memtable_util::build_key_values(
836            metadata,
837            "hello".to_string(),
838            0,
839            &[2000, 0],
840            0, // sequence 0, 1
841        );
842        // It should creates a new partition.
843        partitions.write(&kvs).unwrap();
844        assert_eq!(1, partitions.num_partitions());
845        assert!(!partitions.is_empty());
846
847        let kvs = memtable_util::build_key_values(
848            metadata,
849            "hello".to_string(),
850            0,
851            &[3000, 7000, 4000, 5000],
852            2, // sequence 2, 3, 4, 5
853        );
854        // Writes 2 rows to the old partition and 1 row to a new partition.
855        partitions.write(&kvs).unwrap();
856        assert_eq!(2, partitions.num_partitions());
857
858        partitions
859    }
860
861    #[test]
862    fn test_write_multi_parts() {
863        let metadata = memtable_util::metadata_for_test();
864        let partitions = new_multi_partitions(&metadata);
865
866        let parts = partitions.list_partitions();
867        let iter = parts[0].memtable.iter(None, None, None).unwrap();
868        let timestamps = collect_iter_timestamps(iter);
869        assert_eq!(0, parts[0].memtable.id());
870        assert_eq!(
871            Timestamp::new_millisecond(0),
872            parts[0].time_range.min_timestamp
873        );
874        assert_eq!(
875            Timestamp::new_millisecond(5000),
876            parts[0].time_range.max_timestamp
877        );
878        assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
879        let iter = parts[1].memtable.iter(None, None, None).unwrap();
880        assert_eq!(1, parts[1].memtable.id());
881        let timestamps = collect_iter_timestamps(iter);
882        assert_eq!(&[5000, 7000], &timestamps[..]);
883        assert_eq!(
884            Timestamp::new_millisecond(5000),
885            parts[1].time_range.min_timestamp
886        );
887        assert_eq!(
888            Timestamp::new_millisecond(10000),
889            parts[1].time_range.max_timestamp
890        );
891    }
892
893    #[test]
894    fn test_new_with_part_duration() {
895        let metadata = memtable_util::metadata_for_test();
896        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
897        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
898
899        let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
900        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
901        assert_eq!(0, new_parts.next_memtable_id());
902
903        // Won't update the duration if it's None.
904        let new_parts = new_parts.new_with_part_duration(None);
905        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
906        // Don't need to create new memtables.
907        assert_eq!(0, new_parts.next_memtable_id());
908
909        let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
910        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
911        // Don't need to create new memtables.
912        assert_eq!(0, new_parts.next_memtable_id());
913
914        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
915        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
916        // Need to build a new memtable as duration is still None.
917        let new_parts = partitions.new_with_part_duration(None);
918        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
919        assert_eq!(0, new_parts.next_memtable_id());
920    }
921
922    #[test]
923    fn test_fork_empty() {
924        let metadata = memtable_util::metadata_for_test();
925        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
926        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
927        partitions.freeze().unwrap();
928        let new_parts = partitions.fork(&metadata, None);
929        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
930        assert!(new_parts.list_partitions().is_empty());
931        assert_eq!(0, new_parts.next_memtable_id());
932
933        new_parts.freeze().unwrap();
934        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
935        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
936        assert!(new_parts.list_partitions().is_empty());
937        assert_eq!(0, new_parts.next_memtable_id());
938
939        new_parts.freeze().unwrap();
940        let new_parts = new_parts.fork(&metadata, None);
941        // Won't update the duration.
942        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
943        assert!(new_parts.list_partitions().is_empty());
944        assert_eq!(0, new_parts.next_memtable_id());
945
946        new_parts.freeze().unwrap();
947        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
948        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
949        assert!(new_parts.list_partitions().is_empty());
950        assert_eq!(0, new_parts.next_memtable_id());
951    }
952
953    #[test]
954    fn test_fork_non_empty_none() {
955        let metadata = memtable_util::metadata_for_test();
956        let partitions = new_multi_partitions(&metadata);
957        partitions.freeze().unwrap();
958
959        // Won't update the duration.
960        let new_parts = partitions.fork(&metadata, None);
961        assert!(new_parts.is_empty());
962        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
963        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
964        assert_eq!(3, new_parts.next_memtable_id());
965
966        // Although we don't fork a memtable multiple times, we still add a test for it.
967        let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
968        assert!(new_parts.is_empty());
969        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
970        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
971        assert_eq!(4, new_parts.next_memtable_id());
972    }
973
    #[test]
    fn test_find_partitions_by_time_range() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());

        // Case 1: No time range partitioning. No partition exists yet, so the
        // query reports the (default-window) bucket starting at 0 as missing.
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let parts = partitions.list_partitions();
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            )
            .unwrap();
        assert_eq!(matching.len(), 0);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], Timestamp::new_millisecond(0));

        // Case 2: With time range partitioning (5s windows).
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Create two existing partitions: [0, 5000) and [5000, 10000)
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
        partitions.write(&kvs).unwrap();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
        partitions.write(&kvs).unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());

        // Test case 2a: Query fully within existing partition [0, 5000).
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(2000),
                Timestamp::new_millisecond(4000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);

        // Test case 2b: Query spanning both existing partitions.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(3000),
                Timestamp::new_millisecond(8000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2c: Query entirely beyond existing partitions; the bucket
        // at 10000 must be created.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(12000),
                Timestamp::new_millisecond(13000),
            )
            .unwrap();
        assert!(matching.is_empty());
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].value(), 10000);

        // Test case 2d: Query partially overlapping both existing partitions.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4000),
                Timestamp::new_millisecond(6000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2e: Corner case at the 5000 boundary. 4999 belongs to
        // [0, 5000) and 5000 to [5000, 10000), so both partitions match.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4999),
                Timestamp::new_millisecond(5000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2f: Corner case at the 10000 boundary. 9999 matches the
        // existing [5000, 10000) while 10000 needs a new bucket at 10000.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(9999),
                Timestamp::new_millisecond(10000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(1, missing.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
        assert_eq!(missing[0].value(), 10000);

        // Test case 2g: Range crossing 0. Negative timestamps align down, so
        // the missing bucket starts at -5000.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-1000),
                Timestamp::new_millisecond(1000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(1, missing.len());
        assert_eq!(missing[0].value(), -5000);

        // Test case 3: sparse data. Only buckets containing actual timestamps
        // are reported, not every bucket in the [min, max] span.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from(vec![
                    -100000000000,
                    0,
                    100000000000,
                ])) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-100000000000),
                Timestamp::new_millisecond(100000000000),
            )
            .unwrap();
        assert_eq!(2, matching.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
        assert_eq!(2, missing.len());
        assert_eq!(missing[0].value(), -100000000000);
        assert_eq!(missing[1].value(), 100000000000);
    }
1129
1130    fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
1131        let schema = Arc::new(Schema::new(vec![
1132            Field::new(
1133                "ts",
1134                DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1135                false,
1136            ),
1137            Field::new("val", DataType::Utf8, true),
1138        ]));
1139        let ts_data = ts.to_vec();
1140        let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
1141        let val_array = Arc::new(StringArray::from_iter_values(
1142            ts.iter().map(|v| v.to_string()),
1143        ));
1144        let batch = RecordBatch::try_new(
1145            schema,
1146            vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
1147        )
1148        .unwrap();
1149        let max_ts = ts.iter().max().copied().unwrap();
1150        let min_ts = ts.iter().min().copied().unwrap();
1151        BulkPart {
1152            batch,
1153            max_ts,
1154            min_ts,
1155            sequence,
1156            timestamp_index: 0,
1157            raw_data: None,
1158        }
1159    }
1160
    #[test]
    fn test_write_bulk() {
        // Builds a minimal region metadata: a millisecond timestamp column
        // `ts` plus a string field `val`, with an empty primary key.
        let mut metadata_builder = RegionMetadataBuilder::new(0.into());
        metadata_builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![]);
        let metadata = Arc::new(metadata_builder.build().unwrap());

        let builder = Arc::new(TimeSeriesMemtableBuilder::default());
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Test case 1: Write to single partition; all rows fit in [0, 5000).
        partitions
            .write_bulk(build_part(&[1000, 2000, 3000], 0))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000], &timestamps[..]);

        // Test case 2: Write across multiple existing partitions; 4000 goes to
        // [0, 5000) while 5000 and 6000 create/fill [5000, 10000).
        partitions
            .write_bulk(build_part(&[4000, 5000, 6000], 1))
            .unwrap();
        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());
        // Check first partition [0, 5000)
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
        // Check second partition [5000, 10000)
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 6000], &timestamps[..]);

        // Test case 3: Write requiring new partition
        partitions
            .write_bulk(build_part(&[11000, 12000], 3))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(3, parts.len());

        // Check new partition [10000, 15000)
        let iter = parts[2].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[11000, 12000], &timestamps[..]);

        // Test case 4: Write with no time range partitioning; everything
        // lands in a single partition. Next memtable id starts at 3 here.
        let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);

        partitions
            .write_bulk(build_part(&[1000, 5000, 9000], 4))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
    }
1242
1243    #[test]
1244    fn test_split_record_batch() {
1245        let schema = Arc::new(Schema::new(vec![
1246            Field::new(
1247                "ts",
1248                DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
1249                false,
1250            ),
1251            Field::new("val", DataType::Utf8, true),
1252        ]));
1253
1254        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
1255            1000, 2000, 5000, 7000, 8000,
1256        ]));
1257        let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
1258        let batch = RecordBatch::try_new(
1259            schema.clone(),
1260            vec![ts_array as ArrayRef, val_array as ArrayRef],
1261        )
1262        .unwrap();
1263
1264        let part = BulkPart {
1265            batch,
1266            max_ts: 8000,
1267            min_ts: 1000,
1268            sequence: 0,
1269            timestamp_index: 0,
1270            raw_data: None,
1271        };
1272
1273        let result = filter_record_batch(&part, 1000, 2000).unwrap();
1274        assert!(result.is_some());
1275        let filtered = result.unwrap();
1276        assert_eq!(filtered.num_rows(), 1);
1277        assert_eq!(filtered.min_ts, 1000);
1278        assert_eq!(filtered.max_ts, 1000);
1279
1280        // Test splitting with range [3000, 6000)
1281        let result = filter_record_batch(&part, 3000, 6000).unwrap();
1282        assert!(result.is_some());
1283        let filtered = result.unwrap();
1284        assert_eq!(filtered.num_rows(), 1);
1285        assert_eq!(filtered.min_ts, 5000);
1286        assert_eq!(filtered.max_ts, 5000);
1287
1288        // Test splitting with range that includes no points
1289        let result = filter_record_batch(&part, 3000, 4000).unwrap();
1290        assert!(result.is_none());
1291
1292        // Test splitting with range that includes all points
1293        let result = filter_record_batch(&part, 0, 9000).unwrap();
1294        assert!(result.is_some());
1295        let filtered = result.unwrap();
1296        assert_eq!(filtered.num_rows(), 5);
1297        assert_eq!(filtered.min_ts, 1000);
1298        assert_eq!(filtered.max_ts, 8000);
1299    }
1300}