1use std::collections::BTreeMap;
16use std::fmt::Debug;
17
18use common_telemetry::info;
19use common_time::Timestamp;
20use common_time::range::TimestampRange;
21use common_time::timestamp::TimeUnit;
22use common_time::timestamp_millis::BucketAligned;
23use store_api::storage::RegionId;
24
25use crate::compaction::buckets::infer_time_bucket;
26use crate::compaction::compactor::{CompactionRegion, CompactionVersion};
27use crate::compaction::picker::{Picker, PickerOutput};
28use crate::compaction::{CompactionOutput, get_expired_ssts};
29use crate::sst::file::FileHandle;
30
/// Compaction picker that groups SST files by fixed-size time windows.
///
/// Every file that is not already being compacted is assigned to each window its time range
/// touches, and one compaction output is produced per window.
#[derive(Debug)]
pub struct WindowedCompactionPicker {
    /// Explicitly requested window size in seconds.
    ///
    /// When `None`, the window stored in the region version is used; if that is absent as
    /// well, the window is inferred from the level-0 files.
    compaction_time_window_seconds: Option<i64>,
}
38
39impl WindowedCompactionPicker {
40 pub fn new(window_seconds: Option<i64>) -> Self {
41 Self {
42 compaction_time_window_seconds: window_seconds,
43 }
44 }
45
46 fn calculate_time_window(
51 &self,
52 region_id: RegionId,
53 current_version: &CompactionVersion,
54 ) -> i64 {
55 self.compaction_time_window_seconds
56 .or(current_version
57 .compaction_time_window
58 .map(|t| t.as_secs() as i64))
59 .unwrap_or_else(|| {
60 let levels = current_version.ssts.levels();
61 let inferred = infer_time_bucket(levels[0].files());
62 info!(
63 "Compaction window for region {} is not present, inferring from files: {:?}",
64 region_id, inferred
65 );
66 inferred
67 })
68 }
69
70 fn pick_inner(
71 &self,
72 region_id: RegionId,
73 current_version: &CompactionVersion,
74 current_time: Timestamp,
75 ) -> (Vec<CompactionOutput>, Vec<FileHandle>, i64) {
76 let time_window = self.calculate_time_window(region_id, current_version);
77 info!(
78 "Compaction window for region: {} is {} seconds",
79 region_id, time_window
80 );
81
82 let expired_ssts = get_expired_ssts(
83 current_version.ssts.levels(),
84 current_version.options.ttl,
85 current_time,
86 );
87 if !expired_ssts.is_empty() {
88 info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
89 expired_ssts.iter().for_each(|f| f.set_compacting(true));
91 }
92
93 let windows = assign_files_to_time_windows(
94 time_window,
95 current_version
96 .ssts
97 .levels()
98 .iter()
99 .flat_map(|level| level.files.values()),
100 );
101
102 (build_output(windows), expired_ssts, time_window)
103 }
104}
105
106impl Picker for WindowedCompactionPicker {
107 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
108 let (outputs, expired_ssts, time_window) = self.pick_inner(
109 compaction_region.current_version.metadata.region_id,
110 &compaction_region.current_version,
111 Timestamp::current_millis(),
112 );
113
114 Some(PickerOutput {
115 outputs,
116 expired_ssts,
117 time_window_size: time_window,
118 max_file_size: None, })
120 }
121}
122
123fn build_output(windows: BTreeMap<i64, (i64, Vec<FileHandle>)>) -> Vec<CompactionOutput> {
124 let mut outputs = Vec::with_capacity(windows.len());
125 for (lower_bound, (upper_bound, files)) in windows {
126 let output_time_range = Some(
128 TimestampRange::new(
129 Timestamp::new_second(lower_bound),
130 Timestamp::new_second(upper_bound),
131 )
132 .unwrap(),
133 );
134
135 let output = CompactionOutput {
136 output_level: 1,
137 inputs: files,
138 filter_deleted: false,
139 output_time_range,
140 };
141 outputs.push(output);
142 }
143
144 outputs
145}
146
147fn assign_files_to_time_windows<'a>(
151 bucket_sec: i64,
152 files: impl Iterator<Item = &'a FileHandle>,
153) -> BTreeMap<i64, (i64, Vec<FileHandle>)> {
154 let mut buckets = BTreeMap::new();
155
156 for file in files {
157 if file.compacting() {
158 continue;
159 }
160 let (start, end) = file.time_range();
161 let bounds = file_time_bucket_span(
162 start.convert_to(TimeUnit::Second).unwrap().value(),
164 end.convert_to(TimeUnit::Second).unwrap().value(),
165 bucket_sec,
166 );
167 for (lower_bound, upper_bound) in bounds {
168 let (_, files) = buckets
169 .entry(lower_bound)
170 .or_insert_with(|| (upper_bound, Vec::new()));
171 files.push(file.clone());
172 }
173 }
174 buckets
175}
176
177fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<(i64, i64)> {
179 assert!(start_sec <= end_sec);
180
181 let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
184 let end_aligned = end_sec
185 .align_by_bucket(bucket_sec)
186 .unwrap_or(start_aligned + (end_sec - start_sec));
187
188 let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize);
189 while start_aligned <= end_aligned {
190 let window_size = if start_aligned % bucket_sec == 0 {
191 bucket_sec
192 } else {
193 (start_aligned % bucket_sec).abs()
194 };
195 let upper_bound = start_aligned.checked_add(window_size).unwrap_or(i64::MAX);
196 res.push((start_aligned, upper_bound));
197 start_aligned = upper_bound;
198 }
199 res
200}
201
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_time::Timestamp;
    use common_time::range::TimestampRange;
    use store_api::storage::{FileId, RegionId};

    use crate::compaction::compactor::CompactionVersion;
    use crate::compaction::window::{WindowedCompactionPicker, file_time_bucket_span};
    use crate::region::options::RegionOptions;
    use crate::sst::file::{FileMeta, Level};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::version::SstVersion;
    use crate::test_util::memtable_util::metadata_for_test;

    /// One hour in milliseconds, the unit of the file time ranges used by these tests.
    const HOUR: i64 = 60 * 60 * 1000;

    /// Builds a [`CompactionVersion`] holding one SST per `(file_id, start_ms, end_ms, level)`
    /// entry, with the given TTL and no persisted compaction window.
    fn build_version(
        files: &[(FileId, i64, i64, Level)],
        ttl: Option<Duration>,
    ) -> CompactionVersion {
        let metadata = metadata_for_test();
        let purger = Arc::new(NoopFilePurger);

        let file_metas = files
            .iter()
            .map(|&(file_id, start_ms, end_ms, level)| FileMeta {
                file_id,
                time_range: (
                    Timestamp::new_millisecond(start_ms),
                    Timestamp::new_millisecond(end_ms),
                ),
                level,
                ..Default::default()
            });

        let mut sst_version = SstVersion::new();
        sst_version.add_files(purger, file_metas);

        let options = RegionOptions {
            ttl: ttl.map(|t| t.into()),
            compaction: Default::default(),
            compaction_override: false,
            storage: None,
            append_mode: false,
            wal_options: Default::default(),
            index_options: Default::default(),
            memtable: None,
            merge_mode: None,
            sst_format: None,
        };

        CompactionVersion {
            metadata,
            ssts: Arc::new(sst_version),
            options,
            compaction_time_window: None,
        }
    }

    /// TTL of three hours, long enough that no test file below is expired.
    fn three_hour_ttl() -> Option<Duration> {
        Some(Duration::from_millis(3 * HOUR as u64))
    }

    /// Two level-0 files covering `[0h, 2h)` and `[1h, 3h)`, i.e. overlapping in the second hour.
    fn overlapping_files() -> Vec<(FileId, i64, i64, Level)> {
        vec![
            (FileId::random(), 0, 2 * HOUR - 1, 0),
            (FileId::random(), HOUR, HOUR * 3 - 1, 0),
        ]
    }

    /// Expected output range spanning `[start_ms, end_ms)`.
    fn millis_range(start_ms: i64, end_ms: i64) -> Option<TimestampRange> {
        TimestampRange::new(
            Timestamp::new_millisecond(start_ms),
            Timestamp::new_millisecond(end_ms),
        )
    }

    #[test]
    fn test_pick_expired() {
        let picker = WindowedCompactionPicker::new(None);
        let version = build_version(
            &[(FileId::random(), 0, 10, 0)],
            Some(Duration::from_millis(1)),
        );

        let now = Timestamp::new_millisecond(12);
        let (outputs, expired_ssts, _window) =
            picker.pick_inner(RegionId::new(0, 0), &version, now);

        // The only file is expired, so nothing is left to compact.
        assert!(outputs.is_empty());
        assert_eq!(1, expired_ssts.len());
    }

    #[test]
    fn test_infer_window() {
        let picker = WindowedCompactionPicker::new(None);
        let files = vec![
            (FileId::random(), 0, HOUR, 0),
            (FileId::random(), HOUR, HOUR * 2 - 1, 0),
        ];
        let version = build_version(&files, three_hour_ttl());

        let now = Timestamp::new_millisecond(HOUR * 2);
        let (outputs, expired_ssts, window_seconds) =
            picker.pick_inner(RegionId::new(0, 0), &version, now);

        assert!(expired_ssts.is_empty());
        assert_eq!(2 * HOUR / 1000, window_seconds);
        assert_eq!(1, outputs.len());
        assert_eq!(2, outputs[0].inputs.len());
    }

    #[test]
    fn test_assign_files_to_windows() {
        let picker = WindowedCompactionPicker::new(Some(HOUR / 1000));
        let files = overlapping_files();
        let version = build_version(&files, three_hour_ttl());

        let now = Timestamp::new_millisecond(HOUR * 3);
        let (outputs, expired_ssts, window_seconds) =
            picker.pick_inner(RegionId::new(0, 0), &version, now);

        assert!(expired_ssts.is_empty());
        assert_eq!(HOUR / 1000, window_seconds);
        assert_eq!(3, outputs.len());

        // First hour: only the first file.
        let first = &outputs[0];
        assert_eq!(1, first.inputs.len());
        assert_eq!(files[0].0, first.inputs[0].file_id().file_id());
        assert_eq!(millis_range(0, HOUR), first.output_time_range);

        // Second hour: both files overlap.
        let second = &outputs[1];
        assert_eq!(2, second.inputs.len());
        assert_eq!(millis_range(HOUR, 2 * HOUR), second.output_time_range);

        // Third hour: only the second file.
        let third = &outputs[2];
        assert_eq!(1, third.inputs.len());
        assert_eq!(files[1].0, third.inputs[0].file_id().file_id());
        assert_eq!(millis_range(2 * HOUR, 3 * HOUR), third.output_time_range);
    }

    #[test]
    fn test_assign_compacting_files_to_windows() {
        let picker = WindowedCompactionPicker::new(Some(HOUR / 1000));
        let files = overlapping_files();
        let version = build_version(&files, three_hour_ttl());

        // Flag every level-0 file as already compacting; the picker must skip all of them.
        for file in version.ssts.levels()[0].files() {
            file.set_compacting(true);
        }

        let now = Timestamp::new_millisecond(HOUR * 3);
        let (outputs, expired_ssts, window_seconds) =
            picker.pick_inner(RegionId::new(0, 0), &version, now);

        assert!(expired_ssts.is_empty());
        assert_eq!(HOUR / 1000, window_seconds);
        assert!(outputs.is_empty());
    }

    #[test]
    fn test_file_time_bucket_span() {
        // ((start_sec, end_sec, bucket_sec), expected windows)
        let cases: Vec<((i64, i64, i64), Vec<(i64, i64)>)> = vec![
            (
                (i64::MIN, i64::MIN + 1, 10),
                vec![(i64::MIN, i64::MIN + 8)],
            ),
            (
                (i64::MIN, i64::MIN + 8, 10),
                vec![(i64::MIN, i64::MIN + 8), (i64::MIN + 8, i64::MIN + 18)],
            ),
            (
                (i64::MIN, i64::MIN + 20, 10),
                vec![
                    (i64::MIN, i64::MIN + 8),
                    (i64::MIN + 8, i64::MIN + 18),
                    (i64::MIN + 18, i64::MIN + 28),
                ],
            ),
            ((-1, 11, 10), vec![(-10, 0), (0, 10), (10, 20)]),
            ((-1, 3, 3), vec![(-3, 0), (0, 3), (3, 6)]),
            ((0, 9, 10), vec![(0, 10)]),
            (
                (i64::MAX - 1, i64::MAX, 10),
                vec![(i64::MAX - (i64::MAX % 10), i64::MAX)],
            ),
        ];

        for ((start_sec, end_sec, bucket_sec), expected) in cases {
            assert_eq!(
                expected,
                file_time_bucket_span(start_sec, end_sec, bucket_sec)
            );
        }
    }
}