// mito2/memtable/time_partition.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Partitions memtables by time.
16
// Standard library.
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;

// Workspace and third-party crates.
use common_telemetry::debug;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::{
    ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
};
use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
use datatypes::arrow::datatypes::{DataType, Int64Type, SchemaRef};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
use smallvec::{smallvec, SmallVec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;

// Crate-local modules.
use crate::error;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::bulk::part::{BulkPart, BulkPartConverter};
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
44
/// Initial time window if not specified.
///
/// Used by [TimePartitions::new] when the caller passes `None` for the
/// partition duration.
const INITIAL_TIME_WINDOW: Duration = Duration::from_days(1);
47
/// A partition holds rows with timestamps between `[min, max)`.
///
/// Partitions are created and managed by [TimePartitions].
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable of the partition.
    memtable: MemtableRef,
    /// Time range of the partition. `min` is inclusive and `max` is exclusive.
    time_range: PartTimeRange,
}
56
57impl TimePartition {
58    /// Returns whether the `ts` belongs to the partition.
59    fn contains_timestamp(&self, ts: Timestamp) -> bool {
60        self.time_range.contains_timestamp(ts)
61    }
62
63    /// Write rows to the part.
64    fn write(&self, kvs: &KeyValues) -> Result<()> {
65        self.memtable.write(kvs)
66    }
67
68    /// Writes a record batch to memtable.
69    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
70        self.memtable.write_bulk(rb)
71    }
72
73    /// Write a partial [BulkPart] according to [TimePartition::time_range].
74    fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
75        let Some(filtered) = filter_record_batch(
76            part,
77            self.time_range.min_timestamp.value(),
78            self.time_range.max_timestamp.value(),
79        )?
80        else {
81            return Ok(());
82        };
83        self.write_record_batch(filtered)
84    }
85}
86
/// Builds a [BooleanArray] selecting the elements of `$ts_array` whose value
/// lies in the half-open range `[$min, $max)`.
///
/// The predicate results are packed 64 bits at a time into a [MutableBuffer]
/// to avoid per-element pushes.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        // One u64 word (8 bytes) per 64 elements, rounded up.
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        // Predicate: keep values in [$min, $max).
        let f = |idx: usize| -> bool {
            // SAFETY: we only iterate the array within index bound.
            unsafe {
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        // Pack the full 64-bit words.
        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        // Pack the trailing partial word, if any.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
126
/// Downcasts `$ts_array` to `$array_type`, filters it to `[$min, $max)`, and
/// evaluates to `(filtered_array, filter, min_ts, max_ts)`.
///
/// NOTE: this expands to a `return Ok(None)` when nothing matches, so it may
/// only be used inside a function returning `Result<Option<_>>`.
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            // No rows fall in the requested range.
            return Ok(None);
        }

        let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
        // safety: we've checked res is not empty
        let max_ts = arrow::compute::max(i64array).unwrap();
        let min_ts = arrow::compute::min(i64array).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}
