// mito2/memtable/time_partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Partitions memtables by time.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, Mutex, MutexGuard};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use datatypes::arrow;
26use datatypes::arrow::array::{
27    ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
28    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
29};
30use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
31use datatypes::arrow::datatypes::{DataType, Int64Type, SchemaRef};
32use mito_codec::key_values::KeyValue;
33use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
34use smallvec::{SmallVec, smallvec};
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::RegionMetadataRef;
37
38use crate::error;
39use crate::error::{InvalidRequestSnafu, Result};
40use crate::memtable::bulk::part::{BulkPart, BulkPartConverter};
41use crate::memtable::version::SmallMemtableVec;
42use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
43use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
44
/// Initial time window if not specified.
// NOTE(review): `Duration::from_days` requires a recent toolchain (it was
// unstable for a long time) — confirm the pinned toolchain supports it.
const INITIAL_TIME_WINDOW: Duration = Duration::from_days(1);

/// A partition holds rows with timestamps between `[min, max)`.
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable of the partition.
    memtable: MemtableRef,
    /// Time range of the partition. `min` is inclusive and `max` is exclusive.
    time_range: PartTimeRange,
}
56
57impl TimePartition {
58    /// Returns whether the `ts` belongs to the partition.
59    fn contains_timestamp(&self, ts: Timestamp) -> bool {
60        self.time_range.contains_timestamp(ts)
61    }
62
63    /// Write rows to the part.
64    fn write(&self, kvs: &KeyValues) -> Result<()> {
65        self.memtable.write(kvs)
66    }
67
68    /// Writes a record batch to memtable.
69    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
70        self.memtable.write_bulk(rb)
71    }
72
73    /// Write a partial [BulkPart] according to [TimePartition::time_range].
74    fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
75        let Some(filtered) = filter_record_batch(
76            part,
77            self.time_range.min_timestamp.value(),
78            self.time_range.max_timestamp.value(),
79        )?
80        else {
81            return Ok(());
82        };
83        self.write_record_batch(filtered)
84    }
85}
86
/// Builds a [BooleanArray] filter selecting `$ts_array` values in `[$min, $max)`.
///
/// Evaluates the range predicate 64 values at a time, packing the results into
/// `u64` words so the boolean buffer is built without per-bit bookkeeping.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        // One u64 word (8 bytes) for every 64 values, rounded up.
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        let f = |idx: usize| -> bool {
            // SAFETY: we only iterate the array within index bound.
            unsafe {
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        // Full 64-value chunks.
        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        // Trailing partial chunk, if any.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
126
/// Downcasts `$ts_array` to `$array_type` and filters it to `[$min, $max)`.
///
/// Expands to `(filtered_array, filter, min_ts, max_ts)`. NOTE: the expansion
/// contains `return Ok(None)` for the empty-result case, so this macro may only
/// be used inside a function returning `Result<Option<_>>` (see
/// [filter_record_batch]).
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            return Ok(None);
        }

        let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
        // safety: we've checked res is not empty
        let max_ts = arrow::compute::max(i64array).unwrap();
        let min_ts = arrow::compute::min(i64array).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}
145
/// Filters the given part according to min (inclusive) and max (exclusive) timestamp range.
/// Returns [None] if no matching rows.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    // Dispatch on the concrete timestamp unit; the macro returns early with
    // `Ok(None)` when no row falls inside `[min, max)`.
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            // The time index column is guaranteed to be a timestamp type.
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    // Apply the same filter to every other column; the timestamp column is
    // already filtered above, so reuse it instead of filtering twice.
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_timestamp: max_ts,
        min_timestamp: min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
    }))
}
199
/// Vector of partitions with inline capacity for two entries, since a region
/// rarely keeps more than two active time partitions at once.
type PartitionVec = SmallVec<[TimePartition; 2]>;

/// Partitions.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable data of partitions.
    inner: Mutex<PartitionsInner>,
    /// Duration of a partition.
    part_duration: Duration,
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Builder of memtables.
    builder: MemtableBuilderRef,
    /// Primary key encoder.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,

    /// Cached schema for bulk insert.
    /// This field is Some if the memtable uses bulk insert.
    bulk_schema: Option<SchemaRef>,
}

