mito2/compaction/
picker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::sync::Arc;
17
18use api::v1::region::compact_request;
19use serde::{Deserialize, Serialize};
20
21use crate::compaction::compactor::CompactionRegion;
22use crate::compaction::twcs::TwcsPicker;
23use crate::compaction::window::WindowedCompactionPicker;
24use crate::compaction::{CompactionOutput, SerializedCompactionOutput};
25use crate::region::options::CompactionOptions;
26use crate::sst::file::{FileHandle, FileMeta};
27use crate::sst::file_purger::FilePurger;
28
/// A unit of compaction work that is driven asynchronously.
///
/// Implementors must be `Debug + Send + Sync + 'static`.
#[async_trait::async_trait]
pub(crate) trait CompactionTask: Debug + Send + Sync + 'static {
    /// Runs the task.
    ///
    /// Takes `&mut self` and returns nothing, so the signature gives callers
    /// no result to inspect.
    async fn run(&mut self);
}
33
/// Picker picks input SST files for compaction.
///
/// Different compaction strategies may implement different pickers.
/// Implementors must be `Debug + Send + Sync + 'static`.
pub trait Picker: Debug + Send + Sync + 'static {
    /// Picks input SST files for compaction from `compaction_region`.
    ///
    /// Returns the picked work as a [`PickerOutput`], or `None`.
    /// NOTE(review): `None` presumably means there is nothing to compact —
    /// confirm against the implementations.
    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput>;
}
40
/// PickerOutput is the output of a [`Picker`].
///
/// It contains the outputs of the compaction and the expired SST files.
/// See [`SerializedPickerOutput`] for the serializable counterpart.
#[derive(Default, Clone, Debug)]
pub struct PickerOutput {
    /// Compaction outputs selected by the picker.
    pub outputs: Vec<CompactionOutput>,
    /// Handles of the expired SST files.
    pub expired_ssts: Vec<FileHandle>,
    /// Size of the compaction time window.
    /// NOTE(review): unit is presumably seconds — TODO confirm.
    pub time_window_size: i64,
    /// Max single output file size in bytes.
    pub max_file_size: Option<usize>,
}
51
/// SerializedPickerOutput is a serializable version of [`PickerOutput`].
///
/// It replaces [`CompactionOutput`] and [`FileHandle`] with
/// [`SerializedCompactionOutput`] and [`FileMeta`] respectively.
/// Use [`PickerOutput::from_serialized`] to turn it back into a [`PickerOutput`].
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct SerializedPickerOutput {
    /// Serializable form of the compaction outputs.
    pub outputs: Vec<SerializedCompactionOutput>,
    /// Metadata of the expired SST files.
    pub expired_ssts: Vec<FileMeta>,
    /// Size of the compaction time window.
    /// NOTE(review): unit is presumably seconds — TODO confirm.
    pub time_window_size: i64,
    /// Max single output file size in bytes.
    pub max_file_size: Option<usize>,
}
60
61impl From<&PickerOutput> for SerializedPickerOutput {
62    fn from(input: &PickerOutput) -> Self {
63        let outputs = input
64            .outputs
65            .iter()
66            .map(|output| SerializedCompactionOutput {
67                output_level: output.output_level,
68                inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(),
69                filter_deleted: output.filter_deleted,
70                output_time_range: output.output_time_range,
71            })
72            .collect();
73        let expired_ssts = input
74            .expired_ssts
75            .iter()
76            .map(|s| s.meta_ref().clone())
77            .collect();
78        Self {
79            outputs,
80            expired_ssts,
81            time_window_size: input.time_window_size,
82            max_file_size: input.max_file_size,
83        }
84    }
85}
86
87impl PickerOutput {
88    /// Converts a [SerializedPickerOutput] to a [PickerOutput].
89    pub fn from_serialized(
90        input: SerializedPickerOutput,
91        file_purger: Arc<dyn FilePurger>,
92    ) -> Self {
93        let outputs = input
94            .outputs
95            .into_iter()
96            .map(|output| CompactionOutput {
97                output_level: output.output_level,
98                inputs: output
99                    .inputs
100                    .into_iter()
101                    .map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
102                    .collect(),
103                filter_deleted: output.filter_deleted,
104                output_time_range: output.output_time_range,
105            })
106            .collect();
107
108        let expired_ssts = input
109            .expired_ssts
110            .into_iter()
111            .map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
112            .collect();
113
114        Self {
115            outputs,
116            expired_ssts,
117            time_window_size: input.time_window_size,
118            max_file_size: input.max_file_size,
119        }
120    }
121}
122
123/// Create a new picker based on the compaction request options and compaction options.
124pub fn new_picker(
125    compact_request_options: &compact_request::Options,
126    compaction_options: &CompactionOptions,
127    append_mode: bool,
128) -> Arc<dyn Picker> {
129    if let compact_request::Options::StrictWindow(window) = compact_request_options {
130        let window = if window.window_seconds == 0 {
131            None
132        } else {
133            Some(window.window_seconds)
134        };
135        Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
136    } else {
137        match compaction_options {
138            CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker {
139                trigger_file_num: twcs_opts.trigger_file_num,
140                time_window_seconds: twcs_opts.time_window_seconds(),
141                max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
142                append_mode,
143            }) as Arc<_>,
144        }
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compaction::test_util::new_file_handle;
    use crate::sst::file::FileId;
    use crate::test_util::new_noop_file_purger;

    /// Round-trips a [`PickerOutput`] through [`SerializedPickerOutput`] and JSON,
    /// then checks that every field survives unchanged.
    #[test]
    fn test_picker_output_serialization() {
        let inputs_file_handle = vec![
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        let expired_ssts_file_handle = vec![
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];

        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_level: 0,
                    inputs: inputs_file_handle.clone(),
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 0,
                    inputs: inputs_file_handle.clone(),
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: expired_ssts_file_handle.clone(),
            time_window_size: 1000,
            // Use a `Some` value so the round trip covers more than the default.
            max_file_size: Some(1024),
        };

        let picker_output_str =
            serde_json::to_string(&SerializedPickerOutput::from(&picker_output)).unwrap();
        let serialized_picker_output: SerializedPickerOutput =
            serde_json::from_str(&picker_output_str).unwrap();
        let picker_output_from_serialized =
            PickerOutput::from_serialized(serialized_picker_output, new_noop_file_purger());

        // Scalar fields must survive the round trip.
        assert_eq!(
            picker_output.time_window_size,
            picker_output_from_serialized.time_window_size
        );
        assert_eq!(
            picker_output.max_file_size,
            picker_output_from_serialized.max_file_size
        );

        // `zip` stops at the shorter iterator, so pin the lengths first;
        // otherwise dropped elements would go unnoticed.
        assert_eq!(
            picker_output.expired_ssts.len(),
            picker_output_from_serialized.expired_ssts.len()
        );
        picker_output
            .expired_ssts
            .iter()
            .zip(picker_output_from_serialized.expired_ssts.iter())
            .for_each(|(expected, actual)| {
                assert_eq!(expected.meta_ref(), actual.meta_ref());
            });

        assert_eq!(
            picker_output.outputs.len(),
            picker_output_from_serialized.outputs.len()
        );
        picker_output
            .outputs
            .iter()
            .zip(picker_output_from_serialized.outputs.iter())
            .for_each(|(expected, actual)| {
                assert_eq!(expected.output_level, actual.output_level);
                assert_eq!(expected.inputs.len(), actual.inputs.len());
                expected
                    .inputs
                    .iter()
                    .zip(actual.inputs.iter())
                    .for_each(|(expected, actual)| {
                        assert_eq!(expected.meta_ref(), actual.meta_ref());
                    });
                assert_eq!(expected.filter_deleted, actual.filter_deleted);
                assert_eq!(expected.output_time_range, actual.output_time_range);
            });
    }
}