mito2/memtable/
time_partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Partitions memtables by time.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, Mutex, MutexGuard};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_time::timestamp::TimeUnit;
23use common_time::timestamp_millis::BucketAligned;
24use common_time::Timestamp;
25use datatypes::arrow;
26use datatypes::arrow::array::{
27    ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
28    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
29};
30use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
31use datatypes::arrow::datatypes::{DataType, Int64Type};
32use smallvec::{smallvec, SmallVec};
33use snafu::{OptionExt, ResultExt};
34use store_api::metadata::RegionMetadataRef;
35
36use crate::error;
37use crate::error::{InvalidRequestSnafu, Result};
38use crate::memtable::bulk::part::BulkPart;
39use crate::memtable::key_values::KeyValue;
40use crate::memtable::version::SmallMemtableVec;
41use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
42
43/// A partition holds rows with timestamps between `[min, max)`.
44#[derive(Debug, Clone)]
45pub struct TimePartition {
46    /// Memtable of the partition.
47    memtable: MemtableRef,
48    /// Time range of the partition. `min` is inclusive and `max` is exclusive.
49    /// `None` means there is no time range. The time
50    /// range is `None` if and only if the [TimePartitions::part_duration] is `None`.
51    time_range: Option<PartTimeRange>,
52}
53
54impl TimePartition {
55    /// Returns whether the `ts` belongs to the partition.
56    fn contains_timestamp(&self, ts: Timestamp) -> bool {
57        let Some(range) = self.time_range else {
58            return true;
59        };
60
61        range.contains_timestamp(ts)
62    }
63
64    /// Write rows to the part.
65    fn write(&self, kvs: &KeyValues) -> Result<()> {
66        self.memtable.write(kvs)
67    }
68
69    /// Writes a record batch to memtable.
70    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
71        self.memtable.write_bulk(rb)
72    }
73
74    /// Write a partial [BulkPart] according to [TimePartition::time_range].
75    fn write_record_batch_partial(&self, part: &BulkPart) -> error::Result<()> {
76        let Some(range) = self.time_range else {
77            unreachable!("TimePartition must have explicit time range when a bulk request involves multiple time partition")
78        };
79        let Some(filtered) = filter_record_batch(
80            part,
81            range.min_timestamp.value(),
82            range.max_timestamp.value(),
83        )?
84        else {
85            return Ok(());
86        };
87        self.write_record_batch(filtered)
88    }
89}
90
// Builds a `BooleanArray` mask selecting the rows of `$ts_array` whose raw value lies in
// `[$min, $max)`.
//
// The mask is packed by hand, 64 rows per `u64` word (least significant bit first), into a
// pre-sized `MutableBuffer`. The resulting array has no null buffer.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let num_rows = $ts_array.len();
        let full_words = num_rows / 64;
        let tail_bits = num_rows % 64;
        // One `u64` (8 bytes) for every started group of 64 rows.
        let mut bits = MutableBuffer::new(num_rows.div_ceil(64) * 8);

        let selected = |row: usize| -> bool {
            // SAFETY: `row` is always below `num_rows`; `pack_word` is only called with
            // ranges inside `0..num_rows`.
            let value = unsafe { $ts_array.value_unchecked(row) };
            value >= $min && value < $max
        };
        // Packs `count` consecutive rows starting at `first_row` into a single word.
        let pack_word = |first_row: usize, count: usize| -> u64 {
            (0..count).fold(0u64, |word, bit| {
                word | ((selected(first_row + bit) as u64) << bit)
            })
        };

        for word_idx in 0..full_words {
            let word = pack_word(word_idx * 64, 64);
            // SAFETY: the buffer was allocated with room for every word pushed here.
            unsafe { bits.push_unchecked(word) }
        }
        if tail_bits != 0 {
            let word = pack_word(full_words * 64, tail_bits);
            // SAFETY: the buffer was allocated with room for every word pushed here.
            unsafe { bits.push_unchecked(word) }
        }

        BooleanArray::new(BooleanBuffer::new(bits.into(), 0, num_rows), None)
    }};
}
130
// Downcasts `$ts_array` to `$array_type`, keeps the rows in `[$min, $max)` and evaluates to
// `(filtered_timestamps, mask, min_ts, max_ts)`.
//
// NOTE: this expands inside the caller's function body; `?` and the early
// `return Ok(None)` (taken when no row is selected) return from that enclosing function.
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let typed = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let mask = create_filter_buffer!(typed, $min, $max);

        let selected =
            arrow::compute::filter(typed, &mask).context(error::ComputeArrowSnafu)?;
        if selected.is_empty() {
            // No row falls into the range.
            return Ok(None);
        }

        let selected_ts = selected.as_any().downcast_ref::<$array_type>().unwrap();
        // Safety: `selected_ts` is non-empty, so both the min and the max exist.
        let min_ts = arrow::compute::min(selected_ts).unwrap();
        let max_ts = arrow::compute::max(selected_ts).unwrap();

        (selected, mask, min_ts, max_ts)
    }};
}
149
150/// Filters the given part according to min (inclusive) and max (exclusive) timestamp range.
151/// Returns [None] if no matching rows.
152pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
153    let ts_array = part.timestamps();
154    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
155        DataType::Timestamp(unit, _) => match unit {
156            arrow::datatypes::TimeUnit::Second => {
157                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
158            }
159            arrow::datatypes::TimeUnit::Millisecond => {
160                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
161            }
162            arrow::datatypes::TimeUnit::Microsecond => {
163                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
164            }
165            arrow::datatypes::TimeUnit::Nanosecond => {
166                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
167            }
168        },
169        _ => {
170            unreachable!("Got data type: {:?}", ts_array.data_type());
171        }
172    };
173
174    let num_rows = ts_array.len();
175    let arrays = part
176        .batch
177        .columns()
178        .iter()
179        .enumerate()
180        .map(|(index, array)| {
181            if index == part.timestamp_index {
182                Ok(ts_array.clone())
183            } else {
184                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
185            }
186        })
187        .collect::<Result<Vec<_>>>()?;
188    let batch = RecordBatch::try_new_with_options(
189        part.batch.schema(),
190        arrays,
191        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
192    )
193    .context(error::NewRecordBatchSnafu)?;
194    Ok(Some(BulkPart {
195        batch,
196        num_rows,
197        max_ts,
198        min_ts,
199        sequence: part.sequence,
200        timestamp_index: part.timestamp_index,
201    }))
202}
203
204type PartitionVec = SmallVec<[TimePartition; 2]>;
205
206/// Partitions.
207#[derive(Debug)]
208pub struct TimePartitions {
209    /// Mutable data of partitions.
210    inner: Mutex<PartitionsInner>,
211    /// Duration of a partition.
212    ///
213    /// `None` means there is only one partition and the [TimePartition::time_range] is
214    /// also `None`.
215    part_duration: Option<Duration>,
216    /// Metadata of the region.
217    metadata: RegionMetadataRef,
218    /// Builder of memtables.
219    builder: MemtableBuilderRef,
220}
221
222pub type TimePartitionsRef = Arc<TimePartitions>;
223
224impl TimePartitions {
225    /// Returns a new empty partition list with optional duration.
226    pub fn new(
227        metadata: RegionMetadataRef,
228        builder: MemtableBuilderRef,
229        next_memtable_id: MemtableId,
230        part_duration: Option<Duration>,
231    ) -> Self {
232        let mut inner = PartitionsInner::new(next_memtable_id);
233        if part_duration.is_none() {
234            // If `part_duration` is None, then we create a partition with `None` time
235            // range so we will write all rows to that partition.
236            let memtable = builder.build(inner.alloc_memtable_id(), &metadata);
237            debug!(
238                "Creates a time partition for all timestamps, region: {}, memtable_id: {}",
239                metadata.region_id,
240                memtable.id(),
241            );
242            let part = TimePartition {
243                memtable,
244                time_range: None,
245            };
246            inner.parts.push(part);
247        }
248
249        Self {
250            inner: Mutex::new(inner),
251            part_duration,
252            metadata,
253            builder,
254        }
255    }
256
257    /// Write key values to memtables.
258    ///
259    /// It creates new partitions if necessary.
260    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
261        // Get all parts.
262        let parts = self.list_partitions();
263
264        // Checks whether all rows belongs to a single part. Checks in reverse order as we usually
265        // put to latest part.
266        for part in parts.iter().rev() {
267            let mut all_in_partition = true;
268            for kv in kvs.iter() {
269                // Safety: We checked the schema in the write request.
270                let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
271                if !part.contains_timestamp(ts) {
272                    all_in_partition = false;
273                    break;
274                }
275            }
276            if !all_in_partition {
277                continue;
278            }
279
280            // We can write all rows to this part.
281            return part.write(kvs);
282        }
283
284        // Slow path: We have to split kvs by partitions.
285        self.write_multi_parts(kvs, &parts)
286    }
287
288    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
289        let time_type = self
290            .metadata
291            .time_index_column()
292            .column_schema
293            .data_type
294            .as_timestamp()
295            .unwrap();
296
297        // Get all parts.
298        let parts = self.list_partitions();
299        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
300            part.timestamps(),
301            &parts,
302            time_type.create_timestamp(part.min_ts),
303            time_type.create_timestamp(part.max_ts),
304        )?;
305
306        if matching_parts.len() == 1 && missing_parts.is_empty() {
307            // fast path: all timestamps fall in one time partition.
308            return matching_parts[0].write_record_batch(part);
309        }
310
311        for matching in matching_parts {
312            matching.write_record_batch_partial(&part)?
313        }
314
315        for missing in missing_parts {
316            let new_part = {
317                let mut inner = self.inner.lock().unwrap();
318                self.get_or_create_time_partition(missing, &mut inner)?
319            };
320            new_part.write_record_batch_partial(&part)?;
321        }
322        Ok(())
323    }
324
325    // Creates or gets parts with given start timestamp.
326    // Acquires the lock to avoid others create the same partition.
327    fn get_or_create_time_partition(
328        &self,
329        part_start: Timestamp,
330        inner: &mut MutexGuard<PartitionsInner>,
331    ) -> Result<TimePartition> {
332        let part_duration = self.part_duration.unwrap();
333        let part_pos = match inner
334            .parts
335            .iter()
336            .position(|part| part.time_range.unwrap().min_timestamp == part_start)
337        {
338            Some(pos) => pos,
339            None => {
340                let range = PartTimeRange::from_start_duration(part_start, part_duration)
341                    .with_context(|| InvalidRequestSnafu {
342                        region_id: self.metadata.region_id,
343                        reason: format!(
344                            "Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}",
345                        ),
346                    })?;
347                let memtable = self
348                    .builder
349                    .build(inner.alloc_memtable_id(), &self.metadata);
350                debug!(
351                        "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
352                        range,
353                        self.metadata.region_id,
354                        part_duration,
355                        memtable.id(),
356                        inner.parts.len() + 1
357                    );
358                let pos = inner.parts.len();
359                inner.parts.push(TimePartition {
360                    memtable,
361                    time_range: Some(range),
362                });
363                pos
364            }
365        };
366        Ok(inner.parts[part_pos].clone())
367    }
368
369    /// Append memtables in partitions to `memtables`.
370    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
371        let inner = self.inner.lock().unwrap();
372        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
373    }
374
375    /// Returns the number of partitions.
376    pub fn num_partitions(&self) -> usize {
377        let inner = self.inner.lock().unwrap();
378        inner.parts.len()
379    }
380
381    /// Returns true if all memtables are empty.
382    pub fn is_empty(&self) -> bool {
383        let inner = self.inner.lock().unwrap();
384        inner.parts.iter().all(|part| part.memtable.is_empty())
385    }
386
387    /// Freezes all memtables.
388    pub fn freeze(&self) -> Result<()> {
389        let inner = self.inner.lock().unwrap();
390        for part in &*inner.parts {
391            part.memtable.freeze()?;
392        }
393        Ok(())
394    }
395
396    /// Forks latest partition and updates the partition duration if `part_duration` is Some.
397    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
398        // Fall back to the existing partition duration.
399        let part_duration = part_duration.or(self.part_duration);
400
401        let mut inner = self.inner.lock().unwrap();
402        let latest_part = inner
403            .parts
404            .iter()
405            .max_by_key(|part| part.time_range.map(|range| range.min_timestamp))
406            .cloned();
407
408        let Some(old_part) = latest_part else {
409            // If there is no partition, then we create a new partition with the new duration.
410            return Self::new(
411                metadata.clone(),
412                self.builder.clone(),
413                inner.next_memtable_id,
414                part_duration,
415            );
416        };
417
418        let old_stats = old_part.memtable.stats();
419        // Use the max timestamp to compute the new time range for the memtable.
420        // If `part_duration` is None, the new range will be None.
421        let new_time_range =
422            old_stats
423                .time_range()
424                .zip(part_duration)
425                .and_then(|(range, bucket)| {
426                    partition_start_timestamp(range.1, bucket)
427                        .and_then(|start| PartTimeRange::from_start_duration(start, bucket))
428                });
429        // Forks the latest partition, but compute the time range based on the new duration.
430        let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
431        let new_part = TimePartition {
432            memtable,
433            time_range: new_time_range,
434        };
435
436        Self {
437            inner: Mutex::new(PartitionsInner::with_partition(
438                new_part,
439                inner.next_memtable_id,
440            )),
441            part_duration,
442            metadata: metadata.clone(),
443            builder: self.builder.clone(),
444        }
445    }
446
447    /// Returns partition duration.
448    pub(crate) fn part_duration(&self) -> Option<Duration> {
449        self.part_duration
450    }
451
452    /// Returns memory usage.
453    pub(crate) fn memory_usage(&self) -> usize {
454        let inner = self.inner.lock().unwrap();
455        inner
456            .parts
457            .iter()
458            .map(|part| part.memtable.stats().estimated_bytes)
459            .sum()
460    }
461
462    /// Returns the number of rows.
463    pub(crate) fn num_rows(&self) -> u64 {
464        let inner = self.inner.lock().unwrap();
465        inner
466            .parts
467            .iter()
468            .map(|part| part.memtable.stats().num_rows as u64)
469            .sum()
470    }
471
472    /// Append memtables in partitions to small vec.
473    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
474        let inner = self.inner.lock().unwrap();
475        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
476    }
477
478    /// Returns the next memtable id.
479    pub(crate) fn next_memtable_id(&self) -> MemtableId {
480        let inner = self.inner.lock().unwrap();
481        inner.next_memtable_id
482    }
483
484    /// Creates a new empty partition list from this list and a `part_duration`.
485    /// It falls back to the old partition duration if `part_duration` is `None`.
486    pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
487        debug_assert!(self.is_empty());
488
489        Self::new(
490            self.metadata.clone(),
491            self.builder.clone(),
492            self.next_memtable_id(),
493            part_duration.or(self.part_duration),
494        )
495    }
496
497    /// Returns all partitions.
498    fn list_partitions(&self) -> PartitionVec {
499        let inner = self.inner.lock().unwrap();
500        inner.parts.clone()
501    }
502
503    /// Find existing partitions that match the bulk data's time range and identify
504    /// any new partitions that need to be created
505    fn find_partitions_by_time_range<'a>(
506        &self,
507        ts_array: &ArrayRef,
508        existing_parts: &'a [TimePartition],
509        min: Timestamp,
510        max: Timestamp,
511    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
512        let mut matching = Vec::new();
513
514        let mut present = HashSet::new();
515        // First find any existing partitions that overlap
516        for part in existing_parts {
517            let Some(part_time_range) = part.time_range.as_ref() else {
518                matching.push(part);
519                return Ok((matching, Vec::new()));
520            };
521
522            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
523                matching.push(part);
524                present.insert(part_time_range.min_timestamp.value());
525            }
526        }
527
528        // safety: self.part_duration can only be present when reach here.
529        let part_duration = self.part_duration.unwrap();
530        let timestamp_unit = self.metadata.time_index_type().unit();
531
532        let part_duration_sec = part_duration.as_secs() as i64;
533        // SAFETY: Timestamps won't overflow when converting to Second.
534        let start_bucket = min
535            .convert_to(TimeUnit::Second)
536            .unwrap()
537            .value()
538            .div_euclid(part_duration_sec);
539        let end_bucket = max
540            .convert_to(TimeUnit::Second)
541            .unwrap()
542            .value()
543            .div_euclid(part_duration_sec);
544        let bucket_num = (end_bucket - start_bucket + 1) as usize;
545
546        let num_timestamps = ts_array.len();
547        let missing = if bucket_num <= num_timestamps {
548            (start_bucket..=end_bucket)
549                .filter_map(|start_sec| {
550                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
551                        .convert_to(timestamp_unit)
552                    else {
553                        return Some(
554                            InvalidRequestSnafu {
555                                region_id: self.metadata.region_id,
556                                reason: format!("Timestamp out of range: {}", start_sec),
557                            }
558                            .fail(),
559                        );
560                    };
561                    if present.insert(timestamp.value()) {
562                        Some(Ok(timestamp))
563                    } else {
564                        None
565                    }
566                })
567                .collect::<Result<Vec<_>>>()?
568        } else {
569            let ts_primitive = match ts_array.data_type() {
570                DataType::Timestamp(unit, _) => match unit {
571                    arrow::datatypes::TimeUnit::Second => ts_array
572                        .as_any()
573                        .downcast_ref::<TimestampSecondArray>()
574                        .unwrap()
575                        .reinterpret_cast::<Int64Type>(),
576                    arrow::datatypes::TimeUnit::Millisecond => ts_array
577                        .as_any()
578                        .downcast_ref::<TimestampMillisecondArray>()
579                        .unwrap()
580                        .reinterpret_cast::<Int64Type>(),
581                    arrow::datatypes::TimeUnit::Microsecond => ts_array
582                        .as_any()
583                        .downcast_ref::<TimestampMicrosecondArray>()
584                        .unwrap()
585                        .reinterpret_cast::<Int64Type>(),
586                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
587                        .as_any()
588                        .downcast_ref::<TimestampNanosecondArray>()
589                        .unwrap()
590                        .reinterpret_cast::<Int64Type>(),
591                },
592                _ => unreachable!(),
593            };
594
595            ts_primitive
596                .values()
597                .iter()
598                .filter_map(|ts| {
599                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
600                    let Some(bucket_start) = ts
601                        .convert_to(TimeUnit::Second)
602                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
603                        .and_then(|ts| ts.convert_to(timestamp_unit))
604                    else {
605                        return Some(
606                            InvalidRequestSnafu {
607                                region_id: self.metadata.region_id,
608                                reason: format!("Timestamp out of range: {:?}", ts),
609                            }
610                            .fail(),
611                        );
612                    };
613                    if present.insert(bucket_start.value()) {
614                        Some(Ok(bucket_start))
615                    } else {
616                        None
617                    }
618                })
619                .collect::<Result<Vec<_>>>()?
620        };
621        Ok((matching, missing))
622    }
623
624    /// Write to multiple partitions.
625    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
626        // If part duration is `None` then there is always one partition and all rows
627        // will be put in that partition before invoking this method.
628        debug_assert!(self.part_duration.is_some());
629
630        let mut parts_to_write = HashMap::new();
631        let mut missing_parts = HashMap::new();
632        for kv in kvs.iter() {
633            let mut part_found = false;
634            // Safety: We used the timestamp before.
635            let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
636            for part in parts {
637                if part.contains_timestamp(ts) {
638                    // Safety: Since part duration is `Some` so all time range should be `Some`.
639                    parts_to_write
640                        .entry(part.time_range.unwrap().min_timestamp)
641                        .or_insert_with(|| PartitionToWrite {
642                            partition: part.clone(),
643                            key_values: Vec::new(),
644                        })
645                        .key_values
646                        .push(kv);
647                    part_found = true;
648                    break;
649                }
650            }
651
652            if !part_found {
653                // We need to write it to a new part.
654                // Safety: `new()` ensures duration is always Some if we do to this method.
655                let part_duration = self.part_duration.unwrap();
656                let part_start =
657                    partition_start_timestamp(ts, part_duration).with_context(|| {
658                        InvalidRequestSnafu {
659                            region_id: self.metadata.region_id,
660                            reason: format!(
661                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
662                            ),
663                        }
664                    })?;
665                missing_parts
666                    .entry(part_start)
667                    .or_insert_with(Vec::new)
668                    .push(kv);
669            }
670        }
671
672        // Writes rows to existing parts.
673        for part_to_write in parts_to_write.into_values() {
674            for kv in part_to_write.key_values {
675                part_to_write.partition.memtable.write_one(kv)?;
676            }
677        }
678
679        // Creates new parts and writes to them. Acquires the lock to avoid others create
680        // the same partition.
681        let mut inner = self.inner.lock().unwrap();
682        for (part_start, key_values) in missing_parts {
683            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
684            for kv in key_values {
685                partition.memtable.write_one(kv)?;
686            }
687        }
688
689        Ok(())
690    }
691}
692
693/// Computes the start timestamp of the partition for `ts`.
694///
695/// It always use bucket size in seconds which should fit all timestamp resolution.
696fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
697    // Safety: We convert it to seconds so it never returns `None`.
698    let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
699    let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
700    let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
701    start_sec.convert_to(ts.unit())
702}
703
704#[derive(Debug)]
705struct PartitionsInner {
706    /// All partitions.
707    parts: PartitionVec,
708    /// Next memtable id.
709    next_memtable_id: MemtableId,
710}
711
712impl PartitionsInner {
713    fn new(next_memtable_id: MemtableId) -> Self {
714        Self {
715            parts: Default::default(),
716            next_memtable_id,
717        }
718    }
719
720    fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
721        Self {
722            parts: smallvec![part],
723            next_memtable_id,
724        }
725    }
726
727    fn alloc_memtable_id(&mut self) -> MemtableId {
728        let id = self.next_memtable_id;
729        self.next_memtable_id += 1;
730        id
731    }
732}
733
734/// Time range of a partition.
735#[derive(Debug, Clone, Copy)]
736struct PartTimeRange {
737    /// Inclusive min timestamp of rows in the partition.
738    min_timestamp: Timestamp,
739    /// Exclusive max timestamp of rows in the partition.
740    max_timestamp: Timestamp,
741}
742
743impl PartTimeRange {
744    fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
745        let start_sec = start.convert_to(TimeUnit::Second)?;
746        let end_sec = start_sec.add_duration(duration).ok()?;
747        let min_timestamp = start_sec.convert_to(start.unit())?;
748        let max_timestamp = end_sec.convert_to(start.unit())?;
749
750        Some(Self {
751            min_timestamp,
752            max_timestamp,
753        })
754    }
755
756    /// Returns whether the `ts` belongs to the partition.
757    fn contains_timestamp(&self, ts: Timestamp) -> bool {
758        self.min_timestamp <= ts && ts < self.max_timestamp
759    }
760}
761
/// Rows of a write request routed to one existing partition.
struct PartitionToWrite<'a> {
    /// Partition whose memtable receives the rows.
    partition: TimePartition,
    /// Rows to write, in the order they were routed.
    key_values: Vec<KeyValue<'a>>,
}
766
767#[cfg(test)]
768mod tests {
769    use std::sync::Arc;
770
771    use api::v1::SemanticType;
772    use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
773    use datatypes::arrow::datatypes::{DataType, Field, Schema};
774    use datatypes::arrow::record_batch::RecordBatch;
775    use datatypes::prelude::ConcreteDataType;
776    use datatypes::schema::ColumnSchema;
777    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
778    use store_api::storage::SequenceNumber;
779
780    use super::*;
781    use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
782    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
783    use crate::test_util::memtable_util::{self, collect_iter_timestamps};
784
785    #[test]
786    fn test_no_duration() {
787        let metadata = memtable_util::metadata_for_test();
788        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
789        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
790        assert_eq!(1, partitions.num_partitions());
791        assert!(partitions.is_empty());
792
793        let kvs = memtable_util::build_key_values(
794            &metadata,
795            "hello".to_string(),
796            0,
797            &[1000, 3000, 7000, 5000, 6000],
798            0, // sequence 0, 1, 2, 3, 4
799        );
800        partitions.write(&kvs).unwrap();
801
802        assert_eq!(1, partitions.num_partitions());
803        assert!(!partitions.is_empty());
804        let mut memtables = Vec::new();
805        partitions.list_memtables(&mut memtables);
806        assert_eq!(0, memtables[0].id());
807
808        let iter = memtables[0].iter(None, None, None).unwrap();
809        let timestamps = collect_iter_timestamps(iter);
810        assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
811    }
812
813    #[test]
814    fn test_write_single_part() {
815        let metadata = memtable_util::metadata_for_test();
816        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
817        let partitions =
818            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
819        assert_eq!(0, partitions.num_partitions());
820
821        let kvs = memtable_util::build_key_values(
822            &metadata,
823            "hello".to_string(),
824            0,
825            &[5000, 2000, 0],
826            0, // sequence 0, 1, 2
827        );
828        // It should creates a new partition.
829        partitions.write(&kvs).unwrap();
830        assert_eq!(1, partitions.num_partitions());
831        assert!(!partitions.is_empty());
832
833        let kvs = memtable_util::build_key_values(
834            &metadata,
835            "hello".to_string(),
836            0,
837            &[3000, 7000, 4000],
838            3, // sequence 3, 4, 5
839        );
840        // Still writes to the same partition.
841        partitions.write(&kvs).unwrap();
842        assert_eq!(1, partitions.num_partitions());
843
844        let mut memtables = Vec::new();
845        partitions.list_memtables(&mut memtables);
846        let iter = memtables[0].iter(None, None, None).unwrap();
847        let timestamps = collect_iter_timestamps(iter);
848        assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
849        let parts = partitions.list_partitions();
850        assert_eq!(
851            Timestamp::new_millisecond(0),
852            parts[0].time_range.unwrap().min_timestamp
853        );
854        assert_eq!(
855            Timestamp::new_millisecond(10000),
856            parts[0].time_range.unwrap().max_timestamp
857        );
858    }
859
860    fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
861        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
862        let partitions =
863            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
864        assert_eq!(0, partitions.num_partitions());
865
866        let kvs = memtable_util::build_key_values(
867            metadata,
868            "hello".to_string(),
869            0,
870            &[2000, 0],
871            0, // sequence 0, 1
872        );
873        // It should creates a new partition.
874        partitions.write(&kvs).unwrap();
875        assert_eq!(1, partitions.num_partitions());
876        assert!(!partitions.is_empty());
877
878        let kvs = memtable_util::build_key_values(
879            metadata,
880            "hello".to_string(),
881            0,
882            &[3000, 7000, 4000, 5000],
883            2, // sequence 2, 3, 4, 5
884        );
885        // Writes 2 rows to the old partition and 1 row to a new partition.
886        partitions.write(&kvs).unwrap();
887        assert_eq!(2, partitions.num_partitions());
888
889        partitions
890    }
891
892    #[test]
893    fn test_write_multi_parts() {
894        let metadata = memtable_util::metadata_for_test();
895        let partitions = new_multi_partitions(&metadata);
896
897        let parts = partitions.list_partitions();
898        let iter = parts[0].memtable.iter(None, None, None).unwrap();
899        let timestamps = collect_iter_timestamps(iter);
900        assert_eq!(0, parts[0].memtable.id());
901        assert_eq!(
902            Timestamp::new_millisecond(0),
903            parts[0].time_range.unwrap().min_timestamp
904        );
905        assert_eq!(
906            Timestamp::new_millisecond(5000),
907            parts[0].time_range.unwrap().max_timestamp
908        );
909        assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
910        let iter = parts[1].memtable.iter(None, None, None).unwrap();
911        assert_eq!(1, parts[1].memtable.id());
912        let timestamps = collect_iter_timestamps(iter);
913        assert_eq!(&[5000, 7000], &timestamps[..]);
914        assert_eq!(
915            Timestamp::new_millisecond(5000),
916            parts[1].time_range.unwrap().min_timestamp
917        );
918        assert_eq!(
919            Timestamp::new_millisecond(10000),
920            parts[1].time_range.unwrap().max_timestamp
921        );
922    }
923
924    #[test]
925    fn test_new_with_part_duration() {
926        let metadata = memtable_util::metadata_for_test();
927        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
928        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
929
930        let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
931        assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
932        assert_eq!(1, new_parts.next_memtable_id());
933
934        // Won't update the duration if it's None.
935        let new_parts = new_parts.new_with_part_duration(None);
936        assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
937        // Don't need to create new memtables.
938        assert_eq!(1, new_parts.next_memtable_id());
939
940        let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
941        assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
942        // Don't need to create new memtables.
943        assert_eq!(1, new_parts.next_memtable_id());
944
945        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
946        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
947        // Need to build a new memtable as duration is still None.
948        let new_parts = partitions.new_with_part_duration(None);
949        assert!(new_parts.part_duration().is_none());
950        assert_eq!(2, new_parts.next_memtable_id());
951    }
952
953    #[test]
954    fn test_fork_empty() {
955        let metadata = memtable_util::metadata_for_test();
956        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
957        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
958        partitions.freeze().unwrap();
959        let new_parts = partitions.fork(&metadata, None);
960        assert!(new_parts.part_duration().is_none());
961        assert_eq!(1, new_parts.list_partitions()[0].memtable.id());
962        assert_eq!(2, new_parts.next_memtable_id());
963
964        new_parts.freeze().unwrap();
965        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
966        assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
967        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
968        assert_eq!(3, new_parts.next_memtable_id());
969
970        new_parts.freeze().unwrap();
971        let new_parts = new_parts.fork(&metadata, None);
972        // Won't update the duration.
973        assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
974        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
975        assert_eq!(4, new_parts.next_memtable_id());
976
977        new_parts.freeze().unwrap();
978        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
979        assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
980        assert_eq!(4, new_parts.list_partitions()[0].memtable.id());
981        assert_eq!(5, new_parts.next_memtable_id());
982    }
983
984    #[test]
985    fn test_fork_non_empty_none() {
986        let metadata = memtable_util::metadata_for_test();
987        let partitions = new_multi_partitions(&metadata);
988        partitions.freeze().unwrap();
989
990        // Won't update the duration.
991        let new_parts = partitions.fork(&metadata, None);
992        assert!(new_parts.is_empty());
993        assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
994        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
995        assert_eq!(3, new_parts.next_memtable_id());
996
997        // Although we don't fork a memtable multiple times, we still add a test for it.
998        let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
999        assert!(new_parts.is_empty());
1000        assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
1001        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
1002        assert_eq!(4, new_parts.next_memtable_id());
1003    }
1004
1005    #[test]
1006    fn test_find_partitions_by_time_range() {
1007        let metadata = memtable_util::metadata_for_test();
1008        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
1009
1010        // Case 1: No time range partitioning
1011        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
1012        let parts = partitions.list_partitions();
1013        let (matching, missing) = partitions
1014            .find_partitions_by_time_range(
1015                &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
1016                &parts,
1017                Timestamp::new_millisecond(1000),
1018                Timestamp::new_millisecond(2000),
1019            )
1020            .unwrap();
1021        assert_eq!(matching.len(), 1);
1022        assert!(missing.is_empty());
1023        assert!(matching[0].time_range.is_none());
1024
1025        // Case 2: With time range partitioning
1026        let partitions = TimePartitions::new(
1027            metadata.clone(),
1028            builder.clone(),
1029            0,
1030            Some(Duration::from_secs(5)),
1031        );
1032
1033        // Create two existing partitions: [0, 5000) and [5000, 10000)
1034        let kvs =
1035            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
1036        partitions.write(&kvs).unwrap();
1037        let kvs =
1038            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
1039        partitions.write(&kvs).unwrap();
1040
1041        let parts = partitions.list_partitions();
1042        assert_eq!(2, parts.len());
1043
1044        // Test case 2a: Query fully within existing partition
1045        let (matching, missing) = partitions
1046            .find_partitions_by_time_range(
1047                &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
1048                &parts,
1049                Timestamp::new_millisecond(2000),
1050                Timestamp::new_millisecond(4000),
1051            )
1052            .unwrap();
1053        assert_eq!(matching.len(), 1);
1054        assert!(missing.is_empty());
1055        assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1056
1057        // Test case 2b: Query spanning multiple existing partitions
1058        let (matching, missing) = partitions
1059            .find_partitions_by_time_range(
1060                &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
1061                &parts,
1062                Timestamp::new_millisecond(3000),
1063                Timestamp::new_millisecond(8000),
1064            )
1065            .unwrap();
1066        assert_eq!(matching.len(), 2);
1067        assert!(missing.is_empty());
1068        assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1069        assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
1070
1071        // Test case 2c: Query requiring new partition
1072        let (matching, missing) = partitions
1073            .find_partitions_by_time_range(
1074                &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
1075                &parts,
1076                Timestamp::new_millisecond(12000),
1077                Timestamp::new_millisecond(13000),
1078            )
1079            .unwrap();
1080        assert!(matching.is_empty());
1081        assert_eq!(missing.len(), 1);
1082        assert_eq!(missing[0].value(), 10000);
1083
1084        // Test case 2d: Query partially overlapping existing partition
1085        let (matching, missing) = partitions
1086            .find_partitions_by_time_range(
1087                &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
1088                &parts,
1089                Timestamp::new_millisecond(4000),
1090                Timestamp::new_millisecond(6000),
1091            )
1092            .unwrap();
1093        assert_eq!(matching.len(), 2);
1094        assert!(missing.is_empty());
1095        assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1096        assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
1097
1098        // Test case 2e: Corner case
1099        let (matching, missing) = partitions
1100            .find_partitions_by_time_range(
1101                &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
1102                &parts,
1103                Timestamp::new_millisecond(4999),
1104                Timestamp::new_millisecond(5000),
1105            )
1106            .unwrap();
1107        assert_eq!(matching.len(), 2);
1108        assert!(missing.is_empty());
1109        assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1110        assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
1111
1112        // Test case 2f: Corner case with
1113        let (matching, missing) = partitions
1114            .find_partitions_by_time_range(
1115                &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
1116                &parts,
1117                Timestamp::new_millisecond(9999),
1118                Timestamp::new_millisecond(10000),
1119            )
1120            .unwrap();
1121        assert_eq!(matching.len(), 1);
1122        assert_eq!(1, missing.len());
1123        assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 5000);
1124        assert_eq!(missing[0].value(), 10000);
1125
1126        // Test case 2g: Cross 0
1127        let (matching, missing) = partitions
1128            .find_partitions_by_time_range(
1129                &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
1130                &parts,
1131                Timestamp::new_millisecond(-1000),
1132                Timestamp::new_millisecond(1000),
1133            )
1134            .unwrap();
1135        assert_eq!(matching.len(), 1);
1136        assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1137        assert_eq!(1, missing.len());
1138        assert_eq!(missing[0].value(), -5000);
1139
1140        // Test case 3: sparse data
1141        let (matching, missing) = partitions
1142            .find_partitions_by_time_range(
1143                &(Arc::new(TimestampMillisecondArray::from(vec![
1144                    -100000000000,
1145                    0,
1146                    100000000000,
1147                ])) as ArrayRef),
1148                &parts,
1149                Timestamp::new_millisecond(-100000000000),
1150                Timestamp::new_millisecond(100000000000),
1151            )
1152            .unwrap();
1153        assert_eq!(2, matching.len());
1154        assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
1155        assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
1156        assert_eq!(2, missing.len());
1157        assert_eq!(missing[0].value(), -100000000000);
1158        assert_eq!(missing[1].value(), 100000000000);
1159    }
1160
1161    fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
1162        let schema = Arc::new(Schema::new(vec![
1163            Field::new(
1164                "ts",
1165                arrow::datatypes::DataType::Timestamp(
1166                    arrow::datatypes::TimeUnit::Millisecond,
1167                    None,
1168                ),
1169                false,
1170            ),
1171            Field::new("val", DataType::Utf8, true),
1172        ]));
1173        let ts_data = ts.to_vec();
1174        let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
1175        let val_array = Arc::new(StringArray::from_iter_values(
1176            ts.iter().map(|v| v.to_string()),
1177        ));
1178        let batch = RecordBatch::try_new(
1179            schema,
1180            vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
1181        )
1182        .unwrap();
1183        let max_ts = ts.iter().max().copied().unwrap();
1184        let min_ts = ts.iter().min().copied().unwrap();
1185        BulkPart {
1186            batch,
1187            num_rows: ts.len(),
1188            max_ts,
1189            min_ts,
1190            sequence,
1191            timestamp_index: 0,
1192        }
1193    }
1194
1195    #[test]
1196    fn test_write_bulk() {
1197        let mut metadata_builder = RegionMetadataBuilder::new(0.into());
1198        metadata_builder
1199            .push_column_metadata(ColumnMetadata {
1200                column_schema: ColumnSchema::new(
1201                    "ts",
1202                    ConcreteDataType::timestamp_millisecond_datatype(),
1203                    false,
1204                ),
1205                semantic_type: SemanticType::Timestamp,
1206                column_id: 0,
1207            })
1208            .push_column_metadata(ColumnMetadata {
1209                column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
1210                semantic_type: SemanticType::Field,
1211                column_id: 1,
1212            })
1213            .primary_key(vec![]);
1214        let metadata = Arc::new(metadata_builder.build().unwrap());
1215
1216        let builder = Arc::new(TimeSeriesMemtableBuilder::default());
1217        let partitions = TimePartitions::new(
1218            metadata.clone(),
1219            builder.clone(),
1220            0,
1221            Some(Duration::from_secs(5)),
1222        );
1223
1224        // Test case 1: Write to single partition
1225        partitions
1226            .write_bulk(build_part(&[1000, 2000, 3000], 0))
1227            .unwrap();
1228
1229        let parts = partitions.list_partitions();
1230        assert_eq!(1, parts.len());
1231        let iter = parts[0].memtable.iter(None, None, None).unwrap();
1232        let timestamps = collect_iter_timestamps(iter);
1233        assert_eq!(&[1000, 2000, 3000], &timestamps[..]);
1234
1235        // Test case 2: Write across multiple existing partitions
1236        partitions
1237            .write_bulk(build_part(&[4000, 5000, 6000], 1))
1238            .unwrap();
1239        let parts = partitions.list_partitions();
1240        assert_eq!(2, parts.len());
1241        // Check first partition [0, 5000)
1242        let iter = parts[0].memtable.iter(None, None, None).unwrap();
1243        let timestamps = collect_iter_timestamps(iter);
1244        assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
1245        // Check second partition [5000, 10000)
1246        let iter = parts[1].memtable.iter(None, None, None).unwrap();
1247        let timestamps = collect_iter_timestamps(iter);
1248        assert_eq!(&[5000, 6000], &timestamps[..]);
1249
1250        // Test case 3: Write requiring new partition
1251        partitions
1252            .write_bulk(build_part(&[11000, 12000], 3))
1253            .unwrap();
1254
1255        let parts = partitions.list_partitions();
1256        assert_eq!(3, parts.len());
1257
1258        // Check new partition [10000, 15000)
1259        let iter = parts[2].memtable.iter(None, None, None).unwrap();
1260        let timestamps = collect_iter_timestamps(iter);
1261        assert_eq!(&[11000, 12000], &timestamps[..]);
1262
1263        // Test case 4: Write with no time range partitioning
1264        let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);
1265
1266        partitions
1267            .write_bulk(build_part(&[1000, 5000, 9000], 4))
1268            .unwrap();
1269
1270        let parts = partitions.list_partitions();
1271        assert_eq!(1, parts.len());
1272        let iter = parts[0].memtable.iter(None, None, None).unwrap();
1273        let timestamps = collect_iter_timestamps(iter);
1274        assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
1275    }
1276
1277    #[test]
1278    fn test_split_record_batch() {
1279        let schema = Arc::new(Schema::new(vec![
1280            Field::new(
1281                "ts",
1282                DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
1283                false,
1284            ),
1285            Field::new("val", DataType::Utf8, true),
1286        ]));
1287
1288        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
1289            1000, 2000, 5000, 7000, 8000,
1290        ]));
1291        let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
1292        let batch = RecordBatch::try_new(
1293            schema.clone(),
1294            vec![ts_array as ArrayRef, val_array as ArrayRef],
1295        )
1296        .unwrap();
1297
1298        let part = BulkPart {
1299            batch,
1300            num_rows: 5,
1301            max_ts: 8000,
1302            min_ts: 1000,
1303            sequence: 0,
1304            timestamp_index: 0,
1305        };
1306
1307        let result = filter_record_batch(&part, 1000, 2000).unwrap();
1308        assert!(result.is_some());
1309        let filtered = result.unwrap();
1310        assert_eq!(filtered.num_rows, 1);
1311        assert_eq!(filtered.min_ts, 1000);
1312        assert_eq!(filtered.max_ts, 1000);
1313
1314        // Test splitting with range [3000, 6000)
1315        let result = filter_record_batch(&part, 3000, 6000).unwrap();
1316        assert!(result.is_some());
1317        let filtered = result.unwrap();
1318        assert_eq!(filtered.num_rows, 1);
1319        assert_eq!(filtered.min_ts, 5000);
1320        assert_eq!(filtered.max_ts, 5000);
1321
1322        // Test splitting with range that includes no points
1323        let result = filter_record_batch(&part, 3000, 4000).unwrap();
1324        assert!(result.is_none());
1325
1326        // Test splitting with range that includes all points
1327        let result = filter_record_batch(&part, 0, 9000).unwrap();
1328        assert!(result.is_some());
1329        let filtered = result.unwrap();
1330        assert_eq!(filtered.num_rows, 5);
1331        assert_eq!(filtered.min_ts, 1000);
1332        assert_eq!(filtered.max_ts, 8000);
1333    }
1334}