mito2/memtable/time_partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Partitions memtables by time.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, Mutex, MutexGuard};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use datatypes::arrow;
26use datatypes::arrow::array::{
27    ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
28    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
29};
30use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
31use datatypes::arrow::datatypes::{DataType, Int64Type, SchemaRef};
32use mito_codec::key_values::KeyValue;
33use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
34use smallvec::{SmallVec, smallvec};
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::RegionMetadataRef;
37
38use crate::error;
39use crate::error::{InvalidRequestSnafu, Result};
40use crate::memtable::bulk::part::{BulkPart, BulkPartConverter};
41use crate::memtable::version::SmallMemtableVec;
42use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
43use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
44
45/// Initial time window if not specified.
46const INITIAL_TIME_WINDOW: Duration = Duration::from_days(1);
47
/// A partition holds rows with timestamps between `[min, max)`.
///
/// Cloning is cheap: the memtable is reference-counted and the range is `Copy`.
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable of the partition.
    memtable: MemtableRef,
    /// Time range of the partition. `min` is inclusive and `max` is exclusive.
    time_range: PartTimeRange,
}
56
57impl TimePartition {
58    /// Returns whether the `ts` belongs to the partition.
59    fn contains_timestamp(&self, ts: Timestamp) -> bool {
60        self.time_range.contains_timestamp(ts)
61    }
62
63    /// Write rows to the part.
64    fn write(&self, kvs: &KeyValues) -> Result<()> {
65        self.memtable.write(kvs)
66    }
67
68    /// Writes a record batch to memtable.
69    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
70        self.memtable.write_bulk(rb)
71    }
72
73    /// Write a partial [BulkPart] according to [TimePartition::time_range].
74    fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
75        let Some(filtered) = filter_record_batch(
76            part,
77            self.time_range.min_timestamp.value(),
78            self.time_range.max_timestamp.value(),
79        )?
80        else {
81            return Ok(());
82        };
83        self.write_record_batch(filtered)
84    }
85}
86
/// Builds a [BooleanArray] marking which rows of `$ts_array` have values in
/// `[$min, $max)`.
///
/// Packs the predicate results into 64-bit words directly instead of going
/// through a generic boolean builder.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        // One u64 word (8 bytes) per 64 rows, rounded up.
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        // Predicate: row at `idx` is inside the half-open range.
        let f = |idx: usize| -> bool {
            // SAFETY: we only iterate the array within index bound.
            unsafe {
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        // Pack the full 64-row chunks, one bit per row (LSB first).
        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        // Pack the trailing partial chunk, if any.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
126
/// Downcasts `$ts_array` to `$array_type`, filters it to `[$min, $max)`, and
/// evaluates to `(filtered_array, filter, min_ts, max_ts)`.
///
/// NOTE: returns early from the *enclosing function* with `Ok(None)` when no
/// row survives the filter, so it can only be used inside functions returning
/// `Result<Option<_>>`.
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            return Ok(None);
        }

        let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
        // safety: we've checked res is not empty
        let max_ts = arrow::compute::max(i64array).unwrap();
        let min_ts = arrow::compute::min(i64array).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}
