mito2/compaction/
window.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::fmt::Debug;
17
18use common_telemetry::info;
19use common_time::Timestamp;
20use common_time::range::TimestampRange;
21use common_time::timestamp::TimeUnit;
22use common_time::timestamp_millis::BucketAligned;
23use store_api::storage::RegionId;
24
25use crate::compaction::buckets::infer_time_bucket;
26use crate::compaction::compactor::{CompactionRegion, CompactionVersion};
27use crate::compaction::picker::{Picker, PickerOutput};
28use crate::compaction::{CompactionOutput, get_expired_ssts};
29use crate::sst::file::FileHandle;
30
/// A compaction picker that cuts the time range covered by the candidate files into
/// fixed-size windows and groups every file with each window its time range touches, so
/// that the compaction outputs never overlap each other in time.
#[derive(Debug)]
pub struct WindowedCompactionPicker {
    /// Explicitly requested window size in seconds. When `None`, the window stored in the
    /// region version is used and, failing that, one is inferred from the level 0 files.
    compaction_time_window_seconds: Option<i64>,
}
38
39impl WindowedCompactionPicker {
40    pub fn new(window_seconds: Option<i64>) -> Self {
41        Self {
42            compaction_time_window_seconds: window_seconds,
43        }
44    }
45
46    // Computes compaction time window. First we respect user specified parameter, then
47    // use persisted window. If persist window is not present, we check the time window
48    // provided while creating table. If all of those are absent, we infer the window
49    // from files in level0.
50    fn calculate_time_window(
51        &self,
52        region_id: RegionId,
53        current_version: &CompactionVersion,
54    ) -> i64 {
55        self.compaction_time_window_seconds
56            .or(current_version
57                .compaction_time_window
58                .map(|t| t.as_secs() as i64))
59            .unwrap_or_else(|| {
60                let levels = current_version.ssts.levels();
61                let inferred = infer_time_bucket(levels[0].files());
62                info!(
63                    "Compaction window for region {} is not present, inferring from files: {:?}",
64                    region_id, inferred
65                );
66                inferred
67            })
68    }
69
70    fn pick_inner(
71        &self,
72        region_id: RegionId,
73        current_version: &CompactionVersion,
74        current_time: Timestamp,
75    ) -> (Vec<CompactionOutput>, Vec<FileHandle>, i64) {
76        let time_window = self.calculate_time_window(region_id, current_version);
77        info!(
78            "Compaction window for region: {} is {} seconds",
79            region_id, time_window
80        );
81
82        let expired_ssts = get_expired_ssts(
83            current_version.ssts.levels(),
84            current_version.options.ttl,
85            current_time,
86        );
87        if !expired_ssts.is_empty() {
88            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
89            // here we mark expired SSTs as compacting to avoid them being picked.
90            expired_ssts.iter().for_each(|f| f.set_compacting(true));
91        }
92
93        let windows = assign_files_to_time_windows(
94            time_window,
95            current_version
96                .ssts
97                .levels()
98                .iter()
99                .flat_map(|level| level.files.values()),
100        );
101
102        (build_output(windows), expired_ssts, time_window)
103    }
104}
105
106impl Picker for WindowedCompactionPicker {
107    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
108        let (outputs, expired_ssts, time_window) = self.pick_inner(
109            compaction_region.current_version.metadata.region_id,
110            &compaction_region.current_version,
111            Timestamp::current_millis(),
112        );
113
114        Some(PickerOutput {
115            outputs,
116            expired_ssts,
117            time_window_size: time_window,
118            max_file_size: None, // todo (hl): we may need to support `max_file_size` parameter in manual compaction.
119        })
120    }
121}
122
123fn build_output(windows: BTreeMap<i64, (i64, Vec<FileHandle>)>) -> Vec<CompactionOutput> {
124    let mut outputs = Vec::with_capacity(windows.len());
125    for (lower_bound, (upper_bound, files)) in windows {
126        // safety: the upper bound must > lower bound.
127        let output_time_range = Some(
128            TimestampRange::new(
129                Timestamp::new_second(lower_bound),
130                Timestamp::new_second(upper_bound),
131            )
132            .unwrap(),
133        );
134
135        let output = CompactionOutput {
136            output_level: 1,
137            inputs: files,
138            filter_deleted: false,
139            output_time_range,
140        };
141        outputs.push(output);
142    }
143
144    outputs
145}
146
147/// Assigns files to time windows. If file does not contain a time range in metadata, it will be
148/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket)
149/// so that all files without timestamp can be compacted together.
150fn assign_files_to_time_windows<'a>(
151    bucket_sec: i64,
152    files: impl Iterator<Item = &'a FileHandle>,
153) -> BTreeMap<i64, (i64, Vec<FileHandle>)> {
154    let mut buckets = BTreeMap::new();
155
156    for file in files {
157        if file.compacting() {
158            continue;
159        }
160        let (start, end) = file.time_range();
161        let bounds = file_time_bucket_span(
162            // safety: converting whatever timestamp to seconds will not overflow.
163            start.convert_to(TimeUnit::Second).unwrap().value(),
164            end.convert_to(TimeUnit::Second).unwrap().value(),
165            bucket_sec,
166        );
167        for (lower_bound, upper_bound) in bounds {
168            let (_, files) = buckets
169                .entry(lower_bound)
170                .or_insert_with(|| (upper_bound, Vec::new()));
171            files.push(file.clone());
172        }
173    }
174    buckets
175}
176
177/// Calculates timestamp span between start and end timestamp.
178fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<(i64, i64)> {
179    assert!(start_sec <= end_sec);
180
181    // if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot
182    // be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow.
183    let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
184    let end_aligned = end_sec
185        .align_by_bucket(bucket_sec)
186        .unwrap_or(start_aligned + (end_sec - start_sec));
187
188    let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize);
189    while start_aligned <= end_aligned {
190        let window_size = if start_aligned % bucket_sec == 0 {
191            bucket_sec
192        } else {
193            (start_aligned % bucket_sec).abs()
194        };
195        let upper_bound = start_aligned.checked_add(window_size).unwrap_or(i64::MAX);
196        res.push((start_aligned, upper_bound));
197        start_aligned = upper_bound;
198    }
199    res
200}
201
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_time::Timestamp;
    use common_time::range::TimestampRange;
    use store_api::storage::{FileId, RegionId};

    use crate::compaction::compactor::CompactionVersion;
    use crate::compaction::window::{WindowedCompactionPicker, file_time_bucket_span};
    use crate::region::options::RegionOptions;
    use crate::sst::file::{FileMeta, Level};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::version::SstVersion;
    use crate::test_util::memtable_util::metadata_for_test;

    /// One hour, in milliseconds.
    const HOUR: i64 = 60 * 60 * 1000;

    /// Builds a `CompactionVersion` holding the given files, each described as
    /// `(file id, start in ms, end in ms, level)`, with the given TTL and no persisted
    /// compaction time window.
    fn build_version(
        files: &[(FileId, i64, i64, Level)],
        ttl: Option<Duration>,
    ) -> CompactionVersion {
        let metadata = metadata_for_test();
        let purger = Arc::new(NoopFilePurger);

        let file_metas = files
            .iter()
            .map(|&(file_id, start_ms, end_ms, level)| FileMeta {
                file_id,
                time_range: (
                    Timestamp::new_millisecond(start_ms),
                    Timestamp::new_millisecond(end_ms),
                ),
                level,
                ..Default::default()
            });
        let mut ssts = SstVersion::new();
        ssts.add_files(purger, file_metas);

        let options = RegionOptions {
            ttl: ttl.map(|t| t.into()),
            compaction: Default::default(),
            compaction_override: false,
            storage: None,
            append_mode: false,
            wal_options: Default::default(),
            index_options: Default::default(),
            memtable: None,
            merge_mode: None,
            sst_format: None,
        };

        CompactionVersion {
            metadata,
            ssts: Arc::new(ssts),
            options,
            compaction_time_window: None,
        }
    }

    /// A file older than the TTL is reported as expired and produces no output.
    #[test]
    fn test_pick_expired() {
        let picker = WindowedCompactionPicker::new(None);
        let files = [(FileId::random(), 0, 10, 0)];
        let version = build_version(&files, Some(Duration::from_millis(1)));

        let now = Timestamp::new_millisecond(12);
        let (outputs, expired_ssts, _window) =
            picker.pick_inner(RegionId::new(0, 0), &version, now);

        assert!(outputs.is_empty());
        assert_eq!(1, expired_ssts.len());
    }

    /// Without any configured window, the window is inferred from the level 0 files.
    #[test]
    fn test_infer_window() {
        let picker = WindowedCompactionPicker::new(None);
        let files = [
            (FileId::random(), 0, HOUR, 0),
            (FileId::random(), HOUR, HOUR * 2 - 1, 0),
        ];
        let ttl = Duration::from_millis(3 * HOUR as u64);
        let version = build_version(&files, Some(ttl));

        let now = Timestamp::new_millisecond(HOUR * 2);
        let (outputs, expired_ssts, window_seconds) =
            picker.pick_inner(RegionId::new(0, 0), &version, now);

        assert!(expired_ssts.is_empty());
        assert_eq!(2 * HOUR / 1000, window_seconds);
        assert_eq!(1, outputs.len());
        assert_eq!(2, outputs[0].inputs.len());
    }

    /// Files spanning several windows are assigned to each window they touch.
    #[test]
    fn test_assign_files_to_windows() {
        let picker = WindowedCompactionPicker::new(Some(HOUR / 1000));
        let files = [
            (FileId::random(), 0, 2 * HOUR - 1, 0),
            (FileId::random(), HOUR, HOUR * 3 - 1, 0),
        ];
        let ttl = Duration::from_millis(3 * HOUR as u64);
        let version = build_version(&files, Some(ttl));

        let now = Timestamp::new_millisecond(HOUR * 3);
        let (outputs, expired_ssts, window_seconds) =
            picker.pick_inner(RegionId::new(0, 0), &version, now);

        assert!(expired_ssts.is_empty());
        assert_eq!(HOUR / 1000, window_seconds);
        assert_eq!(3, outputs.len());

        // The first and last windows hold a single file each; the middle one holds both.
        assert_eq!(1, outputs[0].inputs.len());
        assert_eq!(files[0].0, outputs[0].inputs[0].file_id().file_id());
        assert_eq!(2, outputs[1].inputs.len());
        assert_eq!(1, outputs[2].inputs.len());
        assert_eq!(files[1].0, outputs[2].inputs[0].file_id().file_id());

        // Every output is restricted to its own one-hour window.
        let expected_ranges = [(0, HOUR), (HOUR, 2 * HOUR), (2 * HOUR, 3 * HOUR)];
        for (output, (start_ms, end_ms)) in outputs.iter().zip(expected_ranges) {
            assert_eq!(
                TimestampRange::new(
                    Timestamp::new_millisecond(start_ms),
                    Timestamp::new_millisecond(end_ms)
                ),
                output.output_time_range
            );
        }
    }

    /// Files already flagged as compacting are never picked.
    #[test]
    fn test_assign_compacting_files_to_windows() {
        let picker = WindowedCompactionPicker::new(Some(HOUR / 1000));
        let files = [
            (FileId::random(), 0, 2 * HOUR - 1, 0),
            (FileId::random(), HOUR, HOUR * 3 - 1, 0),
        ];
        let ttl = Duration::from_millis(3 * HOUR as u64);
        let version = build_version(&files, Some(ttl));
        for file in version.ssts.levels()[0].files() {
            file.set_compacting(true);
        }

        let now = Timestamp::new_millisecond(HOUR * 3);
        let (outputs, expired_ssts, window_seconds) =
            picker.pick_inner(RegionId::new(0, 0), &version, now);

        assert!(expired_ssts.is_empty());
        assert_eq!(HOUR / 1000, window_seconds);
        assert!(outputs.is_empty());
    }

    /// Window splitting, including the clamped windows at both ends of the `i64` range.
    #[test]
    fn test_file_time_bucket_span() {
        // ((start_sec, end_sec, bucket_sec), expected windows)
        let cases: [((i64, i64, i64), Vec<(i64, i64)>); 7] = [
            ((i64::MIN, i64::MIN + 1, 10), vec![(i64::MIN, i64::MIN + 8)]),
            (
                (i64::MIN, i64::MIN + 8, 10),
                vec![(i64::MIN, i64::MIN + 8), (i64::MIN + 8, i64::MIN + 18)],
            ),
            (
                (i64::MIN, i64::MIN + 20, 10),
                vec![
                    (i64::MIN, i64::MIN + 8),
                    (i64::MIN + 8, i64::MIN + 18),
                    (i64::MIN + 18, i64::MIN + 28),
                ],
            ),
            ((-1, 11, 10), vec![(-10, 0), (0, 10), (10, 20)]),
            ((-1, 3, 3), vec![(-3, 0), (0, 3), (3, 6)]),
            ((0, 9, 10), vec![(0, 10)]),
            (
                (i64::MAX - 1, i64::MAX, 10),
                vec![(i64::MAX - (i64::MAX % 10), i64::MAX)],
            ),
        ];

        for ((start_sec, end_sec, bucket_sec), expected) in cases {
            assert_eq!(
                expected,
                file_time_bucket_span(start_sec, end_sec, bucket_sec)
            );
        }
    }
}