/// Shared reference to [TimePartitions].
pub type TimePartitionsRef = Arc<TimePartitions>;
222
223impl TimePartitions {
224    /// Returns a new empty partition list with optional duration.
225    pub fn new(
226        metadata: RegionMetadataRef,
227        builder: MemtableBuilderRef,
228        next_memtable_id: MemtableId,
229        part_duration: Option<Duration>,
230    ) -> Self {
231        let inner = PartitionsInner::new(next_memtable_id);
232        let primary_key_codec = build_primary_key_codec(&metadata);
233        let bulk_schema = builder.use_bulk_insert(&metadata).then(|| {
234            let opts = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
235            to_flat_sst_arrow_schema(&metadata, &opts)
236        });
237
238        Self {
239            inner: Mutex::new(inner),
240            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
241            metadata,
242            builder,
243            primary_key_codec,
244            bulk_schema,
245        }
246    }
247
    /// Write key values to memtables.
    ///
    /// It creates new partitions if necessary.
    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
        // Bulk-insert mode: convert the key values into a bulk part and use
        // the bulk write path instead of row-by-row insertion.
        if let Some(bulk_schema) = &self.bulk_schema {
            let mut converter = BulkPartConverter::new(
                &self.metadata,
                bulk_schema.clone(),
                kvs.num_rows(),
                self.primary_key_codec.clone(),
                // Always store primary keys for bulk mode.
                true,
            );
            converter.append_key_values(kvs)?;
            let part = converter.convert()?;

            return self.write_bulk_inner(part);
        }

        // Get all parts.
        let parts = self.list_partitions();

        // Checks whether all rows belongs to a single part. Checks in reverse order as we usually
        // put to latest part.
        for part in parts.iter().rev() {
            let mut all_in_partition = true;
            for kv in kvs.iter() {
                // Safety: We checked the schema in the write request.
                let ts = kv.timestamp().try_into_timestamp().unwrap().unwrap();
                if !part.contains_timestamp(ts) {
                    all_in_partition = false;
                    break;
                }
            }
            if !all_in_partition {
                continue;
            }

            // We can write all rows to this part.
            return part.write(kvs);
        }

        // Slow path: We have to split kvs by partitions.
        self.write_multi_parts(kvs, &parts)
    }
293
294    /// Writes a bulk part.
295    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
296        // Convert the bulk part if bulk_schema is Some
297        let part = if let Some(bulk_schema) = &self.bulk_schema {
298            let converted = crate::memtable::bulk::part::convert_bulk_part(
299                part,
300                &self.metadata,
301                self.primary_key_codec.clone(),
302                bulk_schema.clone(),
303                // Always store primary keys for bulk mode.
304                true,
305            )?;
306            match converted {
307                Some(p) => p,
308                None => return Ok(()),
309            }
310        } else {
311            part
312        };
313
314        self.write_bulk_inner(part)
315    }
316
    /// Writes a bulk part without converting.
    fn write_bulk_inner(&self, part: BulkPart) -> Result<()> {
        // Time unit of the region's time index, used to turn the part's raw
        // i64 min/max values into [Timestamp]s.
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        // Get all parts.
        let parts = self.list_partitions();
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_timestamp),
            time_type.create_timestamp(part.max_timestamp),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
            // fast path: all timestamps fall in one time partition.
            return matching_parts[0].write_record_batch(part);
        }

        // Slow path: write the overlapping slice of the part to each
        // existing partition.
        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        // Create the partitions that don't exist yet, then write their slices.
        for missing in missing_parts {
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }
354
    /// Creates or gets the partition with the given start timestamp.
    ///
    /// The caller must hold the lock (passed in as `inner`) so concurrent
    /// writers cannot create the same partition twice.
    fn get_or_create_time_partition(
        &self,
        part_start: Timestamp,
        inner: &mut MutexGuard<PartitionsInner>,
    ) -> Result<TimePartition> {
        let part_pos = match inner
            .parts
            .iter()
            .position(|part| part.time_range.min_timestamp == part_start)
        {
            Some(pos) => pos,
            None => {
                // Compute `[part_start, part_start + duration)`; this fails if
                // the range would overflow the timestamp domain.
                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: self.metadata.region_id,
                        reason: format!(
                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
                        ),
                    })?;
                let memtable = self
                    .builder
                    .build(inner.alloc_memtable_id(), &self.metadata);
                debug!(
                    "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}, metadata: {:?}",
                    range,
                    self.metadata.region_id,
                    self.part_duration,
                    memtable.id(),
                    inner.parts.len() + 1,
                    self.metadata,
                );
                let pos = inner.parts.len();
                inner.parts.push(TimePartition {
                    memtable,
                    time_range: range,
                });
                pos
            }
        };
        Ok(inner.parts[part_pos].clone())
    }