145
/// Filters the given part according to min (inclusive) and max (exclusive) timestamp range.
/// Returns [None] if no matching rows.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    // Filter the timestamp column first; this also produces the boolean mask
    // and the new min/max timestamps. The macro early-returns `Ok(None)` when
    // the range selects nothing.
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            // The time index column is always a timestamp type.
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    // Apply the same mask to every other column; reuse the already-filtered
    // timestamp column instead of filtering it twice.
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_ts,
        min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
    }))
}
199
/// Partition list; two inline slots avoid heap allocation when there are few partitions.
type PartitionVec = SmallVec<[TimePartition; 2]>;
201
/// Partitions.
///
/// Routes incoming rows into per-time-window [TimePartition]s so that each
/// memtable only covers one window of `part_duration`.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable data of partitions.
    inner: Mutex<PartitionsInner>,
    /// Duration of a partition.
    part_duration: Duration,
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Builder of memtables.
    builder: MemtableBuilderRef,
    /// Primary key encoder.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,

    /// Cached schema for bulk insert.
    /// This field is Some if the memtable uses bulk insert.
    bulk_schema: Option<SchemaRef>,
}
220
/// Shared reference to [TimePartitions].
pub type TimePartitionsRef = Arc<TimePartitions>;
222
223impl TimePartitions {
224    /// Returns a new empty partition list with optional duration.
225    pub fn new(
226        metadata: RegionMetadataRef,
227        builder: MemtableBuilderRef,
228        next_memtable_id: MemtableId,
229        part_duration: Option<Duration>,
230    ) -> Self {
231        let inner = PartitionsInner::new(next_memtable_id);
232        let primary_key_codec = build_primary_key_codec(&metadata);
233        let bulk_schema = builder.use_bulk_insert(&metadata).then(|| {
234            let opts = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
235            to_flat_sst_arrow_schema(&metadata, &opts)
236        });
237
238        Self {
239            inner: Mutex::new(inner),
240            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
241            metadata,
242            builder,
243            primary_key_codec,
244            bulk_schema,
245        }
246    }
247
248    /// Write key values to memtables.
249    ///
250    /// It creates new partitions if necessary.
251    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
252        if let Some(bulk_schema) = &self.bulk_schema {
253            let mut converter = BulkPartConverter::new(
254                &self.metadata,
255                bulk_schema.clone(),
256                kvs.num_rows(),
257                self.primary_key_codec.clone(),
258                // Always store primary keys for bulk mode.
259                true,
260            );
261            converter.append_key_values(kvs)?;
262            let part = converter.convert()?;
263
264            return self.write_bulk(part);
265        }
266
267        // Get all parts.
268        let parts = self.list_partitions();
269
270        // Checks whether all rows belongs to a single part. Checks in reverse order as we usually
271        // put to latest part.
272        for part in parts.iter().rev() {
273            let mut all_in_partition = true;
274            for kv in kvs.iter() {
275                // Safety: We checked the schema in the write request.
276                let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
277                if !part.contains_timestamp(ts) {
278                    all_in_partition = false;
279                    break;
280                }
281            }
282            if !all_in_partition {
283                continue;
284            }
285
286            // We can write all rows to this part.
287            return part.write(kvs);
288        }
289
290        // Slow path: We have to split kvs by partitions.
291        self.write_multi_parts(kvs, &parts)
292    }
293
    /// Writes a bulk part, splitting it across time partitions as needed.
    ///
    /// Missing partitions are created under the inner lock before the slices
    /// belonging to them are written.
    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        // Get all parts.
        let parts = self.list_partitions();
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_ts),
            time_type.create_timestamp(part.max_ts),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
            // fast path: all timestamps fall in one time partition.
            return matching_parts[0].write_record_batch(part);
        }

        // Slow path: each overlapping partition receives its filtered slice.
        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        for missing in missing_parts {
            // Create the partition while holding the lock, then write after
            // releasing it.
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }
330
331    // Creates or gets parts with given start timestamp.
332    // Acquires the lock to avoid others create the same partition.
333    fn get_or_create_time_partition(
334        &self,
335        part_start: Timestamp,
336        inner: &mut MutexGuard<PartitionsInner>,
337    ) -> Result<TimePartition> {
338        let part_pos = match inner
339            .parts
340            .iter()
341            .position(|part| part.time_range.min_timestamp == part_start)
342        {
343            Some(pos) => pos,
344            None => {
345                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
346                    .with_context(|| InvalidRequestSnafu {
347                        region_id: self.metadata.region_id,
348                        reason: format!(
349                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
350                        ),
351                    })?;
352                let memtable = self
353                    .builder
354                    .build(inner.alloc_memtable_id(), &self.metadata);
355                debug!(
356                        "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
357                        range,
358                        self.metadata.region_id,
359                        self.part_duration,
360                        memtable.id(),
361                        inner.parts.len() + 1
362                    );
363                let pos = inner.parts.len();
364                inner.parts.push(TimePartition {
365                    memtable,
366                    time_range: range,
367                });
368                pos
369            }
370        };
371        Ok(inner.parts[part_pos].clone())
372    }
373
374    /// Append memtables in partitions to `memtables`.
375    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
376        let inner = self.inner.lock().unwrap();
377        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
378    }
379
380    /// Returns the number of partitions.
381    pub fn num_partitions(&self) -> usize {
382        let inner = self.inner.lock().unwrap();
383        inner.parts.len()
384    }
385
386    /// Returns true if all memtables are empty.
387    pub fn is_empty(&self) -> bool {
388        let inner = self.inner.lock().unwrap();
389        inner.parts.iter().all(|part| part.memtable.is_empty())
390    }
391
392    /// Freezes all memtables.
393    pub fn freeze(&self) -> Result<()> {
394        let inner = self.inner.lock().unwrap();
395        for part in &*inner.parts {
396            part.memtable.freeze()?;
397        }
398        Ok(())
399    }
400
    /// Forks latest partition and updates the partition duration if `part_duration` is Some.
    ///
    /// The forked list contains at most one partition: the fork of the latest
    /// (largest start timestamp) partition, with its time range recomputed
    /// from the old memtable's max timestamp and the (possibly new) duration.
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        // Fall back to the existing partition duration.
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        let Some(old_part) = latest_part else {
            // If there is no partition, then we create a new partition with the new duration.
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Use the max timestamp to compute the new time range for the memtable.
        // If the old memtable reports no time range (e.g. it holds no rows),
        // fall through to an empty partition list.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                // Forks the latest partition, but compute the time range based on the new duration.
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
            primary_key_codec: self.primary_key_codec.clone(),
            bulk_schema: self.bulk_schema.clone(),
        }
    }