145
/// Filters the given part according to min (inclusive) and max (exclusive) timestamp range.
/// Returns [None] if no matching rows.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    // `handle_timestamp_array!` early-returns `Ok(None)` when nothing matches;
    // otherwise it yields the filtered timestamp column, the boolean filter,
    // and the min/max timestamps of the surviving rows.
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            // The time index column is always a timestamp type.
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    // Apply the same filter to every other column; the timestamp column was
    // already filtered above, so reuse it as-is.
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        // Pass the row count explicitly to support schemas with zero columns.
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_timestamp: max_ts,
        min_timestamp: min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
    }))
}
199
/// Vector of partitions with inline capacity for 2 entries.
type PartitionVec = SmallVec<[TimePartition; 2]>;
201
/// Partitions.
///
/// Maintains one memtable per time window so rows are grouped by timestamp range.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable data of partitions.
    inner: Mutex<PartitionsInner>,
    /// Duration of a partition.
    part_duration: Duration,
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Builder of memtables.
    builder: MemtableBuilderRef,
    /// Primary key encoder.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,

    /// Cached schema for bulk insert.
    /// This field is Some if the memtable uses bulk insert.
    bulk_schema: Option<SchemaRef>,
}
220
/// Reference-counted pointer to [TimePartitions].
pub type TimePartitionsRef = Arc<TimePartitions>;
222
223impl TimePartitions {
224    /// Returns a new empty partition list with optional duration.
225    pub fn new(
226        metadata: RegionMetadataRef,
227        builder: MemtableBuilderRef,
228        next_memtable_id: MemtableId,
229        part_duration: Option<Duration>,
230    ) -> Self {
231        let inner = PartitionsInner::new(next_memtable_id);
232        let primary_key_codec = build_primary_key_codec(&metadata);
233        let bulk_schema = builder.use_bulk_insert(&metadata).then(|| {
234            let opts = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
235            to_flat_sst_arrow_schema(&metadata, &opts)
236        });
237
238        Self {
239            inner: Mutex::new(inner),
240            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
241            metadata,
242            builder,
243            primary_key_codec,
244            bulk_schema,
245        }
246    }
247
248    /// Write key values to memtables.
249    ///
250    /// It creates new partitions if necessary.
251    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
252        if let Some(bulk_schema) = &self.bulk_schema {
253            let mut converter = BulkPartConverter::new(
254                &self.metadata,
255                bulk_schema.clone(),
256                kvs.num_rows(),
257                self.primary_key_codec.clone(),
258                // Always store primary keys for bulk mode.
259                true,
260            );
261            converter.append_key_values(kvs)?;
262            let part = converter.convert()?;
263
264            return self.write_bulk(part);
265        }
266
267        // Get all parts.
268        let parts = self.list_partitions();
269
270        // Checks whether all rows belongs to a single part. Checks in reverse order as we usually
271        // put to latest part.
272        for part in parts.iter().rev() {
273            let mut all_in_partition = true;
274            for kv in kvs.iter() {
275                // Safety: We checked the schema in the write request.
276                let ts = kv.timestamp().try_into_timestamp().unwrap().unwrap();
277                if !part.contains_timestamp(ts) {
278                    all_in_partition = false;
279                    break;
280                }
281            }
282            if !all_in_partition {
283                continue;
284            }
285
286            // We can write all rows to this part.
287            return part.write(kvs);
288        }
289
290        // Slow path: We have to split kvs by partitions.
291        self.write_multi_parts(kvs, &parts)
292    }
293
    /// Writes a bulk part, splitting it across time partitions as needed.
    ///
    /// Creates new partitions for time ranges that have no partition yet.
    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // Unit of the time index column, used to interpret the raw i64
        // min/max values carried by the part.
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        // Get all parts.
        let parts = self.list_partitions();
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_timestamp),
            time_type.create_timestamp(part.max_timestamp),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
            // fast path: all timestamps fall in one time partition.
            return matching_parts[0].write_record_batch(part);
        }

        // Slow path: each overlapping partition filters and writes its own rows.
        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        // Create the missing partitions, then write the matching rows to them.
        // The lock is only held while creating, not while writing.
        for missing in missing_parts {
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }
330
    /// Creates or gets the partition whose time range starts at `part_start`.
    ///
    /// The caller must hold the lock (passed as `inner`) so that concurrent
    /// writers cannot create the same partition twice.
    fn get_or_create_time_partition(
        &self,
        part_start: Timestamp,
        inner: &mut MutexGuard<PartitionsInner>,
    ) -> Result<TimePartition> {
        let part_pos = match inner
            .parts
            .iter()
            .position(|part| part.time_range.min_timestamp == part_start)
        {
            Some(pos) => pos,
            None => {
                // No existing partition starts at `part_start`; build a new one.
                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: self.metadata.region_id,
                        reason: format!(
                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
                        ),
                    })?;
                let memtable = self
                    .builder
                    .build(inner.alloc_memtable_id(), &self.metadata);
                debug!(
                    "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}, metadata: {:?}",
                    range,
                    self.metadata.region_id,
                    self.part_duration,
                    memtable.id(),
                    inner.parts.len() + 1,
                    self.metadata,
                );
                let pos = inner.parts.len();
                inner.parts.push(TimePartition {
                    memtable,
                    time_range: range,
                });
                pos
            }
        };
        Ok(inner.parts[part_pos].clone())
    }
