1use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_base::cancellation::CancellableFuture;
20use common_memory_manager::OnExhaustedPolicy;
21use common_telemetry::{error, info, warn};
22use itertools::Itertools;
23use snafu::ResultExt;
24use tokio::sync::mpsc;
25
26use crate::compaction::LocalCompactionState;
27use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput};
28use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
29use crate::compaction::picker::{CompactionTask, PickerOutput};
30use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
31use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
32use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
33use crate::region::RegionRoleState;
34use crate::request::{
35 BackgroundNotify, CompactionCancelled, CompactionFailed, CompactionFinished, OutputTx,
36 RegionEditResult, WorkerRequest, WorkerRequestWithTime,
37};
38use crate::sst::file::FileMeta;
39use crate::worker::WorkerListener;
40use crate::{error, metrics};
41
/// Upper bound on the number of compactions run in parallel.
///
/// NOTE(review): meaning inferred from the name only; this constant is not
/// referenced in the code shown here — confirm against its users.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
44
/// A compaction task for a single region.
///
/// Bundles the files chosen by the picker, the compactor that merges them,
/// the memory reservation parameters, and the channels used to report the
/// outcome back to the region worker and to the callers waiting on it.
pub(crate) struct CompactionTaskImpl {
    /// Local compaction state; supplies the cancel handle for the merge and
    /// the commit-start marker checked before the manifest is updated.
    pub(crate) state: LocalCompactionState,
    /// The region being compacted.
    pub compaction_region: CompactionRegion,
    /// Channel to the region worker, used to send background notifications.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Result senders of callers waiting on this compaction. They are either
    /// sent an error on failure or handed to the worker with the final
    /// notification.
    pub waiters: Vec<OutputTx>,
    /// Start time of the compaction, forwarded in `CompactionFinished`.
    pub start_time: Instant,
    /// Listener notified when SST merging finishes.
    pub(crate) listener: WorkerListener,
    /// Performs the SST merge and the manifest update.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Outputs (with their input files) and expired SSTs selected by the picker.
    pub(crate) picker_output: PickerOutput,
    /// Memory manager the task reserves memory from before merging.
    pub(crate) memory_manager: Arc<CompactionMemoryManager>,
    /// Policy passed to the memory manager when the reservation is requested.
    pub(crate) memory_policy: OnExhaustedPolicy,
    /// Number of bytes requested from the memory manager.
    pub(crate) estimated_memory_bytes: u64,
}
68
69impl Debug for CompactionTaskImpl {
70 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("TwcsCompactionTask")
72 .field("region_id", &self.compaction_region.region_id)
73 .field("picker_output", &self.picker_output)
74 .field(
75 "append_mode",
76 &self.compaction_region.region_options.append_mode,
77 )
78 .finish()
79 }
80}
81
82impl Drop for CompactionTaskImpl {
83 fn drop(&mut self) {
84 self.mark_files_compacting(false)
85 }
86}
87
88impl CompactionTaskImpl {
89 fn mark_files_compacting(&self, compacting: bool) {
90 self.picker_output
91 .outputs
92 .iter()
93 .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
94 }
95
96 async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
100 let region_id = self.compaction_region.region_id;
101 let requested_bytes = self.estimated_memory_bytes;
102 let policy = self.memory_policy;
103
104 let _timer = COMPACTION_MEMORY_WAIT.start_timer();
105 self.memory_manager
106 .acquire_with_policy(requested_bytes, policy)
107 .await
108 .context(CompactionMemoryExhaustedSnafu {
109 region_id,
110 policy: format!("{policy:?}"),
111 })
112 }
113
114 async fn remove_expired(
119 &self,
120 compaction_region: &CompactionRegion,
121 expired_files: Vec<FileMeta>,
122 ) {
123 let region_id = compaction_region.region_id;
124 let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
125 let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
126 let edit = RegionEdit {
128 files_to_add: Vec::new(),
129 files_to_remove: expired_files,
130 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
131 compaction_time_window: None,
132 flushed_entry_id: None,
133 flushed_sequence: None,
134 committed_sequence: None,
135 };
136
137 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
139 let RegionRoleState::Leader(current_region_state) =
140 compaction_region.manifest_ctx.current_state()
141 else {
142 warn!(
143 "Region {} not in leader state, skip removing expired files",
144 region_id
145 );
146 return;
147 };
148 if let Err(e) = compaction_region
149 .manifest_ctx
150 .update_manifest(current_region_state, action_list, false)
151 .await
152 {
153 warn!(
154 e;
155 "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
156 );
157 return;
158 }
159
160 self.send_to_worker(WorkerRequest::Background {
162 region_id,
163 notify: BackgroundNotify::RegionEdit(RegionEditResult {
164 region_id,
165 sender: expire_delete_sender,
166 edit,
167 result: Ok(()),
168 update_region_state: false,
169 is_staging: false,
170 }),
171 })
172 .await;
173
174 if let Err(e) = expire_delete_listener
175 .await
176 .context(error::RecvSnafu)
177 .flatten()
178 {
179 warn!(
180 e;
181 "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
182 );
183 return;
184 }
185
186 info!(
187 "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
188 );
189 }
190
191 async fn handle_expiration(&mut self) {
192 if !self.picker_output.expired_ssts.is_empty() {
194 let remove_timer = COMPACTION_STAGE_ELAPSED
195 .with_label_values(&["remove_expired"])
196 .start_timer();
197 let expired_ssts = self
198 .picker_output
199 .expired_ssts
200 .drain(..)
201 .map(|f| f.meta_ref().clone())
202 .collect();
203 self.remove_expired(&self.compaction_region, expired_ssts)
205 .await;
206 remove_timer.observe_duration();
207 }
208 }
209
210 async fn handle_compaction(&mut self) -> error::Result<MergeOutput> {
211 let merge_timer = COMPACTION_STAGE_ELAPSED
213 .with_label_values(&["merge"])
214 .start_timer();
215
216 let compaction_result = match self
217 .compactor
218 .merge_ssts(&self.compaction_region, self.picker_output.clone())
219 .await
220 {
221 Ok(v) => v,
222 Err(e) => {
223 error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
224 merge_timer.stop_and_discard();
225 return Err(e);
226 }
227 };
228 let merge_time = merge_timer.stop_and_record();
229
230 metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
231 metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
232 info!(
233 "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
234 self.compaction_region.region_id,
235 compaction_result.files_to_remove,
236 compaction_result.files_to_add,
237 compaction_result.compaction_time_window,
238 self.waiters.len(),
239 merge_time,
240 );
241
242 self.listener
243 .on_merge_ssts_finished(self.compaction_region.region_id)
244 .await;
245
246 Ok(compaction_result)
247 }
248
249 async fn update_manifest(
250 &self,
251 compaction_result: crate::compaction::compactor::MergeOutput,
252 ) -> error::Result<RegionEdit> {
253 let _manifest_timer = COMPACTION_STAGE_ELAPSED
254 .with_label_values(&["write_manifest"])
255 .start_timer();
256
257 self.compactor
258 .update_manifest(&self.compaction_region, compaction_result)
259 .await
260 }
261
262 pub(crate) fn on_failure(&mut self, err: Arc<error::Error>) {
264 COMPACTION_FAILURE_COUNT.inc();
265 for waiter in self.waiters.drain(..) {
266 waiter.send(Err(err.clone()).context(CompactRegionSnafu {
267 region_id: self.compaction_region.region_id,
268 }));
269 }
270 }
271
272 async fn send_to_worker(&self, request: WorkerRequest) {
274 if let Err(e) = self
275 .request_sender
276 .send(WorkerRequestWithTime::new(request))
277 .await
278 {
279 error!(
280 "Failed to notify compaction job status for region {}, request: {:?}",
281 self.compaction_region.region_id, e.0
282 );
283 }
284 }
285}
286
287#[async_trait::async_trait]
288impl CompactionTask for CompactionTaskImpl {
289 async fn run(&mut self) {
290 let _memory_guard = match self.acquire_memory_with_policy().await {
292 Ok(guard) => guard,
293 Err(e) => {
294 error!(e; "Failed to acquire memory for compaction, region id: {}", self.compaction_region.region_id);
295 let err = Arc::new(e);
296 self.on_failure(err.clone());
297 let notify = BackgroundNotify::CompactionFailed(CompactionFailed {
298 region_id: self.compaction_region.region_id,
299 err,
300 });
301 self.send_to_worker(WorkerRequest::Background {
302 region_id: self.compaction_region.region_id,
303 notify,
304 })
305 .await;
306 return;
307 }
308 };
309
310 self.mark_files_compacting(true);
312 self.handle_expiration().await;
313
314 let cancel_handle = self.state.cancel_handle();
315 let notify = match CancellableFuture::new(
317 async { self.handle_compaction().await },
318 cancel_handle,
319 )
320 .await
321 {
322 Ok(Ok(merge_output)) => {
323 if !self.state.mark_commit_started() {
325 let senders = std::mem::take(&mut self.waiters);
326 BackgroundNotify::CompactionCancelled(CompactionCancelled {
327 region_id: self.compaction_region.region_id,
328 senders,
329 })
330 } else {
331 match self.update_manifest(merge_output).await {
332 Ok(edit) => {
333 let senders = std::mem::take(&mut self.waiters);
334 BackgroundNotify::CompactionFinished(CompactionFinished {
335 region_id: self.compaction_region.region_id,
336 senders,
337 start_time: self.start_time,
338 edit,
339 })
340 }
341 Err(e) => {
342 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
343 let err = Arc::new(e);
344 self.on_failure(err.clone());
345 BackgroundNotify::CompactionFailed(CompactionFailed {
346 region_id: self.compaction_region.region_id,
347 err,
348 })
349 }
350 }
351 }
352 }
353 Err(_) => {
354 info!(
355 "Compaction cancelled, region id: {}",
356 self.compaction_region.region_id
357 );
358 let senders = std::mem::take(&mut self.waiters);
359 BackgroundNotify::CompactionCancelled(CompactionCancelled {
360 region_id: self.compaction_region.region_id,
361 senders,
362 })
363 }
364 Ok(Err(e)) => {
365 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
366 let err = Arc::new(e);
367 self.on_failure(err.clone());
369 BackgroundNotify::CompactionFailed(CompactionFailed {
370 region_id: self.compaction_region.region_id,
371 err,
372 })
373 }
374 };
375
376 self.send_to_worker(WorkerRequest::Background {
377 region_id: self.compaction_region.region_id,
378 notify,
379 })
380 .await;
381 }
382}
383
#[cfg(test)]
mod tests {
    use store_api::storage::FileId;

    use crate::compaction::picker::PickerOutput;
    use crate::compaction::test_util::new_file_handle;

    /// A picker output keeps the expired SSTs it was built with, in order.
    #[test]
    fn test_picker_output_with_expired_ssts() {
        let ids: Vec<FileId> = std::iter::repeat_with(FileId::random).take(3).collect();
        let expired = vec![
            new_file_handle(ids[0], 0, 999, 0),
            new_file_handle(ids[1], 1000, 1999, 0),
        ];

        let output = PickerOutput {
            outputs: vec![],
            expired_ssts: expired.clone(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert_eq!(output.expired_ssts.len(), 2);
        for (got, want) in output.expired_ssts.iter().zip(expired.iter()) {
            assert_eq!(got.file_id(), want.file_id());
        }
    }

    /// A picker output built without expired SSTs reports none.
    #[test]
    fn test_picker_output_without_expired_ssts() {
        let output = PickerOutput {
            outputs: vec![],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        assert!(output.expired_ssts.is_empty());
    }
}