use std::fmt::Debug;
use std::sync::Arc;
use api::v1::region::compact_request;
use serde::{Deserialize, Serialize};
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::twcs::TwcsPicker;
use crate::compaction::window::WindowedCompactionPicker;
use crate::compaction::{CompactionOutput, SerializedCompactionOutput};
use crate::region::options::CompactionOptions;
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::FilePurger;
#[async_trait::async_trait]
pub(crate) trait CompactionTask: Debug + Send + Sync + 'static {
async fn run(&mut self);
}
pub trait Picker: Debug + Send + Sync + 'static {
fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput>;
}
#[derive(Default, Clone, Debug)]
pub struct PickerOutput {
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
pub time_window_size: i64,
}
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct SerializedPickerOutput {
pub outputs: Vec<SerializedCompactionOutput>,
pub expired_ssts: Vec<FileMeta>,
pub time_window_size: i64,
}
impl From<&PickerOutput> for SerializedPickerOutput {
fn from(input: &PickerOutput) -> Self {
let outputs = input
.outputs
.iter()
.map(|output| SerializedCompactionOutput {
output_level: output.output_level,
inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(),
filter_deleted: output.filter_deleted,
output_time_range: output.output_time_range,
})
.collect();
let expired_ssts = input
.expired_ssts
.iter()
.map(|s| s.meta_ref().clone())
.collect();
Self {
outputs,
expired_ssts,
time_window_size: input.time_window_size,
}
}
}
impl PickerOutput {
pub fn from_serialized(
input: SerializedPickerOutput,
file_purger: Arc<dyn FilePurger>,
) -> Self {
let outputs = input
.outputs
.into_iter()
.map(|output| CompactionOutput {
output_level: output.output_level,
inputs: output
.inputs
.into_iter()
.map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
.collect(),
filter_deleted: output.filter_deleted,
output_time_range: output.output_time_range,
})
.collect();
let expired_ssts = input
.expired_ssts
.into_iter()
.map(|file_meta| FileHandle::new(file_meta, file_purger.clone()))
.collect();
Self {
outputs,
expired_ssts,
time_window_size: input.time_window_size,
}
}
}
pub fn new_picker(
compact_request_options: &compact_request::Options,
compaction_options: &CompactionOptions,
append_mode: bool,
) -> Arc<dyn Picker> {
if let compact_request::Options::StrictWindow(window) = compact_request_options {
let window = if window.window_seconds == 0 {
None
} else {
Some(window.window_seconds)
};
Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
} else {
match compaction_options {
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker {
max_active_window_runs: twcs_opts.max_active_window_runs,
max_active_window_files: twcs_opts.max_active_window_files,
max_inactive_window_runs: twcs_opts.max_inactive_window_runs,
max_inactive_window_files: twcs_opts.max_inactive_window_files,
time_window_seconds: twcs_opts.time_window_seconds(),
max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
append_mode,
}) as Arc<_>,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::compaction::test_util::new_file_handle;
use crate::sst::file::FileId;
use crate::test_util::new_noop_file_purger;
#[test]
fn test_picker_output_serialization() {
let inputs_file_handle = vec![
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
];
let expired_ssts_file_handle = vec![
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
];
let picker_output = PickerOutput {
outputs: vec![
CompactionOutput {
output_level: 0,
inputs: inputs_file_handle.clone(),
filter_deleted: false,
output_time_range: None,
},
CompactionOutput {
output_level: 0,
inputs: inputs_file_handle.clone(),
filter_deleted: false,
output_time_range: None,
},
],
expired_ssts: expired_ssts_file_handle.clone(),
time_window_size: 1000,
};
let picker_output_str =
serde_json::to_string(&SerializedPickerOutput::from(&picker_output)).unwrap();
let serialized_picker_output: SerializedPickerOutput =
serde_json::from_str(&picker_output_str).unwrap();
let picker_output_from_serialized =
PickerOutput::from_serialized(serialized_picker_output, new_noop_file_purger());
picker_output
.expired_ssts
.iter()
.zip(picker_output_from_serialized.expired_ssts.iter())
.for_each(|(expected, actual)| {
assert_eq!(expected.meta_ref(), actual.meta_ref());
});
picker_output
.outputs
.iter()
.zip(picker_output_from_serialized.outputs.iter())
.for_each(|(expected, actual)| {
assert_eq!(expected.output_level, actual.output_level);
expected
.inputs
.iter()
.zip(actual.inputs.iter())
.for_each(|(expected, actual)| {
assert_eq!(expected.meta_ref(), actual.meta_ref());
});
assert_eq!(expected.filter_deleted, actual.filter_deleted);
assert_eq!(expected.output_time_range, actual.output_time_range);
});
}
}