398
399    /// Append memtables in partitions to `memtables`.
400    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
401        let inner = self.inner.lock().unwrap();
402        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
403    }
404
405    /// Returns the number of partitions.
406    pub fn num_partitions(&self) -> usize {
407        let inner = self.inner.lock().unwrap();
408        inner.parts.len()
409    }
410
411    /// Returns true if all memtables are empty.
412    pub fn is_empty(&self) -> bool {
413        let inner = self.inner.lock().unwrap();
414        inner.parts.iter().all(|part| part.memtable.is_empty())
415    }
416
417    /// Freezes all memtables.
418    pub fn freeze(&self) -> Result<()> {
419        let inner = self.inner.lock().unwrap();
420        for part in &*inner.parts {
421            part.memtable.freeze()?;
422        }
423        Ok(())
424    }
425
    /// Forks latest partition and updates the partition duration if `part_duration` is Some.
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        // Fall back to the existing partition duration.
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        // The latest partition is the one with the greatest start timestamp.
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        let Some(old_part) = latest_part else {
            // If there is no partition, then we create a new partition with the new duration.
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Use the max timestamp to compute the new time range for the memtable.
        // If the old memtable reports no time range, fall back to a fresh,
        // partition-less inner state.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                // Forks the latest partition, but compute the time range based on the new duration.
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
            primary_key_codec: self.primary_key_codec.clone(),
            bulk_schema: self.bulk_schema.clone(),
        }
    }
476
    /// Returns partition duration.
    ///
    /// The duration is fixed at construction time (see [TimePartitions::new]).
    pub(crate) fn part_duration(&self) -> Duration {
        self.part_duration
    }

    /// Returns the memtable builder.
    pub(crate) fn memtable_builder(&self) -> &MemtableBuilderRef {
        &self.builder
    }
