mito2/compaction/
picker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::sync::Arc;
17
18use api::v1::region::compact_request;
19use serde::{Deserialize, Serialize};
20
21use crate::compaction::compactor::CompactionRegion;
22use crate::compaction::twcs::TwcsPicker;
23use crate::compaction::window::WindowedCompactionPicker;
24use crate::compaction::{CompactionOutput, SerializedCompactionOutput};
25use crate::region::options::CompactionOptions;
26use crate::sst::file::{FileHandle, FileMeta};
27use crate::sst::file_purger::FilePurger;
28
29#[async_trait::async_trait]
30pub(crate) trait CompactionTask: Debug + Send + Sync + 'static {
31    async fn run(&mut self);
32}
33
34/// Picker picks input SST files for compaction.
35/// Different compaction strategy may implement different pickers.
36pub trait Picker: Debug + Send + Sync + 'static {
37    /// Picks input SST files for compaction.
38    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput>;
39}
40
41/// PickerOutput is the output of a [`Picker`].
42/// It contains the outputs of the compaction and the expired SST files.
43#[derive(Default, Clone, Debug)]
44pub struct PickerOutput {
45    pub outputs: Vec<CompactionOutput>,
46    pub expired_ssts: Vec<FileHandle>,
47    pub time_window_size: i64,
48}
49
50/// SerializedPickerOutput is a serialized version of PickerOutput by replacing [CompactionOutput] and [FileHandle] with [SerializedCompactionOutput] and [FileMeta].
51#[derive(Default, Clone, Debug, Serialize, Deserialize)]
52pub struct SerializedPickerOutput {
53    pub outputs: Vec<SerializedCompactionOutput>,
54    pub expired_ssts: Vec<FileMeta>,
55    pub time_window_size: i64,
56}
57
58impl From<&PickerOutput> for SerializedPickerOutput {
59    fn from(input: &PickerOutput) -> Self {
60        let outputs = input
61            .outputs
62            .iter()
63            .map(|output| SerializedCompactionOutput {
64                output_level: output.output_level,
65                inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(),
66                filter_deleted: output.filter_deleted,
67                output_time_range: output.output_time_range,
68            })
69            .collect();
70        let expired_ssts = input
71            .expired_ssts
72            .iter()
73            .map(|s| s.meta_ref().clone())
74            .collect();
75        Self {
76            outputs,
77            expired_ssts,
78            time_window_size: input.time_window_size,
79        }
80    }
81}
82
83impl PickerOutput {
84    /// Converts a [SerializedPickerOutput] to a [PickerOutput].
85    pub fn from_serialized(
86        input: SerializedPickerOutput,
87        file_purger: Arc<dyn FilePurger>,
88    ) -> Self {
89        let outputs = input
90            .outputs
91            .into_iter()
92            .map(|output| CompactionOutput {
93                output_level: output.output_level,
94                inputs: output
95                    .inputs
96                    .into_iter()
97                    .map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
98                    .collect(),
99                filter_deleted: output.filter_deleted,
100                output_time_range: output.output_time_range,
101            })
102            .collect();
103
104        let expired_ssts = input
105            .expired_ssts
106            .into_iter()
107            .map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
108            .collect();
109
110        Self {
111            outputs,
112            expired_ssts,
113            time_window_size: input.time_window_size,
114        }
115    }
116}
117
118/// Create a new picker based on the compaction request options and compaction options.
119pub fn new_picker(
120    compact_request_options: &compact_request::Options,
121    compaction_options: &CompactionOptions,
122    append_mode: bool,
123) -> Arc<dyn Picker> {
124    if let compact_request::Options::StrictWindow(window) = compact_request_options {
125        let window = if window.window_seconds == 0 {
126            None
127        } else {
128            Some(window.window_seconds)
129        };
130        Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
131    } else {
132        match compaction_options {
133            CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker {
134                max_active_window_runs: twcs_opts.max_active_window_runs,
135                max_active_window_files: twcs_opts.max_active_window_files,
136                max_inactive_window_runs: twcs_opts.max_inactive_window_runs,
137                max_inactive_window_files: twcs_opts.max_inactive_window_files,
138                time_window_seconds: twcs_opts.time_window_seconds(),
139                max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
140                append_mode,
141            }) as Arc<_>,
142        }
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compaction::test_util::new_file_handle;
    use crate::sst::file::FileId;
    use crate::test_util::new_noop_file_purger;

    /// Round-trips a [PickerOutput] through [SerializedPickerOutput] and JSON,
    /// then checks that the rebuilt value matches the original.
    #[test]
    fn test_picker_output_serialization() {
        let inputs_file_handle = vec![
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        let expired_ssts_file_handle = vec![
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];

        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_level: 0,
                    inputs: inputs_file_handle.clone(),
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 0,
                    inputs: inputs_file_handle.clone(),
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: expired_ssts_file_handle.clone(),
            time_window_size: 1000,
        };

        let picker_output_str =
            serde_json::to_string(&SerializedPickerOutput::from(&picker_output)).unwrap();
        let serialized_picker_output: SerializedPickerOutput =
            serde_json::from_str(&picker_output_str).unwrap();
        let picker_output_from_serialized =
            PickerOutput::from_serialized(serialized_picker_output, new_noop_file_purger());

        assert_eq!(
            picker_output.time_window_size,
            picker_output_from_serialized.time_window_size
        );

        // `zip` stops at the shorter iterator, so lengths must be compared
        // explicitly or dropped elements would go unnoticed.
        assert_eq!(
            picker_output.expired_ssts.len(),
            picker_output_from_serialized.expired_ssts.len()
        );
        picker_output
            .expired_ssts
            .iter()
            .zip(picker_output_from_serialized.expired_ssts.iter())
            .for_each(|(expected, actual)| {
                assert_eq!(expected.meta_ref(), actual.meta_ref());
            });

        assert_eq!(
            picker_output.outputs.len(),
            picker_output_from_serialized.outputs.len()
        );
        picker_output
            .outputs
            .iter()
            .zip(picker_output_from_serialized.outputs.iter())
            .for_each(|(expected, actual)| {
                assert_eq!(expected.output_level, actual.output_level);
                assert_eq!(expected.inputs.len(), actual.inputs.len());
                expected
                    .inputs
                    .iter()
                    .zip(actual.inputs.iter())
                    .for_each(|(expected, actual)| {
                        assert_eq!(expected.meta_ref(), actual.meta_ref());
                    });
                assert_eq!(expected.filter_deleted, actual.filter_deleted);
                assert_eq!(expected.output_time_range, actual.output_time_range);
            });
    }
}