1use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::{error, info, warn};
20use itertools::Itertools;
21use snafu::ResultExt;
22use tokio::sync::mpsc;
23
24use crate::compaction::compactor::{CompactionRegion, Compactor};
25use crate::compaction::picker::{CompactionTask, PickerOutput};
26use crate::error::CompactRegionSnafu;
27use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
28use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
29use crate::region::RegionRoleState;
30use crate::request::{
31 BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
32 WorkerRequest, WorkerRequestWithTime,
33};
34use crate::sst::file::FileMeta;
35use crate::worker::WorkerListener;
36use crate::{error, metrics};
37
/// Upper bound on the number of compactions running in parallel.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// consumed by the compaction scheduler/compactor — confirm against callers.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
40
/// A compaction job for a single region: drops expired SSTs, merges the picked
/// inputs, writes the manifest and reports the outcome to the region worker.
pub(crate) struct CompactionTaskImpl {
    /// Region being compacted; its id, options and manifest context are read by the task.
    pub compaction_region: CompactionRegion,
    /// Channel used to deliver background notifications to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Senders awaiting the compaction result. On success they are moved into the
    /// `CompactionFinished` notification; on failure each one receives the error.
    pub waiters: Vec<OutputTx>,
    /// Instant forwarded in the `CompactionFinished` notification
    /// (presumably when the task was created — confirm at the construction site).
    pub start_time: Instant,
    /// Listener notified once the SST merge step has finished.
    pub(crate) listener: WorkerListener,
    /// Performs the actual SST merge and the manifest update.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Picker result: the outputs (with their input files) to merge and the expired SSTs to drop.
    pub(crate) picker_output: PickerOutput,
}
56
57impl Debug for CompactionTaskImpl {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("TwcsCompactionTask")
60 .field("region_id", &self.compaction_region.region_id)
61 .field("picker_output", &self.picker_output)
62 .field(
63 "append_mode",
64 &self.compaction_region.region_options.append_mode,
65 )
66 .finish()
67 }
68}
69
70impl Drop for CompactionTaskImpl {
71 fn drop(&mut self) {
72 self.mark_files_compacting(false)
73 }
74}
75
76impl CompactionTaskImpl {
77 fn mark_files_compacting(&self, compacting: bool) {
78 self.picker_output
79 .outputs
80 .iter()
81 .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
82 }
83
84 async fn remove_expired(
89 &self,
90 compaction_region: &CompactionRegion,
91 expired_files: Vec<FileMeta>,
92 ) {
93 let region_id = compaction_region.region_id;
94 let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
95 let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
96 let edit = RegionEdit {
98 files_to_add: Vec::new(),
99 files_to_remove: expired_files,
100 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
101 compaction_time_window: None,
102 flushed_entry_id: None,
103 flushed_sequence: None,
104 committed_sequence: None,
105 };
106
107 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
109 let RegionRoleState::Leader(current_region_state) =
110 compaction_region.manifest_ctx.current_state()
111 else {
112 warn!(
113 "Region {} not in leader state, skip removing expired files",
114 region_id
115 );
116 return;
117 };
118 if let Err(e) = compaction_region
119 .manifest_ctx
120 .update_manifest(current_region_state, action_list, false)
121 .await
122 {
123 warn!(
124 e;
125 "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
126 );
127 return;
128 }
129
130 self.send_to_worker(WorkerRequest::Background {
132 region_id,
133 notify: BackgroundNotify::RegionEdit(RegionEditResult {
134 region_id,
135 sender: expire_delete_sender,
136 edit,
137 result: Ok(()),
138 update_region_state: false,
139 }),
140 })
141 .await;
142
143 if let Err(e) = expire_delete_listener
144 .await
145 .context(error::RecvSnafu)
146 .flatten()
147 {
148 warn!(
149 e;
150 "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
151 );
152 return;
153 }
154
155 info!(
156 "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
157 );
158 }
159
160 async fn handle_expiration_and_compaction(&mut self) -> error::Result<RegionEdit> {
161 self.mark_files_compacting(true);
162
163 if !self.picker_output.expired_ssts.is_empty() {
165 let remove_timer = COMPACTION_STAGE_ELAPSED
166 .with_label_values(&["remove_expired"])
167 .start_timer();
168 let expired_ssts = self
169 .picker_output
170 .expired_ssts
171 .drain(..)
172 .map(|f| f.meta_ref().clone())
173 .collect();
174 self.remove_expired(&self.compaction_region, expired_ssts)
176 .await;
177 remove_timer.observe_duration();
178 }
179
180 let merge_timer = COMPACTION_STAGE_ELAPSED
182 .with_label_values(&["merge"])
183 .start_timer();
184
185 let compaction_result = match self
186 .compactor
187 .merge_ssts(&self.compaction_region, self.picker_output.clone())
188 .await
189 {
190 Ok(v) => v,
191 Err(e) => {
192 error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
193 merge_timer.stop_and_discard();
194 return Err(e);
195 }
196 };
197 let merge_time = merge_timer.stop_and_record();
198
199 metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
200 metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
201 info!(
202 "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
203 self.compaction_region.region_id,
204 compaction_result.files_to_remove,
205 compaction_result.files_to_add,
206 compaction_result.compaction_time_window,
207 self.waiters.len(),
208 merge_time,
209 );
210
211 self.listener
212 .on_merge_ssts_finished(self.compaction_region.region_id)
213 .await;
214
215 let _manifest_timer = COMPACTION_STAGE_ELAPSED
216 .with_label_values(&["write_manifest"])
217 .start_timer();
218
219 self.compactor
220 .update_manifest(&self.compaction_region, compaction_result)
221 .await
222 }
223
224 fn on_failure(&mut self, err: Arc<error::Error>) {
226 COMPACTION_FAILURE_COUNT.inc();
227 for waiter in self.waiters.drain(..) {
228 waiter.send(Err(err.clone()).context(CompactRegionSnafu {
229 region_id: self.compaction_region.region_id,
230 }));
231 }
232 }
233
234 async fn send_to_worker(&self, request: WorkerRequest) {
236 if let Err(e) = self
237 .request_sender
238 .send(WorkerRequestWithTime::new(request))
239 .await
240 {
241 error!(
242 "Failed to notify compaction job status for region {}, request: {:?}",
243 self.compaction_region.region_id, e.0
244 );
245 }
246 }
247}
248
249#[async_trait::async_trait]
250impl CompactionTask for CompactionTaskImpl {
251 async fn run(&mut self) {
252 let notify = match self.handle_expiration_and_compaction().await {
253 Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
254 region_id: self.compaction_region.region_id,
255 senders: std::mem::take(&mut self.waiters),
256 start_time: self.start_time,
257 edit,
258 }),
259 Err(e) => {
260 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
261 let err = Arc::new(e);
262 self.on_failure(err.clone());
264 BackgroundNotify::CompactionFailed(CompactionFailed {
265 region_id: self.compaction_region.region_id,
266 err,
267 })
268 }
269 };
270
271 self.send_to_worker(WorkerRequest::Background {
272 region_id: self.compaction_region.region_id,
273 notify,
274 })
275 .await;
276 }
277}
278
#[cfg(test)]
mod tests {
    use store_api::storage::FileId;

    use crate::compaction::picker::PickerOutput;
    use crate::compaction::test_util::new_file_handle;

    /// A picker output built with expired SSTs keeps them, in order.
    #[test]
    fn test_picker_output_with_expired_ssts() {
        let file_ids: Vec<FileId> = std::iter::repeat_with(FileId::random).take(3).collect();
        let expired_ssts = vec![
            new_file_handle(file_ids[0], 0, 999, 0),
            new_file_handle(file_ids[1], 1000, 1999, 0),
        ];

        let picker_output = PickerOutput {
            outputs: vec![],
            expired_ssts: expired_ssts.clone(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert_eq!(picker_output.expired_ssts.len(), 2);
        // Compare the stored handles pairwise against the ones passed in.
        for (stored, expected) in picker_output.expired_ssts.iter().zip(expired_ssts.iter()) {
            assert_eq!(stored.file_id(), expected.file_id());
        }
    }

    /// A picker output built without expired SSTs reports none.
    #[test]
    fn test_picker_output_without_expired_ssts() {
        let picker_output = PickerOutput {
            outputs: vec![],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        assert!(picker_output.expired_ssts.is_empty());
    }
}