486
487    /// Returns memory usage.
488    pub(crate) fn memory_usage(&self) -> usize {
489        let inner = self.inner.lock().unwrap();
490        inner
491            .parts
492            .iter()
493            .map(|part| part.memtable.stats().estimated_bytes)
494            .sum()
495    }
496
497    /// Returns the number of rows.
498    pub(crate) fn num_rows(&self) -> u64 {
499        let inner = self.inner.lock().unwrap();
500        inner
501            .parts
502            .iter()
503            .map(|part| part.memtable.stats().num_rows as u64)
504            .sum()
505    }
506
507    /// Append memtables in partitions to small vec.
508    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
509        let inner = self.inner.lock().unwrap();
510        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
511    }
512
513    /// Returns the next memtable id.
514    pub(crate) fn next_memtable_id(&self) -> MemtableId {
515        let inner = self.inner.lock().unwrap();
516        inner.next_memtable_id
517    }
518
519    /// Creates a new empty partition list from this list and a `part_duration`.
520    /// It falls back to the old partition duration if `part_duration` is `None`.
521    pub(crate) fn new_with_part_duration(
522        &self,
523        part_duration: Option<Duration>,
524        memtable_builder: Option<MemtableBuilderRef>,
525    ) -> Self {
526        debug_assert!(self.is_empty());
527
528        Self::new(
529            self.metadata.clone(),
530            memtable_builder.unwrap_or_else(|| self.builder.clone()),
531            self.next_memtable_id(),
532            Some(part_duration.unwrap_or(self.part_duration)),
533        )
534    }
535
536    /// Returns all partitions.
537    fn list_partitions(&self) -> PartitionVec {
538        let inner = self.inner.lock().unwrap();
539        inner.parts.clone()
540    }
541
    /// Find existing partitions that match the bulk data's time range and identify
    /// any new partitions that need to be created.
    ///
    /// Returns `(matching, missing)`: `matching` holds existing partitions that
    /// overlap `[min, max]`, and `missing` holds the start timestamps (in the
    /// region's time unit) of partitions that still need to be created.
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        // Raw start values of buckets that are already covered by a partition.
        let mut present = HashSet::new();
        // First find any existing partitions that overlap
        for part in existing_parts {
            let part_time_range = &part.time_range;
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        // The partition duration is always present (set at construction).
        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        // NOTE(review): assumes the partition duration is at least one second —
        // a sub-second duration would make `part_duration_sec` zero and the
        // `div_euclid` calls below would panic. Confirm config forbids that.
        let part_duration_sec = part_duration.as_secs() as i64;
        // SAFETY: Timestamps won't overflow when converting to Second.
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        let num_timestamps = ts_array.len();
        // Choose the cheaper enumeration: iterate buckets when there are fewer
        // buckets than rows, otherwise derive buckets from the row timestamps.
        let missing = if bucket_num <= num_timestamps {
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            // Reinterpret the timestamp array as plain i64 values.
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }
658
    /// Returns the partition duration.
    ///
    /// Historically the duration was optional and this fell back to a 1-day
    /// default; the duration is now always set at construction, so this simply
    /// returns [TimePartitions::part_duration].
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }
663
    /// Write to multiple partitions.
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Rows grouped by the existing partition they belong to.
        let mut parts_to_write = HashMap::new();
        // Rows grouped by the start timestamp of a partition to create.
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // Safety: We used the timestamp before.
            let ts = kv.timestamp().try_into_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                // We need to write it to a new part.
                // The partition duration is always set at construction.
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Writes rows to existing parts.
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Creates new parts and writes to them. Acquires the lock to avoid others create
        // the same partition.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }
726
727    /// Timeseries count in all time partitions.
728    pub(crate) fn series_count(&self) -> usize {
729        self.inner.lock().unwrap().series_count()
730    }
731}
732
733/// Computes the start timestamp of the partition for `ts`.
734///
735/// It always use bucket size in seconds which should fit all timestamp resolution.
736fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
737    // Safety: We convert it to seconds so it never returns `None`.
738    let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
739    let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
740    let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
741    start_sec.convert_to(ts.unit())
742}
743
/// Lock-protected mutable state of [TimePartitions].
#[derive(Debug)]
struct PartitionsInner {
    /// All partitions.
    parts: PartitionVec,
    /// Next memtable id.
    next_memtable_id: MemtableId,
}
751
752impl PartitionsInner {
753    fn new(next_memtable_id: MemtableId) -> Self {
754        Self {
755            parts: Default::default(),
756            next_memtable_id,
757        }
758    }
759
760    fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
761        Self {
762            parts: smallvec![part],
763            next_memtable_id,
764        }
765    }
766
767    fn alloc_memtable_id(&mut self) -> MemtableId {
768        let id = self.next_memtable_id;
769        self.next_memtable_id += 1;
770        id
771    }
772
773    pub(crate) fn series_count(&self) -> usize {
774        self.parts
775            .iter()
776            .map(|p| p.memtable.stats().series_count)
777            .sum()
778    }
779}
780
/// Time range of a partition.
///
/// Both bounds carry the same time unit (see [PartTimeRange::from_start_duration]).
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive min timestamp of rows in the partition.
    min_timestamp: Timestamp,
    /// Exclusive max timestamp of rows in the partition.
    max_timestamp: Timestamp,
}
789
790impl PartTimeRange {
791    fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
792        let start_sec = start.convert_to(TimeUnit::Second)?;
793        let end_sec = start_sec.add_duration(duration).ok()?;
794        let min_timestamp = start_sec.convert_to(start.unit())?;
795        let max_timestamp = end_sec.convert_to(start.unit())?;
796
797        Some(Self {
798            min_timestamp,
799            max_timestamp,
800        })
801    }
802
803    /// Returns whether the `ts` belongs to the partition.
804    fn contains_timestamp(&self, ts: Timestamp) -> bool {
805        self.min_timestamp <= ts && ts < self.max_timestamp
806    }
807}
808
/// Pending rows grouped for a single existing partition.
struct PartitionToWrite<'a> {
    /// Destination partition.
    partition: TimePartition,
    /// Key values to write into the partition.
    key_values: Vec<KeyValue<'a>>,
}
813
814#[cfg(test)]
815mod tests {
816    use std::sync::Arc;
817
818    use api::v1::SemanticType;
819    use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
820    use datatypes::arrow::datatypes::{DataType, Field, Schema};
821    use datatypes::arrow::record_batch::RecordBatch;
822    use datatypes::prelude::ConcreteDataType;
823    use datatypes::schema::ColumnSchema;
824    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
825    use store_api::storage::SequenceNumber;
826
827    use super::*;
828    use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
829    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
830    use crate::test_util::memtable_util::{self, collect_iter_timestamps};
831
832    #[test]
833    fn test_no_duration() {
834        let metadata = memtable_util::metadata_for_test();
835        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
836        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
837        assert_eq!(0, partitions.num_partitions());
838        assert!(partitions.is_empty());
839
840        let kvs = memtable_util::build_key_values(
841            &metadata,
842            "hello".to_string(),
843            0,
844            &[1000, 3000, 7000, 5000, 6000],
845            0, // sequence 0, 1, 2, 3, 4
846        );
847        partitions.write(&kvs).unwrap();
848
849        assert_eq!(1, partitions.num_partitions());
850        assert!(!partitions.is_empty());
851        let mut memtables = Vec::new();
852        partitions.list_memtables(&mut memtables);
853        assert_eq!(0, memtables[0].id());
854
855        let iter = memtables[0].iter(None, None, None).unwrap();
856        let timestamps = collect_iter_timestamps(iter);
857        assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
858    }
859
    /// Two writes whose timestamps all fall within one 10s window end up in
    /// the same partition covering `[0, 10000)`.
    #[test]
    fn test_write_single_part() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5000, 2000, 0],
            0, // sequence 0, 1, 2
        );
        // It should create a new partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000],
            3, // sequence 3, 4, 5
        );
        // Still writes to the same partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());

        // All rows are in one memtable and iterate in timestamp order.
        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        let iter = memtables[0].iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
        // The partition's range is [0, 10000).
        let parts = partitions.list_partitions();
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[0].time_range.max_timestamp
        );
    }
906
907    #[cfg(test)]
908    fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
909        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
910        let partitions =
911            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
912        assert_eq!(0, partitions.num_partitions());
913
914        let kvs = memtable_util::build_key_values(
915            metadata,
916            "hello".to_string(),
917            0,
918            &[2000, 0],
919            0, // sequence 0, 1
920        );
921        // It should creates a new partition.
922        partitions.write(&kvs).unwrap();
923        assert_eq!(1, partitions.num_partitions());
924        assert!(!partitions.is_empty());
925
926        let kvs = memtable_util::build_key_values(
927            metadata,
928            "hello".to_string(),
929            0,
930            &[3000, 7000, 4000, 5000],
931            2, // sequence 2, 3, 4, 5
932        );
933        // Writes 2 rows to the old partition and 1 row to a new partition.
934        partitions.write(&kvs).unwrap();
935        assert_eq!(2, partitions.num_partitions());
936
937        partitions
938    }
939
    /// Verifies that rows written by `new_multi_partitions` are routed to the
    /// correct 5s partitions and iterate in timestamp order.
    #[test]
    fn test_write_multi_parts() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);

        let parts = partitions.list_partitions();
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        // First partition covers [0, 5000) and is backed by memtable 0.
        assert_eq!(0, parts[0].memtable.id());
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[0].time_range.max_timestamp
        );
        assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
        // Second partition covers [5000, 10000) and is backed by memtable 1.
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        assert_eq!(1, parts[1].memtable.id());
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 7000], &timestamps[..]);
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[1].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[1].time_range.max_timestamp
        );
    }