451
    /// Returns the duration (time-window width) of a single partition.
    pub(crate) fn part_duration(&self) -> Duration {
        self.part_duration
    }
456
457    /// Returns memory usage.
458    pub(crate) fn memory_usage(&self) -> usize {
459        let inner = self.inner.lock().unwrap();
460        inner
461            .parts
462            .iter()
463            .map(|part| part.memtable.stats().estimated_bytes)
464            .sum()
465    }
466
467    /// Returns the number of rows.
468    pub(crate) fn num_rows(&self) -> u64 {
469        let inner = self.inner.lock().unwrap();
470        inner
471            .parts
472            .iter()
473            .map(|part| part.memtable.stats().num_rows as u64)
474            .sum()
475    }
476
477    /// Append memtables in partitions to small vec.
478    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
479        let inner = self.inner.lock().unwrap();
480        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
481    }
482
483    /// Returns the next memtable id.
484    pub(crate) fn next_memtable_id(&self) -> MemtableId {
485        let inner = self.inner.lock().unwrap();
486        inner.next_memtable_id
487    }
488
489    /// Creates a new empty partition list from this list and a `part_duration`.
490    /// It falls back to the old partition duration if `part_duration` is `None`.
491    pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
492        debug_assert!(self.is_empty());
493
494        Self::new(
495            self.metadata.clone(),
496            self.builder.clone(),
497            self.next_memtable_id(),
498            Some(part_duration.unwrap_or(self.part_duration)),
499        )
500    }
501
502    /// Returns all partitions.
503    fn list_partitions(&self) -> PartitionVec {
504        let inner = self.inner.lock().unwrap();
505        inner.parts.clone()
506    }
507
    /// Find existing partitions that match the bulk data's time range and identify
    /// any new partitions that need to be created.
    ///
    /// `min`/`max` are the min/max timestamps of `ts_array`. Returns the
    /// overlapping existing partitions plus the start timestamps (in the time
    /// index unit) of partitions that still have to be created.
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        // Bucket start values (in the index unit) already covered by a partition.
        let mut present = HashSet::new();
        // First find any existing partitions that overlap [min, max].
        for part in existing_parts {
            let part_time_range = &part.time_range;
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        // Bucket width in whole seconds.
        // NOTE(review): a sub-second `part_duration` would make this 0 and the
        // `div_euclid` calls below would panic — assumes durations >= 1s; confirm.
        let part_duration_sec = part_duration.as_secs() as i64;
        // SAFETY: Timestamps won't overflow when converting to Second.
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        let num_timestamps = ts_array.len();
        // Pick the cheaper enumeration: iterate the candidate buckets when the
        // span covers fewer buckets than rows, otherwise bucket-align each row.
        let missing = if bucket_num <= num_timestamps {
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    // Only report buckets not already covered by a partition.
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            // Reinterpret the timestamp array as plain i64 values.
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    // Align each timestamp down to its bucket start, converted
                    // back to the index unit.
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }
624
    /// Returns the partition duration.
    ///
    /// A duration is always set at construction ([TimePartitions::new] falls
    /// back to the default 1-day window), so this simply returns it.
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }
629
    /// Write to multiple partitions.
    ///
    /// Groups rows by target partition, writes the groups whose partition
    /// already exists, then creates missing partitions under the lock and
    /// writes the remaining rows.
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Rows grouped by the start timestamp of an existing partition.
        let mut parts_to_write = HashMap::new();
        // Rows grouped by the start timestamp of a partition to create.
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // Safety: We used the timestamp before.
            let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                // We need to write it to a new part.
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Writes rows to existing parts.
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Creates new parts and writes to them. Acquires the lock to avoid others create
        // the same partition.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }
692
    /// Timeseries count in all time partitions.
    ///
    /// Sums the per-partition counts, so a series present in several
    /// partitions is counted once per partition.
    pub(crate) fn series_count(&self) -> usize {
        self.inner.lock().unwrap().series_count()
    }
697}
698
699/// Computes the start timestamp of the partition for `ts`.
700///
701/// It always use bucket size in seconds which should fit all timestamp resolution.
702fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
703    // Safety: We convert it to seconds so it never returns `None`.
704    let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
705    let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
706    let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
707    start_sec.convert_to(ts.unit())
708}
709
/// Lock-protected mutable state of [TimePartitions].
#[derive(Debug)]
struct PartitionsInner {
    /// All partitions.
    parts: PartitionVec,
    /// Next memtable id.
    next_memtable_id: MemtableId,
}
717
718impl PartitionsInner {
719    fn new(next_memtable_id: MemtableId) -> Self {
720        Self {
721            parts: Default::default(),
722            next_memtable_id,
723        }
724    }
725
726    fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
727        Self {
728            parts: smallvec![part],
729            next_memtable_id,
730        }
731    }
732
733    fn alloc_memtable_id(&mut self) -> MemtableId {
734        let id = self.next_memtable_id;
735        self.next_memtable_id += 1;
736        id
737    }
738
739    pub(crate) fn series_count(&self) -> usize {
740        self.parts
741            .iter()
742            .map(|p| p.memtable.stats().series_count)
743            .sum()
744    }
745}
746
/// Time range of a partition.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive min timestamp of rows in the partition.
    min_timestamp: Timestamp,
    /// Exclusive max timestamp of rows in the partition.
    max_timestamp: Timestamp,
}
755
756impl PartTimeRange {
757    fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
758        let start_sec = start.convert_to(TimeUnit::Second)?;
759        let end_sec = start_sec.add_duration(duration).ok()?;
760        let min_timestamp = start_sec.convert_to(start.unit())?;
761        let max_timestamp = end_sec.convert_to(start.unit())?;
762
763        Some(Self {
764            min_timestamp,
765            max_timestamp,
766        })
767    }
768
769    /// Returns whether the `ts` belongs to the partition.
770    fn contains_timestamp(&self, ts: Timestamp) -> bool {
771        self.min_timestamp <= ts && ts < self.max_timestamp
772    }
773}
774
/// A partition together with the key-values destined for it, used while
/// splitting a write across partitions.
struct PartitionToWrite<'a> {
    partition: TimePartition,
    key_values: Vec<KeyValue<'a>>,
}
779
780#[cfg(test)]
781mod tests {
782    use std::sync::Arc;
783
784    use api::v1::SemanticType;
785    use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
786    use datatypes::arrow::datatypes::{DataType, Field, Schema};
787    use datatypes::arrow::record_batch::RecordBatch;
788    use datatypes::prelude::ConcreteDataType;
789    use datatypes::schema::ColumnSchema;
790    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
791    use store_api::storage::SequenceNumber;
792
793    use super::*;
794    use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
795    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
796    use crate::test_util::memtable_util::{self, collect_iter_timestamps};
797
    #[test]
    fn test_no_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        // No explicit duration: the default time window applies.
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        assert_eq!(0, partitions.num_partitions());
        assert!(partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1000, 3000, 7000, 5000, 6000],
            0, // sequence 0, 1, 2, 3, 4
        );
        partitions.write(&kvs).unwrap();

        // All rows land in a single partition backed by memtable 0.
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());
        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        assert_eq!(0, memtables[0].id());

        // Iteration yields timestamps in sorted order.
        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
    }
