Skip to main content

cli/data/export_v2/
chunker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use chrono::Duration as ChronoDuration;
18
19use crate::data::export_v2::manifest::{ChunkMeta, TimeRange};
20
21pub fn generate_chunks(time_range: &TimeRange, window: Duration) -> Vec<ChunkMeta> {
22    let (Some(start), Some(end)) = (time_range.start, time_range.end) else {
23        return vec![ChunkMeta::new(1, time_range.clone())];
24    };
25
26    if start == end {
27        return vec![ChunkMeta::skipped(1, time_range.clone())];
28    }
29
30    if start > end {
31        return Vec::new();
32    }
33
34    let window = match ChronoDuration::from_std(window) {
35        Ok(window) if window > ChronoDuration::zero() => window,
36        _ => return vec![ChunkMeta::new(1, time_range.clone())],
37    };
38
39    let mut chunks = Vec::new();
40    let mut cursor = start;
41    let mut id = 1;
42
43    while cursor < end {
44        let next = cursor
45            .checked_add_signed(window)
46            .map_or(end, |timestamp| timestamp.min(end));
47        chunks.push(ChunkMeta::new(id, TimeRange::new(Some(cursor), Some(next))));
48        id += 1;
49        cursor = next;
50    }
51
52    chunks
53}
54
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};

    use super::*;
    use crate::data::export_v2::manifest::ChunkStatus;

    /// A range with no bounds is returned as a single chunk, unchanged.
    #[test]
    fn test_generate_chunks_unbounded() {
        let range = TimeRange::unbounded();
        let chunks = generate_chunks(&range, Duration::from_secs(3600));
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].time_range, range);
    }

    /// A range with only one bound cannot be split and is kept whole.
    #[test]
    fn test_generate_chunks_half_bounded() {
        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let range = TimeRange::new(Some(start), None);
        let chunks = generate_chunks(&range, Duration::from_secs(3600));
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].time_range, range);
    }

    /// An evenly divisible range is split into contiguous, window-sized chunks.
    #[test]
    fn test_generate_chunks_split() {
        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let end = Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap();
        let range = TimeRange::new(Some(start), Some(end));

        let chunks = generate_chunks(&range, Duration::from_secs(3600));
        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0].time_range.start, Some(start));
        assert_eq!(
            chunks[2].time_range.end,
            Some(Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap())
        );
        // Each chunk must start exactly where the previous one ended.
        for pair in chunks.windows(2) {
            assert_eq!(pair[0].time_range.end, pair[1].time_range.start);
        }
    }

    /// When the window does not divide the range evenly, the last chunk is truncated to `end`.
    #[test]
    fn test_generate_chunks_truncates_last_chunk() {
        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let end = Utc.with_ymd_and_hms(2025, 1, 1, 2, 30, 0).unwrap();
        let range = TimeRange::new(Some(start), Some(end));

        let chunks = generate_chunks(&range, Duration::from_secs(3600));
        assert_eq!(chunks.len(), 3);
        assert_eq!(
            chunks[2].time_range,
            TimeRange::new(
                Some(Utc.with_ymd_and_hms(2025, 1, 1, 2, 0, 0).unwrap()),
                Some(end)
            )
        );
    }

    /// A window larger than the range yields one chunk covering the whole range.
    #[test]
    fn test_generate_chunks_window_larger_than_range() {
        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let end = Utc.with_ymd_and_hms(2025, 1, 1, 0, 30, 0).unwrap();
        let range = TimeRange::new(Some(start), Some(end));

        let chunks = generate_chunks(&range, Duration::from_secs(3600));
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].time_range, range);
    }

    /// A zero-length window disables splitting instead of looping forever.
    #[test]
    fn test_generate_chunks_zero_window_is_single_chunk() {
        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let end = Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap();
        let range = TimeRange::new(Some(start), Some(end));

        let chunks = generate_chunks(&range, Duration::ZERO);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].time_range, range);
    }

    /// A zero-length range produces a single skipped chunk.
    #[test]
    fn test_generate_chunks_empty_range() {
        let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let range = TimeRange::new(Some(start), Some(start));
        let chunks = generate_chunks(&range, Duration::from_secs(3600));
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].status, ChunkStatus::Skipped);
        assert_eq!(chunks[0].time_range, range);
    }

    /// An inverted range (start after end) produces no chunks.
    #[test]
    fn test_generate_chunks_invalid_range_is_empty() {
        let start = Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
        let end = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let range = TimeRange::new(Some(start), Some(end));
        let chunks = generate_chunks(&range, Duration::from_secs(3600));
        assert!(chunks.is_empty());
    }
}