// mito2/memtable/time_partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Partitions memtables by time.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, Mutex, MutexGuard};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use datatypes::arrow;
26use datatypes::arrow::array::{
27    ArrayRef, BooleanArray, RecordBatch, RecordBatchOptions, TimestampMicrosecondArray,
28    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
29};
30use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
31use datatypes::arrow::datatypes::{DataType, Int64Type, SchemaRef};
32use mito_codec::key_values::KeyValue;
33use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
34use smallvec::{SmallVec, smallvec};
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::RegionMetadataRef;
37
38use crate::error;
39use crate::error::{InvalidRequestSnafu, Result};
40use crate::memtable::bulk::part::{BulkPart, BulkPartConverter};
41use crate::memtable::version::SmallMemtableVec;
42use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
43use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
44
/// Initial time window if not specified (one day).
///
/// Expressed with `Duration::from_secs` so the constant builds on stable
/// toolchains; `Duration::from_days` is gated behind the unstable
/// `duration_constructors` feature.
const INITIAL_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
47
/// A partition holds rows with timestamps between `[min, max)`.
///
/// NOTE(review): cloning appears shallow — `memtable` is a `MemtableRef`,
/// presumably a shared pointer; confirm against the memtable module.
#[derive(Debug, Clone)]
pub struct TimePartition {
    /// Memtable of the partition.
    memtable: MemtableRef,
    /// Time range of the partition. `min` is inclusive and `max` is exclusive.
    time_range: PartTimeRange,
}
56
57impl TimePartition {
58    /// Returns whether the `ts` belongs to the partition.
59    fn contains_timestamp(&self, ts: Timestamp) -> bool {
60        self.time_range.contains_timestamp(ts)
61    }
62
63    /// Write rows to the part.
64    fn write(&self, kvs: &KeyValues) -> Result<()> {
65        self.memtable.write(kvs)
66    }
67
68    /// Writes a record batch to memtable.
69    fn write_record_batch(&self, rb: BulkPart) -> Result<()> {
70        self.memtable.write_bulk(rb)
71    }
72
73    /// Write a partial [BulkPart] according to [TimePartition::time_range].
74    fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
75        let Some(filtered) = filter_record_batch(
76            part,
77            self.time_range.min_timestamp.value(),
78            self.time_range.max_timestamp.value(),
79        )?
80        else {
81            return Ok(());
82        };
83        self.write_record_batch(filtered)
84    }
85}
86
/// Builds a [BooleanArray] mask selecting elements of `$ts_array` whose value
/// lies in `[$min, $max)`.
///
/// The mask bits are packed manually, 64 per `u64` word, into a pre-sized
/// [MutableBuffer] instead of going through a bit-by-bit builder.
macro_rules! create_filter_buffer {
    ($ts_array:expr, $min:expr, $max:expr) => {{
        let len = $ts_array.len();
        // ceil(len / 64) words of 8 bytes each.
        let mut buffer = MutableBuffer::new(len.div_ceil(64) * 8);

        // Predicate: element at `idx` is inside `[$min, $max)`.
        let f = |idx: usize| -> bool {
            // SAFETY: we only iterate the array within index bound.
            unsafe {
                let val = $ts_array.value_unchecked(idx);
                val >= $min && val < $max
            }
        };

        let chunks = len / 64;
        let remainder = len % 64;

        // Pack the full 64-bit words.
        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        // Pack the trailing partial word, if any.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }
            // SAFETY: Already allocated sufficient capacity
            unsafe { buffer.push_unchecked(packed) }
        }

        BooleanArray::new(BooleanBuffer::new(buffer.into(), 0, len), None)
    }};
}
126
/// Downcasts `$ts_array` to the concrete `$array_type`, filters it to
/// `[$min, $max)`, and evaluates to `(filtered_array, filter_mask, min_ts,
/// max_ts)` of the surviving values.
///
/// NOTE: expands inside [filter_record_batch] and early-returns `Ok(None)`
/// from the *enclosing function* when no row matches the range.
macro_rules! handle_timestamp_array {
    ($ts_array:expr, $array_type:ty, $min:expr, $max:expr) => {{
        let ts_array = $ts_array.as_any().downcast_ref::<$array_type>().unwrap();
        let filter = create_filter_buffer!(ts_array, $min, $max);

        let res = arrow::compute::filter(ts_array, &filter).context(error::ComputeArrowSnafu)?;
        if res.is_empty() {
            return Ok(None);
        }

        let i64array = res.as_any().downcast_ref::<$array_type>().unwrap();
        // safety: we've checked res is not empty
        let max_ts = arrow::compute::max(i64array).unwrap();
        let min_ts = arrow::compute::min(i64array).unwrap();

        (res, filter, min_ts, max_ts)
    }};
}
145
/// Filters the given part according to min (inclusive) and max (exclusive) timestamp range.
/// Returns [None] if no matching rows.
pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option<BulkPart>> {
    let ts_array = part.timestamps();
    // Dispatch on the concrete timestamp unit; the macro early-returns
    // `Ok(None)` from this function when no row falls into `[min, max)`.
    let (ts_array, filter, min_ts, max_ts) = match ts_array.data_type() {
        DataType::Timestamp(unit, _) => match unit {
            arrow::datatypes::TimeUnit::Second => {
                handle_timestamp_array!(ts_array, TimestampSecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Millisecond => {
                handle_timestamp_array!(ts_array, TimestampMillisecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Microsecond => {
                handle_timestamp_array!(ts_array, TimestampMicrosecondArray, min, max)
            }
            arrow::datatypes::TimeUnit::Nanosecond => {
                handle_timestamp_array!(ts_array, TimestampNanosecondArray, min, max)
            }
        },
        _ => {
            // The time index column is always a timestamp type.
            unreachable!("Got data type: {:?}", ts_array.data_type());
        }
    };

    let num_rows = ts_array.len();
    // Apply the same mask to every other column; the timestamp column reuses
    // the already-filtered array instead of being filtered twice.
    let arrays = part
        .batch
        .columns()
        .iter()
        .enumerate()
        .map(|(index, array)| {
            if index == part.timestamp_index {
                Ok(ts_array.clone())
            } else {
                arrow::compute::filter(&array, &filter).context(error::ComputeArrowSnafu)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    let batch = RecordBatch::try_new_with_options(
        part.batch.schema(),
        arrays,
        // Row count is passed explicitly rather than inferred from the arrays.
        &RecordBatchOptions::default().with_row_count(Some(num_rows)),
    )
    .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_timestamp: max_ts,
        min_timestamp: min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        // Any previously cached raw encoding no longer matches the filtered batch.
        raw_data: None,
    }))
}
199
/// List of partitions; stores up to two entries inline before spilling to the heap.
type PartitionVec = SmallVec<[TimePartition; 2]>;
201
/// Partitions.
///
/// Splits writes into per-time-window memtables. The partition duration is
/// fixed for the lifetime of an instance; [TimePartitions::fork] and
/// [TimePartitions::new_with_part_duration] build a new list to change it.
#[derive(Debug)]
pub struct TimePartitions {
    /// Mutable data of partitions.
    inner: Mutex<PartitionsInner>,
    /// Duration of a partition.
    part_duration: Duration,
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Builder of memtables.
    builder: MemtableBuilderRef,
    /// Primary key encoder.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,

    /// Cached schema for bulk insert.
    /// This field is Some if the memtable uses bulk insert.
    bulk_schema: Option<SchemaRef>,
}
220
/// Shared reference to [TimePartitions].
pub type TimePartitionsRef = Arc<TimePartitions>;
222
223impl TimePartitions {
224    /// Returns a new empty partition list with optional duration.
225    pub fn new(
226        metadata: RegionMetadataRef,
227        builder: MemtableBuilderRef,
228        next_memtable_id: MemtableId,
229        part_duration: Option<Duration>,
230    ) -> Self {
231        let inner = PartitionsInner::new(next_memtable_id);
232        let primary_key_codec = build_primary_key_codec(&metadata);
233        let bulk_schema = builder.use_bulk_insert(&metadata).then(|| {
234            let opts = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
235            to_flat_sst_arrow_schema(&metadata, &opts)
236        });
237
238        Self {
239            inner: Mutex::new(inner),
240            part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
241            metadata,
242            builder,
243            primary_key_codec,
244            bulk_schema,
245        }
246    }
247
248    /// Write key values to memtables.
249    ///
250    /// It creates new partitions if necessary.
251    pub fn write(&self, kvs: &KeyValues) -> Result<()> {
252        if let Some(bulk_schema) = &self.bulk_schema {
253            let mut converter = BulkPartConverter::new(
254                &self.metadata,
255                bulk_schema.clone(),
256                kvs.num_rows(),
257                self.primary_key_codec.clone(),
258                // Always store primary keys for bulk mode.
259                true,
260            );
261            converter.append_key_values(kvs)?;
262            let part = converter.convert()?;
263
264            return self.write_bulk_inner(part);
265        }
266
267        // Get all parts.
268        let parts = self.list_partitions();
269
270        // Checks whether all rows belongs to a single part. Checks in reverse order as we usually
271        // put to latest part.
272        for part in parts.iter().rev() {
273            let mut all_in_partition = true;
274            for kv in kvs.iter() {
275                // Safety: We checked the schema in the write request.
276                let ts = kv.timestamp().try_into_timestamp().unwrap().unwrap();
277                if !part.contains_timestamp(ts) {
278                    all_in_partition = false;
279                    break;
280                }
281            }
282            if !all_in_partition {
283                continue;
284            }
285
286            // We can write all rows to this part.
287            return part.write(kvs);
288        }
289
290        // Slow path: We have to split kvs by partitions.
291        self.write_multi_parts(kvs, &parts)
292    }
293
294    /// Writes a bulk part.
295    pub fn write_bulk(&self, part: BulkPart) -> Result<()> {
296        // Convert the bulk part if bulk_schema is Some
297        let part = if let Some(bulk_schema) = &self.bulk_schema {
298            let converted = crate::memtable::bulk::part::convert_bulk_part(
299                part,
300                &self.metadata,
301                self.primary_key_codec.clone(),
302                bulk_schema.clone(),
303                // Always store primary keys for bulk mode.
304                true,
305            )?;
306            match converted {
307                Some(p) => p,
308                None => return Ok(()),
309            }
310        } else {
311            part
312        };
313
314        self.write_bulk_inner(part)
315    }
316
    /// Writes a bulk part without converting.
    fn write_bulk_inner(&self, part: BulkPart) -> Result<()> {
        // Time unit of the time index column, used to interpret the part's
        // raw i64 min/max timestamps.
        let time_type = self
            .metadata
            .time_index_column()
            .column_schema
            .data_type
            .as_timestamp()
            .unwrap();

        // Get all parts.
        let parts = self.list_partitions();
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_timestamp),
            time_type.create_timestamp(part.max_timestamp),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
            // fast path: all timestamps fall in one time partition.
            return matching_parts[0].write_record_batch(part);
        }

        // Slow path: each overlapping partition takes its slice of the rows.
        for matching in matching_parts {
            matching.write_record_batch_partial(&part)?
        }

        // Create absent partitions under the lock, then write each slice with
        // the lock released.
        for missing in missing_parts {
            let new_part = {
                let mut inner = self.inner.lock().unwrap();
                self.get_or_create_time_partition(missing, &mut inner)?
            };
            new_part.write_record_batch_partial(&part)?;
        }
        Ok(())
    }
354
355    // Creates or gets parts with given start timestamp.
356    // Acquires the lock to avoid others create the same partition.
357    fn get_or_create_time_partition(
358        &self,
359        part_start: Timestamp,
360        inner: &mut MutexGuard<PartitionsInner>,
361    ) -> Result<TimePartition> {
362        let part_pos = match inner
363            .parts
364            .iter()
365            .position(|part| part.time_range.min_timestamp == part_start)
366        {
367            Some(pos) => pos,
368            None => {
369                let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
370                    .with_context(|| InvalidRequestSnafu {
371                        region_id: self.metadata.region_id,
372                        reason: format!(
373                            "Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
374                        ),
375                    })?;
376                let memtable = self
377                    .builder
378                    .build(inner.alloc_memtable_id(), &self.metadata);
379                debug!(
380                    "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}, metadata: {:?}",
381                    range,
382                    self.metadata.region_id,
383                    self.part_duration,
384                    memtable.id(),
385                    inner.parts.len() + 1,
386                    self.metadata,
387                );
388                let pos = inner.parts.len();
389                inner.parts.push(TimePartition {
390                    memtable,
391                    time_range: range,
392                });
393                pos
394            }
395        };
396        Ok(inner.parts[part_pos].clone())
397    }
398
399    /// Append memtables in partitions to `memtables`.
400    pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
401        let inner = self.inner.lock().unwrap();
402        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
403    }
404
405    /// Returns the number of partitions.
406    pub fn num_partitions(&self) -> usize {
407        let inner = self.inner.lock().unwrap();
408        inner.parts.len()
409    }
410
411    /// Returns true if all memtables are empty.
412    pub fn is_empty(&self) -> bool {
413        let inner = self.inner.lock().unwrap();
414        inner.parts.iter().all(|part| part.memtable.is_empty())
415    }
416
417    /// Freezes all memtables.
418    pub fn freeze(&self) -> Result<()> {
419        let inner = self.inner.lock().unwrap();
420        for part in &*inner.parts {
421            part.memtable.freeze()?;
422        }
423        Ok(())
424    }
425
    /// Forks latest partition and updates the partition duration if `part_duration` is Some.
    pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
        // Fall back to the existing partition duration.
        let part_duration = part_duration.unwrap_or(self.part_duration);

        let mut inner = self.inner.lock().unwrap();
        // The latest partition is the one with the largest start timestamp.
        let latest_part = inner
            .parts
            .iter()
            .max_by_key(|part| part.time_range.min_timestamp)
            .cloned();

        let Some(old_part) = latest_part else {
            // If there is no partition, then we create a new partition with the new duration.
            return Self::new(
                metadata.clone(),
                self.builder.clone(),
                inner.next_memtable_id,
                Some(part_duration),
            );
        };

        let old_stats = old_part.memtable.stats();
        // Use the max timestamp to compute the new time range for the memtable.
        // If any step yields `None` (no rows in the old memtable, or the range
        // is unrepresentable), fall back to an empty partition list.
        let partitions_inner = old_stats
            .time_range()
            .and_then(|(_, old_stats_end_timestamp)| {
                partition_start_timestamp(old_stats_end_timestamp, part_duration)
                    .and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
            })
            .map(|part_time_range| {
                // Forks the latest partition, but compute the time range based on the new duration.
                let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
                let part = TimePartition {
                    memtable,
                    time_range: part_time_range,
                };
                PartitionsInner::with_partition(part, inner.next_memtable_id)
            })
            .unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));

        Self {
            inner: Mutex::new(partitions_inner),
            part_duration,
            metadata: metadata.clone(),
            builder: self.builder.clone(),
            primary_key_codec: self.primary_key_codec.clone(),
            bulk_schema: self.bulk_schema.clone(),
        }
    }