825
    #[test]
    fn test_write_single_part() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        // 10s partition duration => bucket [0, 10000) in milliseconds.
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5000, 2000, 0],
            0, // sequence 0, 1, 2
        );
        // It should create a new partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000],
            3, // sequence 3, 4, 5
        );
        // All timestamps fall into the same bucket, so this still writes to
        // the same partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());

        // All rows are in one memtable, iterated in timestamp order.
        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
        // The single partition covers exactly [0, 10000).
        let parts = partitions.list_partitions();
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[0].time_range.max_timestamp
        );
    }
872
873    #[cfg(test)]
874    fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
875        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
876        let partitions =
877            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
878        assert_eq!(0, partitions.num_partitions());
879
880        let kvs = memtable_util::build_key_values(
881            metadata,
882            "hello".to_string(),
883            0,
884            &[2000, 0],
885            0, // sequence 0, 1
886        );
887        // It should creates a new partition.
888        partitions.write(&kvs).unwrap();
889        assert_eq!(1, partitions.num_partitions());
890        assert!(!partitions.is_empty());
891
892        let kvs = memtable_util::build_key_values(
893            metadata,
894            "hello".to_string(),
895            0,
896            &[3000, 7000, 4000, 5000],
897            2, // sequence 2, 3, 4, 5
898        );
899        // Writes 2 rows to the old partition and 1 row to a new partition.
900        partitions.write(&kvs).unwrap();
901        assert_eq!(2, partitions.num_partitions());
902
903        partitions
904    }
905
906    #[test]
907    fn test_write_multi_parts() {
908        let metadata = memtable_util::metadata_for_test();
909        let partitions = new_multi_partitions(&metadata);
910
911        let parts = partitions.list_partitions();
912        let iter = parts[0].memtable.iter(None, None, None).unwrap();
913        let timestamps = collect_iter_timestamps(iter);
914        assert_eq!(0, parts[0].memtable.id());
915        assert_eq!(
916            Timestamp::new_millisecond(0),
917            parts[0].time_range.min_timestamp
918        );
919        assert_eq!(
920            Timestamp::new_millisecond(5000),
921            parts[0].time_range.max_timestamp
922        );
923        assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
924        let iter = parts[1].memtable.iter(None, None, None).unwrap();
925        assert_eq!(1, parts[1].memtable.id());
926        let timestamps = collect_iter_timestamps(iter);
927        assert_eq!(&[5000, 7000], &timestamps[..]);
928        assert_eq!(
929            Timestamp::new_millisecond(5000),
930            parts[1].time_range.min_timestamp
931        );
932        assert_eq!(
933            Timestamp::new_millisecond(10000),
934            parts[1].time_range.max_timestamp
935        );
936    }
937
    #[test]
    fn test_new_with_part_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);

        // Setting a duration takes effect.
        let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        // Won't update the duration if it's None.
        let new_parts = new_parts.new_with_part_duration(None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        // Don't need to create new memtables.
        assert_eq!(0, new_parts.next_memtable_id());

        // A different duration replaces the previous one.
        let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        // Don't need to create new memtables.
        assert_eq!(0, new_parts.next_memtable_id());

        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        // Starting from None and passing None falls back to the initial time
        // window; no memtable has been allocated yet (next id is still 0).
        let new_parts = partitions.new_with_part_duration(None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());
    }