374
375    /// Append memtables in partitions to `memtables`.
376    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
377        let inner = self.inner.lock().unwrap();
378        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
379    }
380
381    /// Returns the number of partitions.
382    pub fn num_partitions(&self) -> usize {
383        let inner = self.inner.lock().unwrap();
384        inner.parts.len()
385    }
386
387    /// Returns true if all memtables are empty.
388    pub fn is_empty(&self) -> bool {
389        let inner = self.inner.lock().unwrap();
390        inner.parts.iter().all(|part| part.memtable.is_empty())
391    }
392
393    /// Freezes all memtables.
394    pub fn freeze(&self) -> Result<()> {
395        let inner = self.inner.lock().unwrap();
396        for part in &*inner.parts {
397            part.memtable.freeze()?;
398        }
399        Ok(())
400    }
401
    /// Forks latest partition and updates the partition duration if `part_duration` is Some.
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        // Fall back to the existing partition duration.
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        // The partition with the largest start timestamp is the latest one.
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        let Some(old_part) = latest_part else {
            // If there is no partition, then we create a new partition with the new duration.
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Use the max timestamp to compute the new time range for the memtable.
        // When the stats expose no time range, fall back to an empty partition list.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                // Forks the latest partition, but compute the time range based on the new duration.
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
            primary_key_codec: self.primary_key_codec.clone(),
            bulk_schema: self.bulk_schema.clone(),
        }
    }
452
453    /// Returns partition duration.
454    pub(crate) fn part_duration(&self) -> Duration {
455        self.part_duration
456    }
457
458    /// Returns the memtable builder.
459    pub(crate) fn memtable_builder(&self) -> &MemtableBuilderRef {
460        &self.builder
461    }
462
463    /// Returns memory usage.
464    pub(crate) fn memory_usage(&self) -> usize {
465        let inner = self.inner.lock().unwrap();
466        inner
467            .parts
468            .iter()
469            .map(|part| part.memtable.stats().estimated_bytes)
470            .sum()
471    }
472
473    /// Returns the number of rows.
474    pub(crate) fn num_rows(&self) -> u64 {
475        let inner = self.inner.lock().unwrap();
476        inner
477            .parts
478            .iter()
479            .map(|part| part.memtable.stats().num_rows as u64)
480            .sum()
481    }
482
483    /// Append memtables in partitions to small vec.
484    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
485        let inner = self.inner.lock().unwrap();
486        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
487    }
488
489    /// Returns the next memtable id.
490    pub(crate) fn next_memtable_id(&self) -> MemtableId {
491        let inner = self.inner.lock().unwrap();
492        inner.next_memtable_id
493    }
494
495    /// Creates a new empty partition list from this list and a `part_duration`.
496    /// It falls back to the old partition duration if `part_duration` is `None`.
497    pub(crate) fn new_with_part_duration(
498        &self,
499        part_duration: Option<Duration>,
500        memtable_builder: Option<MemtableBuilderRef>,
501    ) -> Self {
502        debug_assert!(self.is_empty());
503
504        Self::new(
505            self.metadata.clone(),
506            memtable_builder.unwrap_or_else(|| self.builder.clone()),
507            self.next_memtable_id(),
508            Some(part_duration.unwrap_or(self.part_duration)),
509        )
510    }
511
512    /// Returns all partitions.
513    fn list_partitions(&self) -> PartitionVec {
514        let inner = self.inner.lock().unwrap();
515        inner.parts.clone()
516    }
517
    /// Find existing partitions that match the bulk data's time range and identify
    /// any new partitions that need to be created.
    ///
    /// Returns the overlapping existing partitions and the start timestamps
    /// (in the time index unit) of partitions that still have to be created.
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        // Start timestamps of partitions that are already covered.
        let mut present = HashSet::new();
        // First find any existing partitions that overlap
        for part in existing_parts {
            let part_time_range = &part.time_range;
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        // `part_duration` is always set (see `new`).
        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        let part_duration_sec = part_duration.as_secs() as i64;
        // SAFETY: Timestamps won't overflow when converting to Second.
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        let num_timestamps = ts_array.len();
        // Pick the cheaper strategy: enumerate candidate buckets when there
        // are fewer buckets than rows, otherwise bucket-align each row.
        let missing = if bucket_num <= num_timestamps {
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    // Only report buckets not covered by an existing partition.
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            // Reinterpret the timestamp array as plain i64 values.
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    // Align the row timestamp down to its bucket start,
                    // expressed back in the time index unit.
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }
634
    /// Returns the partition duration.
    ///
    /// Historically the duration was optional and this helper supplied a
    /// 1-day default; `part_duration` is now always set in `new` (defaulting
    /// to [INITIAL_TIME_WINDOW]), so this simply returns it.
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }
639
    /// Write to multiple partitions.
    ///
    /// Groups rows by the partition that contains their timestamp, writes the
    /// groups to existing partitions, then creates any missing partitions and
    /// writes the remaining rows under the lock.
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Rows routed to existing partitions, keyed by partition start time.
        let mut parts_to_write = HashMap::new();
        // Rows whose partition doesn't exist yet, keyed by partition start time.
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // Safety: We used the timestamp before.
            let ts = kv.timestamp().try_into_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                // We need to write it to a new part.
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Writes rows to existing parts.
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Creates new parts and writes to them. Acquires the lock to avoid others create
        // the same partition.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }
702
703    /// Timeseries count in all time partitions.
704    pub(crate) fn series_count(&self) -> usize {
705        self.inner.lock().unwrap().series_count()
706    }
707}
708
709/// Computes the start timestamp of the partition for `ts`.
710///
711/// It always use bucket size in seconds which should fit all timestamp resolution.
712fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
713    // Safety: We convert it to seconds so it never returns `None`.
714    let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
715    let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
716    let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
717    start_sec.convert_to(ts.unit())
718}
719
/// Lock-protected mutable state of [TimePartitions].
#[derive(Debug)]
struct PartitionsInner {
    /// All partitions.
    parts: PartitionVec,
    /// Next memtable id.
    next_memtable_id: MemtableId,
}
727
728impl PartitionsInner {
729    fn new(next_memtable_id: MemtableId) -> Self {
730        Self {
731            parts: Default::default(),
732            next_memtable_id,
733        }
734    }
735
736    fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
737        Self {
738            parts: smallvec![part],
739            next_memtable_id,
740        }
741    }
742
743    fn alloc_memtable_id(&mut self) -> MemtableId {
744        let id = self.next_memtable_id;
745        self.next_memtable_id += 1;
746        id
747    }
748
749    pub(crate) fn series_count(&self) -> usize {
750        self.parts
751            .iter()
752            .map(|p| p.memtable.stats().series_count)
753            .sum()
754    }
755}
756
/// Time range of a partition: `[min_timestamp, max_timestamp)`.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive min timestamp of rows in the partition.
    min_timestamp: Timestamp,
    /// Exclusive max timestamp of rows in the partition.
    max_timestamp: Timestamp,
}
765
766impl PartTimeRange {
767    fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
768        let start_sec = start.convert_to(TimeUnit::Second)?;
769        let end_sec = start_sec.add_duration(duration).ok()?;
770        let min_timestamp = start_sec.convert_to(start.unit())?;
771        let max_timestamp = end_sec.convert_to(start.unit())?;
772
773        Some(Self {
774            min_timestamp,
775            max_timestamp,
776        })
777    }
778
779    /// Returns whether the `ts` belongs to the partition.
780    fn contains_timestamp(&self, ts: Timestamp) -> bool {
781        self.min_timestamp <= ts && ts < self.max_timestamp
782    }
783}
784
/// A partition paired with the rows destined for it during a multi-part write.
struct PartitionToWrite<'a> {
    /// Target partition.
    partition: TimePartition,
    /// Rows to write, borrowed from the caller's key values.
    key_values: Vec<KeyValue<'a>>,
}
789
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::SequenceNumber;

    use super::*;
    use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::memtable_util::{self, collect_iter_timestamps};

    #[test]
    fn test_no_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        assert_eq!(0, partitions.num_partitions());
        assert!(partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1000, 3000, 7000, 5000, 6000],
            0, // sequence 0, 1, 2, 3, 4
        );
        partitions.write(&kvs).unwrap();

        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());
        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        assert_eq!(0, memtables[0].id());

        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
    }

    #[test]
    fn test_write_single_part() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5000, 2000, 0],
            0, // sequence 0, 1, 2
        );
        // It should create a new partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000],
            3, // sequence 3, 4, 5
        );
        // Still writes to the same partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());

        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
        let parts = partitions.list_partitions();
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[0].time_range.max_timestamp
        );
    }

    /// Builds partitions with a 5s window and writes rows spanning two
    /// windows, so the result has partitions [0, 5000) and [5000, 10000).
    fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            metadata,
            "hello".to_string(),
            0,
            &[2000, 0],
            0, // sequence 0, 1
        );
        // It should create a new partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000, 5000],
            2, // sequence 2, 3, 4, 5
        );
        // Writes 2 rows to the old partition and 1 row to a new partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(2, partitions.num_partitions());

        partitions
    }

    #[test]
    fn test_write_multi_parts() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);

        let parts = partitions.list_partitions();
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(0, parts[0].memtable.id());
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[0].time_range.max_timestamp
        );
        assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        assert_eq!(1, parts[1].memtable.id());
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 7000], &timestamps[..]);
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[1].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[1].time_range.max_timestamp
        );
    }

    #[test]
    fn test_new_with_part_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);

        let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)), None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        // Won't update the duration if it's None.
        let new_parts = new_parts.new_with_part_duration(None, None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        // Don't need to create new memtables.
        assert_eq!(0, new_parts.next_memtable_id());

        let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)), None);
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        // Don't need to create new memtables.
        assert_eq!(0, new_parts.next_memtable_id());

        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        // Need to build a new memtable as duration is still None.
        let new_parts = partitions.new_with_part_duration(None, None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());
    }

    #[test]
    fn test_fork_empty() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        partitions.freeze().unwrap();
        let new_parts = partitions.fork(&metadata, None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, None);
        // Won't update the duration.
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());
    }

    #[test]
    fn test_fork_non_empty_none() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);
        partitions.freeze().unwrap();

        // Won't update the duration.
        let new_parts = partitions.fork(&metadata, None);
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(3, new_parts.next_memtable_id());

        // Although we don't fork a memtable multiple times, we still add a test for it.
        let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(4, new_parts.next_memtable_id());
    }

    #[test]
    fn test_find_partitions_by_time_range() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());

        // Case 1: No time range partitioning
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let parts = partitions.list_partitions();
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            )
            .unwrap();
        assert_eq!(matching.len(), 0);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], Timestamp::new_millisecond(0));

        // Case 2: With time range partitioning
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Create two existing partitions: [0, 5000) and [5000, 10000)
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
        partitions.write(&kvs).unwrap();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
        partitions.write(&kvs).unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());

        // Test case 2a: Query fully within existing partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(2000),
                Timestamp::new_millisecond(4000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);

        // Test case 2b: Query spanning multiple existing partitions
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(3000),
                Timestamp::new_millisecond(8000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2c: Query requiring new partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(12000),
                Timestamp::new_millisecond(13000),
            )
            .unwrap();
        assert!(matching.is_empty());
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].value(), 10000);

        // Test case 2d: Query partially overlapping existing partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4000),
                Timestamp::new_millisecond(6000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2e: Corner case at the boundary between the two partitions
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4999),
                Timestamp::new_millisecond(5000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2f: Corner case at the upper bound of the last partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(9999),
                Timestamp::new_millisecond(10000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(1, missing.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
        assert_eq!(missing[0].value(), 10000);

        // Test case 2g: Cross 0
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-1000),
                Timestamp::new_millisecond(1000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(1, missing.len());
        assert_eq!(missing[0].value(), -5000);

        // Test case 3: sparse data
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from(vec![
                    -100000000000,
                    0,
                    100000000000,
                ])) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-100000000000),
                Timestamp::new_millisecond(100000000000),
            )
            .unwrap();
        assert_eq!(2, matching.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
        assert_eq!(2, missing.len());
        assert_eq!(missing[0].value(), -100000000000);
        assert_eq!(missing[1].value(), 100000000000);
    }

    /// Builds a [BulkPart] holding one millisecond-timestamp column `ts` and a
    /// string column `val` (the stringified timestamps), with min/max timestamps
    /// derived from `ts`.
    fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("val", DataType::Utf8, true),
        ]));
        let ts_data = ts.to_vec();
        let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
        let val_array = Arc::new(StringArray::from_iter_values(
            ts.iter().map(|v| v.to_string()),
        ));
        let batch = RecordBatch::try_new(
            schema,
            vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
        )
        .unwrap();
        let max_ts = ts.iter().max().copied().unwrap();
        let min_ts = ts.iter().min().copied().unwrap();
        BulkPart {
            batch,
            max_timestamp: max_ts,
            min_timestamp: min_ts,
            sequence,
            timestamp_index: 0,
            raw_data: None,
        }
    }

    #[test]
    fn test_write_bulk() {
        let mut metadata_builder = RegionMetadataBuilder::new(0.into());
        metadata_builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![]);
        let metadata = Arc::new(metadata_builder.build().unwrap());

        let builder = Arc::new(TimeSeriesMemtableBuilder::default());
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Test case 1: Write to single partition
        partitions
            .write_bulk(build_part(&[1000, 2000, 3000], 0))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000], &timestamps[..]);

        // Test case 2: Write across multiple existing partitions
        partitions
            .write_bulk(build_part(&[4000, 5000, 6000], 1))
            .unwrap();
        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());
        // Check first partition [0, 5000)
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
        // Check second partition [5000, 10000)
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 6000], &timestamps[..]);

        // Test case 3: Write requiring new partition
        partitions
            .write_bulk(build_part(&[11000, 12000], 3))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(3, parts.len());

        // Check new partition [10000, 15000)
        let iter = parts[2].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[11000, 12000], &timestamps[..]);

        // Test case 4: Write with no time range partitioning
        let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);

        partitions
            .write_bulk(build_part(&[1000, 5000, 9000], 4))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
    }

    #[test]
    fn test_split_record_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
                false,
            ),
            Field::new("val", DataType::Utf8, true),
        ]));

        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
            1000, 2000, 5000, 7000, 8000,
        ]));
        let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![ts_array as ArrayRef, val_array as ArrayRef],
        )
        .unwrap();

        let part = BulkPart {
            batch,
            max_timestamp: 8000,
            min_timestamp: 1000,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
        };

        let result = filter_record_batch(&part, 1000, 2000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_timestamp, 1000);
        assert_eq!(filtered.max_timestamp, 1000);

        // Test splitting with range [3000, 6000)
        let result = filter_record_batch(&part, 3000, 6000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_timestamp, 5000);
        assert_eq!(filtered.max_timestamp, 5000);

        // Test splitting with range that includes no points
        let result = filter_record_batch(&part, 3000, 4000).unwrap();
        assert!(result.is_none());

        // Test splitting with range that includes all points
        let result = filter_record_batch(&part, 0, 9000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 5);
        assert_eq!(filtered.min_timestamp, 1000);
        assert_eq!(filtered.max_timestamp, 8000);
    }
}
1355}