971
    /// Checks how `new_with_part_duration` updates (or keeps) the partition
    /// duration without allocating new memtables.
    #[test]
    fn test_new_with_part_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);

        let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)), None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        // Won't update the duration if it's None.
        let new_parts = new_parts.new_with_part_duration(None, None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        // Don't need to create new memtables.
        assert_eq!(0, new_parts.next_memtable_id());

        let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)), None);
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        // Don't need to create new memtables.
        assert_eq!(0, new_parts.next_memtable_id());

        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        // Need to build a new memtable as duration is still None.
        let new_parts = partitions.new_with_part_duration(None, None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());
    }
1000
    /// Forking empty partitions keeps them empty and only updates the
    /// duration when one is supplied.
    #[test]
    fn test_fork_empty() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        partitions.freeze().unwrap();
        // Fork with no duration falls back to the initial time window.
        let new_parts = partitions.fork(&metadata, None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, None);
        // Won't update the duration.
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());
    }
1031
    /// Forking non-empty partitions yields empty partitions with fresh
    /// memtable ids, keeping the old duration when none is supplied.
    #[test]
    fn test_fork_non_empty_none() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);
        partitions.freeze().unwrap();

        // Won't update the duration.
        let new_parts = partitions.fork(&metadata, None);
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(3, new_parts.next_memtable_id());

        // Although we don't fork a memtable multiple times, we still add a test for it.
        let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(4, new_parts.next_memtable_id());
    }
1052
    /// Exercises `find_partitions_by_time_range`: queries against existing
    /// partitions return them as `matching`, while uncovered sub-ranges are
    /// reported as `missing` partition start timestamps.
    #[test]
    fn test_find_partitions_by_time_range() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());

        // Case 1: No time range partitioning
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let parts = partitions.list_partitions();
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            )
            .unwrap();
        assert_eq!(matching.len(), 0);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], Timestamp::new_millisecond(0));

        // Case 2: With time range partitioning
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Create two existing partitions: [0, 5000) and [5000, 10000)
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
        partitions.write(&kvs).unwrap();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
        partitions.write(&kvs).unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());

        // Test case 2a: Query fully within existing partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(2000),
                Timestamp::new_millisecond(4000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);

        // Test case 2b: Query spanning multiple existing partitions
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(3000),
                Timestamp::new_millisecond(8000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2c: Query requiring new partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(12000),
                Timestamp::new_millisecond(13000),
            )
            .unwrap();
        assert!(matching.is_empty());
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].value(), 10000);

        // Test case 2d: Query partially overlapping existing partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4000),
                Timestamp::new_millisecond(6000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2e: Corner case crossing the boundary between the two
        // existing partitions (5000 belongs to the second one).
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4999),
                Timestamp::new_millisecond(5000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2f: Corner case at the end of the last partition; 10000
        // falls past [5000, 10000), so a missing partition at 10000 is reported.
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(9999),
                Timestamp::new_millisecond(10000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(1, missing.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
        assert_eq!(missing[0].value(), 10000);

        // Test case 2g: Cross 0
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-1000),
                Timestamp::new_millisecond(1000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(1, missing.len());
        assert_eq!(missing[0].value(), -5000);

        // Test case 3: sparse data
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from(vec![
                    -100000000000,
                    0,
                    100000000000,
                ])) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-100000000000),
                Timestamp::new_millisecond(100000000000),
            )
            .unwrap();
        assert_eq!(2, matching.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
        assert_eq!(2, missing.len());
        assert_eq!(missing[0].value(), -100000000000);
        assert_eq!(missing[1].value(), 100000000000);
    }
