1use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::{error, info};
20use snafu::ResultExt;
21use tokio::sync::mpsc;
22
23use crate::compaction::compactor::{CompactionRegion, Compactor};
24use crate::compaction::picker::{CompactionTask, PickerOutput};
25use crate::error::CompactRegionSnafu;
26use crate::manifest::action::RegionEdit;
27use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
28use crate::request::{
29 BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
30};
31use crate::worker::WorkerListener;
32use crate::{error, metrics};
33
/// Limit on how many compactions may run in parallel.
///
/// NOTE(review): meaning inferred from the name only; the code that enforces
/// this limit is not part of this section of the file — confirm at the use site.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
36
/// A single compaction job for one region.
///
/// The task merges the files chosen by the picker, writes the result to the
/// manifest and reports the outcome to the region worker. When the task is
/// dropped, every input file has its compacting flag reset to `false`.
pub(crate) struct CompactionTaskImpl {
    /// Region being compacted; supplies the region id and region options and
    /// is handed to the compactor for both merging and manifest updates.
    pub compaction_region: CompactionRegion,
    /// Channel used to send the final `WorkerRequest::Background` notification
    /// back to the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
    /// Pending result senders. On failure each one receives the error; on
    /// success they are moved into the `CompactionFinished` notification.
    pub waiters: Vec<OutputTx>,
    /// Timestamp forwarded in the `CompactionFinished` notification.
    /// Presumably the moment the task was created — confirm where it is set.
    pub start_time: Instant,
    /// Listener that is told when the SST merge step has finished.
    pub(crate) listener: WorkerListener,
    /// Performs the actual SST merge and the manifest update.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Picker result describing the outputs to produce and their input files.
    pub(crate) picker_output: PickerOutput,
}
52
53impl Debug for CompactionTaskImpl {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("TwcsCompactionTask")
56 .field("region_id", &self.compaction_region.region_id)
57 .field("picker_output", &self.picker_output)
58 .field(
59 "append_mode",
60 &self.compaction_region.region_options.append_mode,
61 )
62 .finish()
63 }
64}
65
66impl Drop for CompactionTaskImpl {
67 fn drop(&mut self) {
68 self.mark_files_compacting(false)
69 }
70}
71
72impl CompactionTaskImpl {
73 fn mark_files_compacting(&self, compacting: bool) {
74 self.picker_output
75 .outputs
76 .iter()
77 .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
78 }
79
80 async fn handle_compaction(&mut self) -> error::Result<RegionEdit> {
81 self.mark_files_compacting(true);
82
83 let merge_timer = COMPACTION_STAGE_ELAPSED
84 .with_label_values(&["merge"])
85 .start_timer();
86
87 let compaction_result = match self
88 .compactor
89 .merge_ssts(&self.compaction_region, self.picker_output.clone())
90 .await
91 {
92 Ok(v) => v,
93 Err(e) => {
94 error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
95 merge_timer.stop_and_discard();
96 return Err(e);
97 }
98 };
99 let merge_time = merge_timer.stop_and_record();
100
101 metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
102 metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
103 info!(
104 "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
105 self.compaction_region.region_id,
106 compaction_result.files_to_remove,
107 compaction_result.files_to_add,
108 compaction_result.compaction_time_window,
109 self.waiters.len(),
110 merge_time,
111 );
112
113 self.listener
114 .on_merge_ssts_finished(self.compaction_region.region_id)
115 .await;
116
117 let _manifest_timer = COMPACTION_STAGE_ELAPSED
118 .with_label_values(&["write_manifest"])
119 .start_timer();
120
121 self.compactor
122 .update_manifest(&self.compaction_region, compaction_result)
123 .await
124 }
125
126 fn on_failure(&mut self, err: Arc<error::Error>) {
128 COMPACTION_FAILURE_COUNT.inc();
129 for waiter in self.waiters.drain(..) {
130 waiter.send(Err(err.clone()).context(CompactRegionSnafu {
131 region_id: self.compaction_region.region_id,
132 }));
133 }
134 }
135
136 async fn send_to_worker(&self, request: WorkerRequest) {
138 if let Err(e) = self.request_sender.send(request).await {
139 error!(
140 "Failed to notify compaction job status for region {}, request: {:?}",
141 self.compaction_region.region_id, e.0
142 );
143 }
144 }
145}
146
147#[async_trait::async_trait]
148impl CompactionTask for CompactionTaskImpl {
149 async fn run(&mut self) {
150 let notify = match self.handle_compaction().await {
151 Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
152 region_id: self.compaction_region.region_id,
153 senders: std::mem::take(&mut self.waiters),
154 start_time: self.start_time,
155 edit,
156 }),
157 Err(e) => {
158 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
159 let err = Arc::new(e);
160 self.on_failure(err.clone());
162 BackgroundNotify::CompactionFailed(CompactionFailed {
163 region_id: self.compaction_region.region_id,
164 err,
165 })
166 }
167 };
168
169 self.send_to_worker(WorkerRequest::Background {
170 region_id: self.compaction_region.region_id,
171 notify,
172 })
173 .await;
174 }
175}