476
    /// Returns partition duration.
    ///
    /// Always initialized: `new` substitutes [INITIAL_TIME_WINDOW] when the
    /// caller passes no duration.
    pub(crate) fn part_duration(&self) -> Duration {
        self.part_duration
    }
481
    /// Returns the memtable builder used to create partition memtables.
    pub(crate) fn memtable_builder(&self) -> &MemtableBuilderRef {
        &self.builder
    }
486
487    /// Returns memory usage.
488    pub(crate) fn memory_usage(&self) -> usize {
489        let inner = self.inner.lock().unwrap();
490        inner
491            .parts
492            .iter()
493            .map(|part| part.memtable.stats().estimated_bytes)
494            .sum()
495    }
496
497    /// Returns the number of rows.
498    pub(crate) fn num_rows(&self) -> u64 {
499        let inner = self.inner.lock().unwrap();
500        inner
501            .parts
502            .iter()
503            .map(|part| part.memtable.stats().num_rows as u64)
504            .sum()
505    }
506
507    /// Append memtables in partitions to small vec.
508    pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
509        let inner = self.inner.lock().unwrap();
510        memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
511    }
512
513    /// Returns the next memtable id.
514    pub(crate) fn next_memtable_id(&self) -> MemtableId {
515        let inner = self.inner.lock().unwrap();
516        inner.next_memtable_id
517    }
518
519    /// Creates a new empty partition list from this list and a `part_duration`.
520    /// It falls back to the old partition duration if `part_duration` is `None`.
521    pub(crate) fn new_with_part_duration(
522        &self,
523        part_duration: Option<Duration>,
524        memtable_builder: Option<MemtableBuilderRef>,
525    ) -> Self {
526        debug_assert!(self.is_empty());
527
528        Self::new(
529            self.metadata.clone(),
530            memtable_builder.unwrap_or_else(|| self.builder.clone()),
531            self.next_memtable_id(),
532            Some(part_duration.unwrap_or(self.part_duration)),
533        )
534    }
535
536    /// Returns all partitions.
537    fn list_partitions(&self) -> PartitionVec {
538        let inner = self.inner.lock().unwrap();
539        inner.parts.clone()
540    }
541
    /// Find existing partitions that match the bulk data's time range and identify
    /// any new partitions that need to be created.
    ///
    /// Returns the overlapping existing partitions and the start timestamps
    /// (in the time index unit) of the partitions that must be created.
    fn find_partitions_by_time_range<'a>(
        &self,
        ts_array: &ArrayRef,
        existing_parts: &'a [TimePartition],
        min: Timestamp,
        max: Timestamp,
    ) -> Result<(Vec<&'a TimePartition>, Vec<Timestamp>)> {
        let mut matching = Vec::new();

        // Raw start-timestamp values already covered by an existing partition.
        let mut present = HashSet::new();
        // First find any existing partitions that overlap
        for part in existing_parts {
            let part_time_range = &part.time_range;
            if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
                matching.push(part);
                present.insert(part_time_range.min_timestamp.value());
            }
        }

        // The duration is always initialized (see `new`).
        let part_duration = self.part_duration_or_default();
        let timestamp_unit = self.metadata.time_index_type().unit();

        let part_duration_sec = part_duration.as_secs() as i64;
        // SAFETY: Timestamps won't overflow when converting to Second.
        let start_bucket = min
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let end_bucket = max
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .div_euclid(part_duration_sec);
        let bucket_num = (end_bucket - start_bucket + 1) as usize;

        let num_timestamps = ts_array.len();
        // Enumerate whichever set is smaller: the candidate buckets in
        // `[start_bucket, end_bucket]`, or the timestamps themselves.
        let missing = if bucket_num <= num_timestamps {
            (start_bucket..=end_bucket)
                .filter_map(|start_sec| {
                    let Some(timestamp) = Timestamp::new_second(start_sec * part_duration_sec)
                        .convert_to(timestamp_unit)
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {}", start_sec),
                            }
                            .fail(),
                        );
                    };
                    // Only report buckets not already covered by a partition.
                    if present.insert(timestamp.value()) {
                        Some(Ok(timestamp))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            // Reinterpret the timestamp array as plain i64 values.
            let ts_primitive = match ts_array.data_type() {
                DataType::Timestamp(unit, _) => match unit {
                    arrow::datatypes::TimeUnit::Second => ts_array
                        .as_any()
                        .downcast_ref::<TimestampSecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Millisecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Microsecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampMicrosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                    arrow::datatypes::TimeUnit::Nanosecond => ts_array
                        .as_any()
                        .downcast_ref::<TimestampNanosecondArray>()
                        .unwrap()
                        .reinterpret_cast::<Int64Type>(),
                },
                _ => unreachable!(),
            };

            ts_primitive
                .values()
                .iter()
                .filter_map(|ts| {
                    let ts = self.metadata.time_index_type().create_timestamp(*ts);
                    // Align each timestamp down to its bucket start, converted
                    // back to the time index unit.
                    let Some(bucket_start) = ts
                        .convert_to(TimeUnit::Second)
                        .and_then(|ts| ts.align_by_bucket(part_duration_sec))
                        .and_then(|ts| ts.convert_to(timestamp_unit))
                    else {
                        return Some(
                            InvalidRequestSnafu {
                                region_id: self.metadata.region_id,
                                reason: format!("Timestamp out of range: {:?}", ts),
                            }
                            .fail(),
                        );
                    };
                    if present.insert(bucket_start.value()) {
                        Some(Ok(bucket_start))
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
        Ok((matching, missing))
    }
658
    /// Returns the partition duration.
    ///
    /// Historically the duration was optional with a 1 day fallback; it is
    /// now always initialized in [TimePartitions::new], so this simply
    /// returns it.
    fn part_duration_or_default(&self) -> Duration {
        self.part_duration
    }
663
    /// Write to multiple partitions.
    fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
        // Rows grouped by the existing partition that takes them, keyed by
        // the partition's start timestamp.
        let mut parts_to_write = HashMap::new();
        // Rows whose partition doesn't exist yet, keyed by the computed
        // partition start timestamp.
        let mut missing_parts = HashMap::new();
        for kv in kvs.iter() {
            let mut part_found = false;
            // Safety: We used the timestamp before.
            let ts = kv.timestamp().try_into_timestamp().unwrap().unwrap();
            for part in parts {
                if part.contains_timestamp(ts) {
                    parts_to_write
                        .entry(part.time_range.min_timestamp)
                        .or_insert_with(|| PartitionToWrite {
                            partition: part.clone(),
                            key_values: Vec::new(),
                        })
                        .key_values
                        .push(kv);
                    part_found = true;
                    break;
                }
            }

            if !part_found {
                // We need to write it to a new part.
                // The duration is always initialized (see `new`).
                let part_duration = self.part_duration_or_default();
                let part_start =
                    partition_start_timestamp(ts, part_duration).with_context(|| {
                        InvalidRequestSnafu {
                            region_id: self.metadata.region_id,
                            reason: format!(
                                "timestamp {ts:?} and bucket {part_duration:?} are out of range"
                            ),
                        }
                    })?;
                missing_parts
                    .entry(part_start)
                    .or_insert_with(Vec::new)
                    .push(kv);
            }
        }

        // Writes rows to existing parts.
        for part_to_write in parts_to_write.into_values() {
            for kv in part_to_write.key_values {
                part_to_write.partition.memtable.write_one(kv)?;
            }
        }

        // Creates new parts and writes to them. Acquires the lock to avoid others create
        // the same partition.
        let mut inner = self.inner.lock().unwrap();
        for (part_start, key_values) in missing_parts {
            let partition = self.get_or_create_time_partition(part_start, &mut inner)?;
            for kv in key_values {
                partition.memtable.write_one(kv)?;
            }
        }

        Ok(())
    }
726
727    /// Timeseries count in all time partitions.
728    pub(crate) fn series_count(&self) -> usize {
729        self.inner.lock().unwrap().series_count()
730    }
731}
732
733/// Computes the start timestamp of the partition for `ts`.
734///
735/// It always use bucket size in seconds which should fit all timestamp resolution.
736fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
737    // Safety: We convert it to seconds so it never returns `None`.
738    let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
739    let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
740    let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
741    start_sec.convert_to(ts.unit())
742}
743
/// Mutable state of [TimePartitions], guarded by the outer mutex.
#[derive(Debug)]
struct PartitionsInner {
    /// All partitions.
    parts: PartitionVec,
    /// Next memtable id.
    next_memtable_id: MemtableId,
}
751
752impl PartitionsInner {
753    fn new(next_memtable_id: MemtableId) -> Self {
754        Self {
755            parts: Default::default(),
756            next_memtable_id,
757        }
758    }
759
760    fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
761        Self {
762            parts: smallvec![part],
763            next_memtable_id,
764        }
765    }
766
767    fn alloc_memtable_id(&mut self) -> MemtableId {
768        let id = self.next_memtable_id;
769        self.next_memtable_id += 1;
770        id
771    }
772
773    pub(crate) fn series_count(&self) -> usize {
774        self.parts
775            .iter()
776            .map(|p| p.memtable.stats().series_count)
777            .sum()
778    }
779}
780
/// Time range of a partition.
///
/// The range is half-open: `[min_timestamp, max_timestamp)`.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
    /// Inclusive min timestamp of rows in the partition.
    min_timestamp: Timestamp,
    /// Exclusive max timestamp of rows in the partition.
    max_timestamp: Timestamp,
}
789
790impl PartTimeRange {
791    fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
792        let start_sec = start.convert_to(TimeUnit::Second)?;
793        let end_sec = start_sec.add_duration(duration).ok()?;
794        let min_timestamp = start_sec.convert_to(start.unit())?;
795        let max_timestamp = end_sec.convert_to(start.unit())?;
796
797        Some(Self {
798            min_timestamp,
799            max_timestamp,
800        })
801    }
802
803    /// Returns whether the `ts` belongs to the partition.
804    fn contains_timestamp(&self, ts: Timestamp) -> bool {
805        self.min_timestamp <= ts && ts < self.max_timestamp
806    }
807}
808
/// Buffers the key values routed to one existing partition during a
/// multi-partition write.
struct PartitionToWrite<'a> {
    /// Target partition.
    partition: TimePartition,
    /// Rows destined for the partition.
    key_values: Vec<KeyValue<'a>>,
}
813
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::SequenceNumber;

    use super::*;
    use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::memtable::{IterBuilder, RangesOptions};
    use crate::test_util::memtable_util::{self, collect_iter_timestamps};

    #[test]
    fn test_no_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        assert_eq!(0, partitions.num_partitions());
        assert!(partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[1000, 3000, 7000, 5000, 6000],
            0, // sequence 0, 1, 2, 3, 4
        );
        partitions.write(&kvs).unwrap();

        // Without a part duration, all rows land in a single partition.
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());
        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        assert_eq!(0, memtables[0].id());

        let iter = memtables[0]
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
    }

    #[test]
    fn test_write_single_part() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[5000, 2000, 0],
            0, // sequence 0, 1, 2
        );
        // It should create a new partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            &metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000],
            3, // sequence 3, 4, 5
        );
        // Still writes to the same partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());

        let mut memtables = Vec::new();
        partitions.list_memtables(&mut memtables);
        let iter = memtables[0]
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
        let parts = partitions.list_partitions();
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[0].time_range.max_timestamp
        );
    }

    /// Builds partitions with a 5s window and writes rows that span two
    /// windows: [0, 5000) and [5000, 10000).
    fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions =
            TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
        assert_eq!(0, partitions.num_partitions());

        let kvs = memtable_util::build_key_values(
            metadata,
            "hello".to_string(),
            0,
            &[2000, 0],
            0, // sequence 0, 1
        );
        // It should create a new partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(1, partitions.num_partitions());
        assert!(!partitions.is_empty());

        let kvs = memtable_util::build_key_values(
            metadata,
            "hello".to_string(),
            0,
            &[3000, 7000, 4000, 5000],
            2, // sequence 2, 3, 4, 5
        );
        // Writes 2 rows to the old partition and 1 row to a new partition.
        partitions.write(&kvs).unwrap();
        assert_eq!(2, partitions.num_partitions());

        partitions
    }

    #[test]
    fn test_write_multi_parts() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);

        let parts = partitions.list_partitions();
        let iter = parts[0]
            .memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(0, parts[0].memtable.id());
        assert_eq!(
            Timestamp::new_millisecond(0),
            parts[0].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[0].time_range.max_timestamp
        );
        assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
        let iter = parts[1]
            .memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        assert_eq!(1, parts[1].memtable.id());
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 7000], &timestamps[..]);
        assert_eq!(
            Timestamp::new_millisecond(5000),
            parts[1].time_range.min_timestamp
        );
        assert_eq!(
            Timestamp::new_millisecond(10000),
            parts[1].time_range.max_timestamp
        );
    }

    #[test]
    fn test_new_with_part_duration() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);

        let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)), None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());

        // Won't update the duration if it's None.
        let new_parts = new_parts.new_with_part_duration(None, None);
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        // Don't need to create new memtables.
        assert_eq!(0, new_parts.next_memtable_id());

        let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)), None);
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        // Don't need to create new memtables.
        assert_eq!(0, new_parts.next_memtable_id());

        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        // Need to build a new memtable as duration is still None.
        let new_parts = partitions.new_with_part_duration(None, None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert_eq!(0, new_parts.next_memtable_id());
    }

    #[test]
    fn test_fork_empty() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
        let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
        partitions.freeze().unwrap();
        let new_parts = partitions.fork(&metadata, None);
        assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, None);
        // Won't update the duration.
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());

        new_parts.freeze().unwrap();
        let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert!(new_parts.list_partitions().is_empty());
        assert_eq!(0, new_parts.next_memtable_id());
    }

    #[test]
    fn test_fork_non_empty_none() {
        let metadata = memtable_util::metadata_for_test();
        let partitions = new_multi_partitions(&metadata);
        partitions.freeze().unwrap();

        // Won't update the duration.
        let new_parts = partitions.fork(&metadata, None);
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(5), new_parts.part_duration());
        assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(3, new_parts.next_memtable_id());

        // Although we don't fork a memtable multiple times, we still add a test for it.
        let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
        assert!(new_parts.is_empty());
        assert_eq!(Duration::from_secs(10), new_parts.part_duration());
        assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
        assert_eq!(4, new_parts.next_memtable_id());
    }

    #[test]
    fn test_find_partitions_by_time_range() {
        let metadata = memtable_util::metadata_for_test();
        let builder = Arc::new(PartitionTreeMemtableBuilder::default());

        // Case 1: No time range partitioning
        let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
        let parts = partitions.list_partitions();
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(1000..=2000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            )
            .unwrap();
        assert_eq!(matching.len(), 0);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], Timestamp::new_millisecond(0));

        // Case 2: With time range partitioning
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Create two existing partitions: [0, 5000) and [5000, 10000)
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[2000, 4000], 0);
        partitions.write(&kvs).unwrap();
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 0, &[7000, 8000], 2);
        partitions.write(&kvs).unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());

        // Test case 2a: Query fully within existing partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(2000..=4000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(2000),
                Timestamp::new_millisecond(4000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);

        // Test case 2b: Query spanning multiple existing partitions
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(3000..=8000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(3000),
                Timestamp::new_millisecond(8000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2c: Query requiring new partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(12000..=13000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(12000),
                Timestamp::new_millisecond(13000),
            )
            .unwrap();
        assert!(matching.is_empty());
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].value(), 10000);

        // Test case 2d: Query partially overlapping existing partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4000..=6000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4000),
                Timestamp::new_millisecond(6000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2e: Corner case crossing the boundary between the two partitions
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(4999..=5000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(4999),
                Timestamp::new_millisecond(5000),
            )
            .unwrap();
        assert_eq!(matching.len(), 2);
        assert!(missing.is_empty());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);

        // Test case 2f: Corner case at the exclusive upper bound of the last partition
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(9999..=10000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(9999),
                Timestamp::new_millisecond(10000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(1, missing.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
        assert_eq!(missing[0].value(), 10000);

        // Test case 2g: Cross 0
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from_iter_values(-1000..=1000)) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-1000),
                Timestamp::new_millisecond(1000),
            )
            .unwrap();
        assert_eq!(matching.len(), 1);
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(1, missing.len());
        assert_eq!(missing[0].value(), -5000);

        // Test case 3: sparse data
        let (matching, missing) = partitions
            .find_partitions_by_time_range(
                &(Arc::new(TimestampMillisecondArray::from(vec![
                    -100000000000,
                    0,
                    100000000000,
                ])) as ArrayRef),
                &parts,
                Timestamp::new_millisecond(-100000000000),
                Timestamp::new_millisecond(100000000000),
            )
            .unwrap();
        assert_eq!(2, matching.len());
        assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
        assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
        assert_eq!(2, missing.len());
        assert_eq!(missing[0].value(), -100000000000);
        assert_eq!(missing[1].value(), 100000000000);
    }

    /// Builds a [BulkPart] with one millisecond timestamp column ("ts") and one
    /// string column ("val") whose values mirror the timestamps.
    fn build_part(ts: &[i64], sequence: SequenceNumber) -> BulkPart {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("val", DataType::Utf8, true),
        ]));
        let ts_data = ts.to_vec();
        let ts_array = Arc::new(TimestampMillisecondArray::from(ts_data));
        let val_array = Arc::new(StringArray::from_iter_values(
            ts.iter().map(|v| v.to_string()),
        ));
        let batch = RecordBatch::try_new(
            schema,
            vec![ts_array.clone() as ArrayRef, val_array.clone() as ArrayRef],
        )
        .unwrap();
        let max_ts = ts.iter().max().copied().unwrap();
        let min_ts = ts.iter().min().copied().unwrap();
        BulkPart {
            batch,
            max_timestamp: max_ts,
            min_timestamp: min_ts,
            sequence,
            timestamp_index: 0,
            raw_data: None,
        }
    }

    #[test]
    fn test_write_bulk() {
        let mut metadata_builder = RegionMetadataBuilder::new(0.into());
        metadata_builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("val", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .primary_key(vec![]);
        let metadata = Arc::new(metadata_builder.build().unwrap());

        let builder = Arc::new(TimeSeriesMemtableBuilder::default());
        let partitions = TimePartitions::new(
            metadata.clone(),
            builder.clone(),
            0,
            Some(Duration::from_secs(5)),
        );

        // Test case 1: Write to single partition
        partitions
            .write_bulk(build_part(&[1000, 2000, 3000], 0))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0]
            .memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000], &timestamps[..]);

        // Test case 2: Write across multiple existing partitions
        partitions
            .write_bulk(build_part(&[4000, 5000, 6000], 1))
            .unwrap();
        let parts = partitions.list_partitions();
        assert_eq!(2, parts.len());
        // Check first partition [0, 5000)
        let iter = parts[0]
            .memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 2000, 3000, 4000], &timestamps[..]);
        // Check second partition [5000, 10000)
        let iter = parts[1]
            .memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[5000, 6000], &timestamps[..]);

        // Test case 3: Write requiring new partition
        partitions
            .write_bulk(build_part(&[11000, 12000], 3))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(3, parts.len());

        // Check new partition [10000, 15000)
        let iter = parts[2]
            .memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[11000, 12000], &timestamps[..]);

        // Test case 4: Write with no time range partitioning
        let partitions = TimePartitions::new(metadata.clone(), builder, 3, None);

        partitions
            .write_bulk(build_part(&[1000, 5000, 9000], 4))
            .unwrap();

        let parts = partitions.list_partitions();
        assert_eq!(1, parts.len());
        let iter = parts[0]
            .memtable
            .ranges(None, RangesOptions::default())
            .unwrap()
            .build(None)
            .unwrap();
        let timestamps = collect_iter_timestamps(iter);
        assert_eq!(&[1000, 5000, 9000], &timestamps[..]);
    }

    #[test]
    fn test_split_record_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond.as_arrow_time_unit(), None),
                false,
            ),
            Field::new("val", DataType::Utf8, true),
        ]));

        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
            1000, 2000, 5000, 7000, 8000,
        ]));
        let val_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![ts_array as ArrayRef, val_array as ArrayRef],
        )
        .unwrap();

        let part = BulkPart {
            batch,
            max_timestamp: 8000,
            min_timestamp: 1000,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
        };

        // Test splitting with range [1000, 2000)
        let result = filter_record_batch(&part, 1000, 2000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_timestamp, 1000);
        assert_eq!(filtered.max_timestamp, 1000);

        // Test splitting with range [3000, 6000)
        let result = filter_record_batch(&part, 3000, 6000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_timestamp, 5000);
        assert_eq!(filtered.max_timestamp, 5000);

        // Test splitting with range that includes no points
        let result = filter_record_batch(&part, 3000, 4000).unwrap();
        assert!(result.is_none());

        // Test splitting with range that includes all points
        let result = filter_record_batch(&part, 0, 9000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 5);
        assert_eq!(filtered.min_timestamp, 1000);
        assert_eq!(filtered.max_timestamp, 8000);
    }
}