1208
1209    fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
1210        let schema = Arc::new(Schema::new(vec![
1211            Field::new(
1212                "ts",
1213                DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1214                false,
1215            ),
1216            Field::new("val", DataType::Utf8, true),
1217        ]));
1218        let ts_data = ts.to_vec();
1219        let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
1220        let val_array = Arc::new(StringArray::from_iter_values(
1221            ts.iter().map(|v| v.to_string()),
1222        ));
1223        let batch = RecordBatch::try_new(
1224            schema,
1225            vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
1226        )
1227        .unwrap();
1228        let max_ts = ts.iter().max().copied().unwrap();
1229        let min_ts = ts.iter().min().copied().unwrap();
1230        BulkPart {
1231            batch,
1232            max_timestamp: max_ts,
1233            min_timestamp: min_ts,
1234            sequence,
1235            timestamp_index: 0,
1236            raw_data: None,
1237        }
1238    }
1239
    /// Verifies `write_bulk` splits a bulk part across the 5s partitions it
    /// overlaps, creating new partitions as needed.
    #[test]
    fn test_write_bulk() {
        // Region schema: ts (timestamp) + val (string field), no primary key.
        let mut metadata_builder = RegionMetadataBuilder::new(0.into());
        metadata_builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![]);
        let metadata = Arc::new(metadata_builder.build().unwrap());

        let builder = Arc::new(TimeSeriesMemtableBuilder::default());
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Test case 1: Write to single partition
        partitions
            .write_bulk(build_part(&[1000, 2000, 3000], 0))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000], &timestamps[..]);

        // Test case 2: Write across multiple existing partitions
        partitions
            .write_bulk(build_part(&[4000, 5000, 6000], 1))
            .unwrap();
        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());
        // Check first partition [0, 5000)
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
        // Check second partition [5000, 10000)
        let iter = parts[1].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 6000], &timestamps[..]);

        // Test case 3: Write requiring new partition
        partitions
            .write_bulk(build_part(&[11000, 12000], 3))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(3, parts.len());

        // Check new partition [10000, 15000)
        let iter = parts[2].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[11000, 12000], &timestamps[..]);

        // Test case 4: Write with no time range partitioning
        let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);

        partitions
            .write_bulk(build_part(&[1000, 5000, 9000], 4))
            .unwrap();

        // Everything lands in one partition when no duration is set.
        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0].memtable.iter(None, None, None).unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
    }
1321
    /// Exercises `filter_record_batch` on a `BulkPart`: rows are kept when
    /// their timestamp falls in the right-open range `[min, max)`, and `None`
    /// is returned when no rows match.
    #[test]
    fn test_split_record_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
                false,
            ),
            Field::new("val", DataType::Utf8, true),
        ]));

        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
            1000, 2000, 5000, 7000, 8000,
        ]));
        let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![ts_array as ArrayRef, val_array as ArrayRef],
        )
        .unwrap();

        let part = BulkPart {
            batch,
            max_timestamp: 8000,
            min_timestamp: 1000,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
        };

        // Range [1000, 2000): only ts=1000 matches (2000 is excluded).
        let result = filter_record_batch(&part, 1000, 2000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_timestamp, 1000);
        assert_eq!(filtered.max_timestamp, 1000);

        // Test splitting with range [3000, 6000)
        let result = filter_record_batch(&part, 3000, 6000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_timestamp, 5000);
        assert_eq!(filtered.max_timestamp, 5000);

        // Test splitting with range that includes no points
        let result = filter_record_batch(&part, 3000, 4000).unwrap();
        assert!(result.is_none());

        // Test splitting with range that includes all points
        let result = filter_record_batch(&part, 0, 9000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 5);
        assert_eq!(filtered.min_timestamp, 1000);
        assert_eq!(filtered.max_timestamp, 8000);
    }
1379}