1use std::fmt::Debug;
16use std::sync::Arc;
17
18use api::v1::region::compact_request;
19use serde::{Deserialize, Serialize};
20
21use crate::compaction::compactor::CompactionRegion;
22use crate::compaction::twcs::TwcsPicker;
23use crate::compaction::window::WindowedCompactionPicker;
24use crate::compaction::{CompactionOutput, SerializedCompactionOutput};
25use crate::region::options::CompactionOptions;
26use crate::sst::file::{FileHandle, FileMeta};
27use crate::sst::file_purger::FilePurger;
28
29#[async_trait::async_trait]
30pub(crate) trait CompactionTask: Debug + Send + Sync + 'static {
31 async fn run(&mut self);
32}
33
34pub trait Picker: Debug + Send + Sync + 'static {
37 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput>;
39}
40
41#[derive(Default, Clone, Debug)]
44pub struct PickerOutput {
45 pub outputs: Vec<CompactionOutput>,
46 pub expired_ssts: Vec<FileHandle>,
47 pub time_window_size: i64,
48}
49
50#[derive(Default, Clone, Debug, Serialize, Deserialize)]
52pub struct SerializedPickerOutput {
53 pub outputs: Vec<SerializedCompactionOutput>,
54 pub expired_ssts: Vec<FileMeta>,
55 pub time_window_size: i64,
56}
57
58impl From<&PickerOutput> for SerializedPickerOutput {
59 fn from(input: &PickerOutput) -> Self {
60 let outputs = input
61 .outputs
62 .iter()
63 .map(|output| SerializedCompactionOutput {
64 output_level: output.output_level,
65 inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(),
66 filter_deleted: output.filter_deleted,
67 output_time_range: output.output_time_range,
68 })
69 .collect();
70 let expired_ssts = input
71 .expired_ssts
72 .iter()
73 .map(|s| s.meta_ref().clone())
74 .collect();
75 Self {
76 outputs,
77 expired_ssts,
78 time_window_size: input.time_window_size,
79 }
80 }
81}
82
83impl PickerOutput {
84 pub fn from_serialized(
86 input: SerializedPickerOutput,
87 file_purger: Arc<dyn FilePurger>,
88 ) -> Self {
89 let outputs = input
90 .outputs
91 .into_iter()
92 .map(|output| CompactionOutput {
93 output_level: output.output_level,
94 inputs: output
95 .inputs
96 .into_iter()
97 .map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
98 .collect(),
99 filter_deleted: output.filter_deleted,
100 output_time_range: output.output_time_range,
101 })
102 .collect();
103
104 let expired_ssts = input
105 .expired_ssts
106 .into_iter()
107 .map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
108 .collect();
109
110 Self {
111 outputs,
112 expired_ssts,
113 time_window_size: input.time_window_size,
114 }
115 }
116}
117
118pub fn new_picker(
120 compact_request_options: &compact_request::Options,
121 compaction_options: &CompactionOptions,
122 append_mode: bool,
123) -> Arc<dyn Picker> {
124 if let compact_request::Options::StrictWindow(window) = compact_request_options {
125 let window = if window.window_seconds == 0 {
126 None
127 } else {
128 Some(window.window_seconds)
129 };
130 Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
131 } else {
132 match compaction_options {
133 CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker {
134 max_active_window_runs: twcs_opts.max_active_window_runs,
135 max_active_window_files: twcs_opts.max_active_window_files,
136 max_inactive_window_runs: twcs_opts.max_inactive_window_runs,
137 max_inactive_window_files: twcs_opts.max_inactive_window_files,
138 time_window_seconds: twcs_opts.time_window_seconds(),
139 max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
140 append_mode,
141 }) as Arc<_>,
142 }
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compaction::test_util::new_file_handle;
    use crate::sst::file::FileId;
    use crate::test_util::new_noop_file_purger;

    /// Round-trips a [`PickerOutput`] through `SerializedPickerOutput` and JSON,
    /// then checks that every field survives.
    #[test]
    fn test_picker_output_serialization() {
        let inputs_file_handle = vec![
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        let expired_ssts_file_handle = vec![
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];

        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_level: 0,
                    inputs: inputs_file_handle.clone(),
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 0,
                    inputs: inputs_file_handle.clone(),
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: expired_ssts_file_handle.clone(),
            time_window_size: 1000,
        };

        let picker_output_str =
            serde_json::to_string(&SerializedPickerOutput::from(&picker_output)).unwrap();
        let serialized_picker_output: SerializedPickerOutput =
            serde_json::from_str(&picker_output_str).unwrap();
        let picker_output_from_serialized =
            PickerOutput::from_serialized(serialized_picker_output, new_noop_file_purger());

        assert_eq!(
            picker_output.time_window_size,
            picker_output_from_serialized.time_window_size
        );

        // `zip` stops at the shorter iterator, so the lengths must be compared
        // explicitly; otherwise dropped entries would go unnoticed.
        assert_eq!(
            picker_output.expired_ssts.len(),
            picker_output_from_serialized.expired_ssts.len()
        );
        picker_output
            .expired_ssts
            .iter()
            .zip(picker_output_from_serialized.expired_ssts.iter())
            .for_each(|(expected, actual)| {
                assert_eq!(expected.meta_ref(), actual.meta_ref());
            });

        assert_eq!(
            picker_output.outputs.len(),
            picker_output_from_serialized.outputs.len()
        );
        picker_output
            .outputs
            .iter()
            .zip(picker_output_from_serialized.outputs.iter())
            .for_each(|(expected, actual)| {
                assert_eq!(expected.output_level, actual.output_level);
                assert_eq!(expected.inputs.len(), actual.inputs.len());
                expected
                    .inputs
                    .iter()
                    .zip(actual.inputs.iter())
                    .for_each(|(expected, actual)| {
                        assert_eq!(expected.meta_ref(), actual.meta_ref());
                    });
                assert_eq!(expected.filter_deleted, actual.filter_deleted);
                assert_eq!(expected.output_time_range, actual.output_time_range);
            });
    }
}