966
967    #[test]
968    fn test_fork_empty() {
969        let metadata = memtable_util::metadata_for_test();
970        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
971        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
972        partitions.freeze().unwrap();
973        let new_parts = partitions.fork(&metadata, None);
974        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
975        assert!(new_parts.list_partitions().is_empty());
976        assert_eq!(0, new_parts.next_memtable_id());
977
978        new_parts.freeze().unwrap();
979        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
980        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
981        assert!(new_parts.list_partitions().is_empty());
982        assert_eq!(0, new_parts.next_memtable_id());
983
984        new_parts.freeze().unwrap();
985        let new_parts = new_parts.fork(&metadata, None);
986        // Won't update the duration.
987        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
988        assert!(new_parts.list_partitions().is_empty());
989        assert_eq!(0, new_parts.next_memtable_id());
990
991        new_parts.freeze().unwrap();
992        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
993        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
994        assert!(new_parts.list_partitions().is_empty());
995        assert_eq!(0, new_parts.next_memtable_id());
996    }
997
998    #[test]
999    fn test_fork_non_empty_none() {
1000        let metadata = memtable_util::metadata_for_test();
1001        let partitions = new_multi_partitions(&metadata);
1002        partitions.freeze().unwrap();
1003
1004        // Won't update the duration.
1005        let new_parts = partitions.fork(&metadata, None);
1006        assert!(new_parts.is_empty());
1007        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
1008        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
1009        assert_eq!(3, new_parts.next_memtable_id());
1010
1011        // Although we don't fork a memtable multiple times, we still add a test for it.
1012        let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
1013        assert!(new_parts.is_empty());
1014        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
1015        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
1016        assert_eq!(4, new_parts.next_memtable_id());
1017    }
1018
    #[test]
    fn test_find_partitions_by_time_range() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());

        // Case 1: No time range partitioning configured. Nothing matches and
        // one missing partition aligned to 0 is reported.
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let parts = partitions.list_partitions();
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            )
            .unwrap();
        assert_eq!(matching.len(), 0);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], Timestamp::new_millisecond(0));

        // Case 2: With time range partitioning (5s buckets).
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Create two existing partitions: [0, 5000) and [5000, 10000)
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
        partitions.write(&kvs).unwrap();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
        partitions.write(&kvs).unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());

        // Test case 2a: Query fully within the first existing partition.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(2000),
                Timestamp::new_millisecond(4000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);

        // Test case 2b: Query spanning both existing partitions.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(3000),
                Timestamp::new_millisecond(8000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2c: Query beyond all existing partitions requires a new
        // (missing) partition starting at 10000.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(12000),
                Timestamp::new_millisecond(13000),
            )
            .unwrap();
        assert!(matching.is_empty());
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].value(), 10000);

        // Test case 2d: Query partially overlapping both existing partitions.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4000),
                Timestamp::new_millisecond(6000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2e: Corner case — the inclusive max 5000 touches the
        // second partition's start, so both partitions match.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4999),
                Timestamp::new_millisecond(5000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2f: Corner case — the inclusive max 10000 lies past the
        // last partition, so a missing partition at 10000 is reported too.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(9999),
                Timestamp::new_millisecond(10000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(1, missing.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
        assert_eq!(missing[0].value(), 10000);

        // Test case 2g: Range crossing 0 matches [0, 5000) and reports a
        // missing partition aligned to -5000.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-1000),
                Timestamp::new_millisecond(1000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(1, missing.len());
        assert_eq!(missing[0].value(), -5000);

        // Test case 3: sparse data — far-away timestamps match both existing
        // partitions and report two missing ones at the aligned extremes.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from(vec![
                    -100000000000,
                    0,
                    100000000000,
                ])) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-100000000000),
                Timestamp::new_millisecond(100000000000),
            )
            .unwrap();
        assert_eq!(2, matching.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
        assert_eq!(2, missing.len());
        assert_eq!(missing[0].value(), -100000000000);
        assert_eq!(missing[1].value(), 100000000000);
    }
