mito2/compaction/
buckets.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_time::timestamp::TimeUnit;
use common_time::Timestamp;

use crate::sst::file::FileHandle;

/// Infers a suitable time bucket duration, in seconds, for a set of SST files.
///
/// The strategy is deliberately simple: compute the overall span between the smallest
/// start timestamp and the largest end timestamp across all given files, then pick the
/// smallest predefined bucket in [`TIME_BUCKETS`] that covers that span.
///
/// If the span cannot be represented as an `i64` number of seconds (the subtraction
/// overflows), the largest predefined bucket is returned instead.
pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>) -> i64 {
    // Start from extreme sentinels so that `min`/`max` adopt the ranges of the files seen.
    let sentinels = (
        Timestamp::new(i64::MAX, TimeUnit::Second),
        Timestamp::new(i64::MIN, TimeUnit::Second),
    );
    let (earliest, latest) = files.fold(sentinels, |(earliest, latest), file| {
        let (start, end) = file.time_range();
        (earliest.min(start), latest.max(end))
    });

    // safety: converting a timestamp of any unit into seconds is not expected to overflow.
    let earliest_sec = earliest.convert_to(TimeUnit::Second).unwrap().value();
    let latest_sec = latest.convert_to(TimeUnit::Second).unwrap().value();

    match latest_sec.checked_sub(earliest_sec) {
        Some(span_sec) => TIME_BUCKETS.fit_time_bucket(span_sec),
        // Subtraction overflowed: fall back to the widest bucket.
        // safety: TIME_BUCKETS cannot be empty.
        None => TIME_BUCKETS.max(),
    }
}

/// A fixed table of candidate time bucket widths, in seconds.
///
/// The lookup in [`TimeBuckets::fit_time_bucket`] is a binary search, so the widths are
/// expected to be stored in ascending order.
pub(crate) struct TimeBuckets([i64; 7]);

impl TimeBuckets {
    /// Returns the smallest bucket that is wide enough to cover `span_sec`.
    ///
    /// When the span exceeds every bucket, the largest bucket is returned.
    ///
    /// # Panics
    ///
    /// Panics if `span_sec` is negative.
    fn fit_time_bucket(&self, span_sec: i64) -> i64 {
        assert!(span_sec >= 0);
        // Index of the first bucket that is not smaller than the span; equals the table
        // length when no bucket is large enough.
        let first_fit = self.0.partition_point(|&bucket| bucket < span_sec);
        self.0
            .get(first_fit)
            .copied()
            .unwrap_or_else(|| self.max())
    }

    /// Returns the bucket stored at position `idx` (test helper).
    #[cfg(test)]
    fn get(&self, idx: usize) -> i64 {
        self.0[idx]
    }

    /// Returns the last (widest) bucket in the table.
    fn max(&self) -> i64 {
        // The table has a fixed, non-zero length, so this pattern always matches.
        let [.., widest] = self.0;
        widest
    }
}

/// The predefined time buckets, in seconds, listed from narrowest to widest:
/// one hour, two hours, twelve hours, one day, one week, one year and ten years.
pub(crate) const TIME_BUCKETS: TimeBuckets = {
    const HOUR: i64 = 60 * 60;
    const DAY: i64 = 24 * HOUR;
    // A year is counted as 365 days here.
    const YEAR: i64 = 365 * DAY;

    TimeBuckets([
        HOUR,
        2 * HOUR,
        12 * HOUR,
        DAY,
        7 * DAY,
        YEAR,
        10 * YEAR,
    ])
};

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compaction::test_util::new_file_handle;
    use crate::sst::file::FileId;

    /// Checks that spans are mapped to the smallest covering bucket, including the exact
    /// boundaries and the fallback to the widest bucket.
    #[test]
    fn test_time_bucket() {
        // Each case is (span in seconds, index of the expected bucket).
        let cases = [
            (1, 0),
            (60 * 60, 0),
            (60 * 60 + 1, 1),
            (TIME_BUCKETS.get(2) - 1, 2),
            (TIME_BUCKETS.get(2), 2),
            (TIME_BUCKETS.get(3) - 1, 3),
            (i64::MAX, 6),
        ];

        for (span_sec, expected_idx) in cases {
            assert_eq!(
                TIME_BUCKETS.get(expected_idx),
                TIME_BUCKETS.fit_time_bucket(span_sec)
            );
        }
    }

    /// Checks that two files whose combined range stays within the first bucket are
    /// assigned that bucket.
    #[test]
    fn test_infer_time_buckets() {
        // NOTE(review): the `* 1000` suggests `new_file_handle` takes millisecond
        // timestamps; confirm against the test utility.
        let files = [
            new_file_handle(FileId::random(), 0, TIME_BUCKETS.get(0) * 1000 - 1, 0),
            new_file_handle(FileId::random(), 1, 10_000, 0),
        ];

        assert_eq!(TIME_BUCKETS.get(0), infer_time_bucket(files.iter()));
    }
}