1use std::fmt::Debug;
16use std::sync::Arc;
17
18use api::v1::region::compact_request;
19use serde::{Deserialize, Serialize};
20
21use crate::compaction::compactor::CompactionRegion;
22use crate::compaction::twcs::TwcsPicker;
23use crate::compaction::window::WindowedCompactionPicker;
24use crate::compaction::{CompactionOutput, SerializedCompactionOutput};
25use crate::region::options::CompactionOptions;
26use crate::sst::file::{FileHandle, FileMeta};
27use crate::sst::file_purger::FilePurger;
28
29#[async_trait::async_trait]
30pub(crate) trait CompactionTask: Debug + Send + Sync + 'static {
31 async fn run(&mut self);
32}
33
34pub trait Picker: Debug + Send + Sync + 'static {
37 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput>;
39}
40
41#[derive(Default, Clone, Debug)]
44pub struct PickerOutput {
45 pub outputs: Vec<CompactionOutput>,
46 pub expired_ssts: Vec<FileHandle>,
47 pub time_window_size: i64,
48 pub max_file_size: Option<usize>,
50}
51
52#[derive(Default, Clone, Debug, Serialize, Deserialize)]
54pub struct SerializedPickerOutput {
55 pub outputs: Vec<SerializedCompactionOutput>,
56 pub expired_ssts: Vec<FileMeta>,
57 pub time_window_size: i64,
58 pub max_file_size: Option<usize>,
59}
60
61impl From<&PickerOutput> for SerializedPickerOutput {
62 fn from(input: &PickerOutput) -> Self {
63 let outputs = input
64 .outputs
65 .iter()
66 .map(|output| SerializedCompactionOutput {
67 output_level: output.output_level,
68 inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(),
69 filter_deleted: output.filter_deleted,
70 output_time_range: output.output_time_range,
71 })
72 .collect();
73 let expired_ssts = input
74 .expired_ssts
75 .iter()
76 .map(|s| s.meta_ref().clone())
77 .collect();
78 Self {
79 outputs,
80 expired_ssts,
81 time_window_size: input.time_window_size,
82 max_file_size: input.max_file_size,
83 }
84 }
85}
86
87impl PickerOutput {
88 pub fn from_serialized(
90 input: SerializedPickerOutput,
91 file_purger: Arc<dyn FilePurger>,
92 ) -> Self {
93 let outputs = input
94 .outputs
95 .into_iter()
96 .map(|output| CompactionOutput {
97 output_level: output.output_level,
98 inputs: output
99 .inputs
100 .into_iter()
101 .map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
102 .collect(),
103 filter_deleted: output.filter_deleted,
104 output_time_range: output.output_time_range,
105 })
106 .collect();
107
108 let expired_ssts = input
109 .expired_ssts
110 .into_iter()
111 .map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
112 .collect();
113
114 Self {
115 outputs,
116 expired_ssts,
117 time_window_size: input.time_window_size,
118 max_file_size: input.max_file_size,
119 }
120 }
121}
122
123pub fn new_picker(
125 compact_request_options: &compact_request::Options,
126 compaction_options: &CompactionOptions,
127 append_mode: bool,
128 max_background_tasks: Option<usize>,
129) -> Arc<dyn Picker> {
130 if let compact_request::Options::StrictWindow(window) = compact_request_options {
131 let window = if window.window_seconds == 0 {
132 None
133 } else {
134 Some(window.window_seconds)
135 };
136 Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
137 } else {
138 match compaction_options {
139 CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker {
140 trigger_file_num: twcs_opts.trigger_file_num,
141 time_window_seconds: twcs_opts.time_window_seconds(),
142 max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
143 append_mode,
144 max_background_tasks,
145 }) as Arc<_>,
146 }
147 }
148}
149
150#[cfg(test)]
151mod tests {
152 use store_api::storage::FileId;
153
154 use super::*;
155 use crate::compaction::test_util::new_file_handle;
156 use crate::test_util::new_noop_file_purger;
157
158 #[test]
159 fn test_picker_output_serialization() {
160 let inputs_file_handle = vec![
161 new_file_handle(FileId::random(), 0, 999, 0),
162 new_file_handle(FileId::random(), 0, 999, 0),
163 new_file_handle(FileId::random(), 0, 999, 0),
164 ];
165 let expired_ssts_file_handle = vec![
166 new_file_handle(FileId::random(), 0, 999, 0),
167 new_file_handle(FileId::random(), 0, 999, 0),
168 ];
169
170 let picker_output = PickerOutput {
171 outputs: vec![
172 CompactionOutput {
173 output_level: 0,
174 inputs: inputs_file_handle.clone(),
175 filter_deleted: false,
176 output_time_range: None,
177 },
178 CompactionOutput {
179 output_level: 0,
180 inputs: inputs_file_handle.clone(),
181 filter_deleted: false,
182 output_time_range: None,
183 },
184 ],
185 expired_ssts: expired_ssts_file_handle.clone(),
186 time_window_size: 1000,
187 max_file_size: None,
188 };
189
190 let picker_output_str =
191 serde_json::to_string(&SerializedPickerOutput::from(&picker_output)).unwrap();
192 let serialized_picker_output: SerializedPickerOutput =
193 serde_json::from_str(&picker_output_str).unwrap();
194 let picker_output_from_serialized =
195 PickerOutput::from_serialized(serialized_picker_output, new_noop_file_purger());
196
197 picker_output
198 .expired_ssts
199 .iter()
200 .zip(picker_output_from_serialized.expired_ssts.iter())
201 .for_each(|(expected, actual)| {
202 assert_eq!(expected.meta_ref(), actual.meta_ref());
203 });
204
205 picker_output
206 .outputs
207 .iter()
208 .zip(picker_output_from_serialized.outputs.iter())
209 .for_each(|(expected, actual)| {
210 assert_eq!(expected.output_level, actual.output_level);
211 expected
212 .inputs
213 .iter()
214 .zip(actual.inputs.iter())
215 .for_each(|(expected, actual)| {
216 assert_eq!(expected.meta_ref(), actual.meta_ref());
217 });
218 assert_eq!(expected.filter_deleted, actual.filter_deleted);
219 assert_eq!(expected.output_time_range, actual.output_time_range);
220 });
221 }
222}