1174
1175    fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
1176        let schema = Arc::new(Schema::new(vec![
1177            Field::new(
1178                "ts",
1179                DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1180                false,
1181            ),
1182            Field::new("val", DataType::Utf8, true),
1183        ]));
1184        let ts_data = ts.to_vec();
1185        let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
1186        let val_array = Arc::new(StringArray::from_iter_values(
1187            ts.iter().map(|v| v.to_string()),
1188        ));
1189        let batch = RecordBatch::try_new(
1190            schema,
1191            vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
1192        )
1193        .unwrap();
1194        let max_ts = ts.iter().max().copied().unwrap();
1195        let min_ts = ts.iter().min().copied().unwrap();
1196        BulkPart {
1197            batch,
1198            max_ts,
1199            min_ts,
1200            sequence,
1201            timestamp_index: 0,
1202            raw_data: None,
1203        }
1204    }
1205
    #[test]
    fn test_write_bulk() {
        // Region schema: a millisecond timestamp column plus one string
        // field, with no primary key columns.
        let mut metadata_builder = RegionMetadataBuilder::new(0.into());
        metadata_builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![]);
        let metadata = Arc::new(metadata_builder.build().unwrap());

        let builder = Arc::new(TimeSeriesMemtableBuilder::default());
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Test case 1: Write to a single partition — all rows fit in [0, 5000).
        partitions
            .write_bulk(build_part(&[1000, 2000, 3000], 0))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000], &timestamps[..]);

        // Test case 2: Write across multiple existing partitions — 4000 goes
        // to the first bucket, 5000 and 6000 to a new one.
        partitions
            .write_bulk(build_part(&[4000, 5000, 6000], 1))
            .unwrap();
        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());
        // Check first partition [0, 5000)
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
        // Check second partition [5000, 10000)
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 6000], &timestamps[..]);

        // Test case 3: Write requiring a brand-new partition.
        partitions
            .write_bulk(build_part(&[11000, 12000], 3))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(3, parts.len());

        // Check new partition [10000, 15000)
        let iter = parts[2].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[11000, 12000], &timestamps[..]);

        // Test case 4: Write with no time range partitioning — everything
        // lands in a single partition.
        let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);

        partitions
            .write_bulk(build_part(&[1000, 5000, 9000], 4))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
    }
1287
1288    #[test]
1289    fn test_split_record_batch() {
1290        let schema = Arc::new(Schema::new(vec![
1291            Field::new(
1292                "ts",
1293                DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
1294                false,
1295            ),
1296            Field::new("val", DataType::Utf8, true),
1297        ]));
1298
1299        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
1300            1000, 2000, 5000, 7000, 8000,
1301        ]));
1302        let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
1303        let batch = RecordBatch::try_new(
1304            schema.clone(),
1305            vec![ts_array as ArrayRef, val_array as ArrayRef],
1306        )
1307        .unwrap();
1308
1309        let part = BulkPart {
1310            batch,
1311            max_ts: 8000,
1312            min_ts: 1000,
1313            sequence: 0,
1314            timestamp_index: 0,
1315            raw_data: None,
1316        };
1317
1318        let result = filter_record_batch(&part, 1000, 2000).unwrap();
1319        assert!(result.is_some());
1320        let filtered = result.unwrap();
1321        assert_eq!(filtered.num_rows(), 1);
1322        assert_eq!(filtered.min_ts, 1000);
1323        assert_eq!(filtered.max_ts, 1000);
1324
1325        // Test splitting with range [3000, 6000)
1326        let result = filter_record_batch(&part, 3000, 6000).unwrap();
1327        assert!(result.is_some());
1328        let filtered = result.unwrap();
1329        assert_eq!(filtered.num_rows(), 1);
1330        assert_eq!(filtered.min_ts, 5000);
1331        assert_eq!(filtered.max_ts, 5000);
1332
1333        // Test splitting with range that includes no points
1334        let result = filter_record_batch(&part, 3000, 4000).unwrap();
1335        assert!(result.is_none());
1336
1337        // Test splitting with range that includes all points
1338        let result = filter_record_batch(&part, 0, 9000).unwrap();
1339        assert!(result.is_some());
1340        let filtered = result.unwrap();
1341        assert_eq!(filtered.num_rows(), 5);
1342        assert_eq!(filtered.min_ts, 1000);
1343        assert_eq!(filtered.max_ts, 8000);
1344    }
1345}