1use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::{error, info, warn};
20use itertools::Itertools;
21use snafu::ResultExt;
22use tokio::sync::mpsc;
23
24use crate::compaction::compactor::{CompactionRegion, Compactor};
25use crate::compaction::picker::{CompactionTask, PickerOutput};
26use crate::error::CompactRegionSnafu;
27use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
28use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
29use crate::region::RegionLeaderState;
30use crate::request::{
31 BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
32 WorkerRequest, WorkerRequestWithTime,
33};
34use crate::sst::file::FileMeta;
35use crate::worker::WorkerListener;
36use crate::{error, metrics};
37
/// Maximum number of compaction tasks allowed to run in parallel.
///
/// NOTE(review): this constant is not referenced in this file; the meaning above is
/// inferred from its name — confirm against the scheduler code that reads it.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
40
/// A compaction job for a single region.
///
/// Running it removes the picker's expired SSTs, merges the picked input files via
/// `compactor`, writes the result to the manifest, and reports the outcome to the
/// region worker through `request_sender`.
pub(crate) struct CompactionTaskImpl {
    /// The region being compacted (id, options, manifest context).
    pub compaction_region: CompactionRegion,
    /// Channel to the region worker; carries the expired-file edit and the final
    /// `CompactionFinished` / `CompactionFailed` notification.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Senders waiting for this compaction's result. On failure they receive the
    /// error directly; on success they are forwarded to the worker in `CompactionFinished`.
    pub waiters: Vec<OutputTx>,
    /// Instant associated with this task (presumably when it was created/scheduled —
    /// confirm with the caller); forwarded in `CompactionFinished`.
    pub start_time: Instant,
    /// Listener notified via `on_merge_ssts_finished` once the SST merge completes.
    pub(crate) listener: WorkerListener,
    /// Performs the SST merge (`merge_ssts`) and the manifest update (`update_manifest`).
    pub(crate) compactor: Arc<dyn Compactor>,
    /// What the picker selected: compaction outputs (each with its input files) and
    /// the expired SSTs to drop.
    pub(crate) picker_output: PickerOutput,
}
56
57impl Debug for CompactionTaskImpl {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("TwcsCompactionTask")
60 .field("region_id", &self.compaction_region.region_id)
61 .field("picker_output", &self.picker_output)
62 .field(
63 "append_mode",
64 &self.compaction_region.region_options.append_mode,
65 )
66 .finish()
67 }
68}
69
70impl Drop for CompactionTaskImpl {
71 fn drop(&mut self) {
72 self.mark_files_compacting(false)
73 }
74}
75
76impl CompactionTaskImpl {
77 fn mark_files_compacting(&self, compacting: bool) {
78 self.picker_output
79 .outputs
80 .iter()
81 .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
82 }
83
84 async fn remove_expired(
89 &self,
90 compaction_region: &CompactionRegion,
91 expired_files: Vec<FileMeta>,
92 ) {
93 let region_id = compaction_region.region_id;
94 let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
95 let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
96 let edit = RegionEdit {
98 files_to_add: Vec::new(),
99 files_to_remove: expired_files,
100 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
101 compaction_time_window: None,
102 flushed_entry_id: None,
103 flushed_sequence: None,
104 committed_sequence: None,
105 };
106
107 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
109 if let Err(e) = compaction_region
110 .manifest_ctx
111 .update_manifest(RegionLeaderState::Writable, action_list)
112 .await
113 {
114 error!(
115 e;
116 "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
117 );
118 return;
119 }
120
121 self.send_to_worker(WorkerRequest::Background {
123 region_id,
124 notify: BackgroundNotify::RegionEdit(RegionEditResult {
125 region_id,
126 sender: expire_delete_sender,
127 edit,
128 result: Ok(()),
129 }),
130 })
131 .await;
132
133 if let Err(e) = expire_delete_listener
134 .await
135 .context(error::RecvSnafu)
136 .flatten()
137 {
138 warn!(
139 e;
140 "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
141 );
142 return;
143 }
144
145 info!(
146 "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
147 );
148 }
149
150 async fn handle_expiration_and_compaction(&mut self) -> error::Result<RegionEdit> {
151 self.mark_files_compacting(true);
152
153 if !self.picker_output.expired_ssts.is_empty() {
155 let remove_timer = COMPACTION_STAGE_ELAPSED
156 .with_label_values(&["remove_expired"])
157 .start_timer();
158 let expired_ssts = self
159 .picker_output
160 .expired_ssts
161 .drain(..)
162 .map(|f| f.meta_ref().clone())
163 .collect();
164 self.remove_expired(&self.compaction_region, expired_ssts)
166 .await;
167 remove_timer.observe_duration();
168 }
169
170 let merge_timer = COMPACTION_STAGE_ELAPSED
172 .with_label_values(&["merge"])
173 .start_timer();
174
175 let compaction_result = match self
176 .compactor
177 .merge_ssts(&self.compaction_region, self.picker_output.clone())
178 .await
179 {
180 Ok(v) => v,
181 Err(e) => {
182 error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
183 merge_timer.stop_and_discard();
184 return Err(e);
185 }
186 };
187 let merge_time = merge_timer.stop_and_record();
188
189 metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
190 metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
191 info!(
192 "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
193 self.compaction_region.region_id,
194 compaction_result.files_to_remove,
195 compaction_result.files_to_add,
196 compaction_result.compaction_time_window,
197 self.waiters.len(),
198 merge_time,
199 );
200
201 self.listener
202 .on_merge_ssts_finished(self.compaction_region.region_id)
203 .await;
204
205 let _manifest_timer = COMPACTION_STAGE_ELAPSED
206 .with_label_values(&["write_manifest"])
207 .start_timer();
208
209 self.compactor
210 .update_manifest(&self.compaction_region, compaction_result)
211 .await
212 }
213
214 fn on_failure(&mut self, err: Arc<error::Error>) {
216 COMPACTION_FAILURE_COUNT.inc();
217 for waiter in self.waiters.drain(..) {
218 waiter.send(Err(err.clone()).context(CompactRegionSnafu {
219 region_id: self.compaction_region.region_id,
220 }));
221 }
222 }
223
224 async fn send_to_worker(&self, request: WorkerRequest) {
226 if let Err(e) = self
227 .request_sender
228 .send(WorkerRequestWithTime::new(request))
229 .await
230 {
231 error!(
232 "Failed to notify compaction job status for region {}, request: {:?}",
233 self.compaction_region.region_id, e.0
234 );
235 }
236 }
237}
238
239#[async_trait::async_trait]
240impl CompactionTask for CompactionTaskImpl {
241 async fn run(&mut self) {
242 let notify = match self.handle_expiration_and_compaction().await {
243 Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
244 region_id: self.compaction_region.region_id,
245 senders: std::mem::take(&mut self.waiters),
246 start_time: self.start_time,
247 edit,
248 }),
249 Err(e) => {
250 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
251 let err = Arc::new(e);
252 self.on_failure(err.clone());
254 BackgroundNotify::CompactionFailed(CompactionFailed {
255 region_id: self.compaction_region.region_id,
256 err,
257 })
258 }
259 };
260
261 self.send_to_worker(WorkerRequest::Background {
262 region_id: self.compaction_region.region_id,
263 notify,
264 })
265 .await;
266 }
267}
268
#[cfg(test)]
mod tests {
    use store_api::storage::FileId;

    use crate::compaction::picker::PickerOutput;
    use crate::compaction::test_util::new_file_handle;

    /// Expired SSTs given to a `PickerOutput` are stored as-is and in order.
    #[test]
    fn test_picker_output_with_expired_ssts() {
        let ids: Vec<FileId> = std::iter::repeat_with(FileId::random).take(3).collect();
        let expired = vec![
            new_file_handle(ids[0], 0, 999, 0),
            new_file_handle(ids[1], 1000, 1999, 0),
        ];

        let output = PickerOutput {
            outputs: vec![],
            expired_ssts: expired.clone(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert_eq!(output.expired_ssts.len(), 2);
        // Length is pinned above, so the pairwise comparison covers every entry.
        for (actual, expected) in output.expired_ssts.iter().zip(expired.iter()) {
            assert_eq!(actual.file_id(), expected.file_id());
        }
    }

    /// A `PickerOutput` built without expired SSTs reports none.
    #[test]
    fn test_picker_output_without_expired_ssts() {
        let output = PickerOutput {
            outputs: vec![],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        assert!(output.expired_ssts.is_empty());
    }
}