1use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::{error, info};
20use snafu::ResultExt;
21use tokio::sync::mpsc;
22
23use crate::compaction::compactor::{CompactionRegion, Compactor};
24use crate::compaction::picker::{CompactionTask, PickerOutput};
25use crate::error::CompactRegionSnafu;
26use crate::manifest::action::RegionEdit;
27use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
28use crate::request::{
29 BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
30 WorkerRequestWithTime,
31};
32use crate::worker::WorkerListener;
33use crate::{error, metrics};
34
/// Parallelism cap for compaction, currently fixed at 1.
// NOTE(review): nothing in this file reads the constant — presumably it bounds how many
// compaction outputs are merged concurrently; confirm at the use site.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
37
/// A compaction job for one region: it merges the picked SST files through a
/// [`Compactor`], writes the manifest edit, and reports the outcome to the
/// region worker and to any pending waiters.
pub(crate) struct CompactionTaskImpl {
    /// Region being compacted; supplies the region id and options and is handed
    /// to the compactor for both the merge and the manifest update.
    pub compaction_region: CompactionRegion,
    /// Channel on which the final `WorkerRequest::Background` notification is sent.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Result senders: each one receives the error on failure, or they are moved
    /// into the `CompactionFinished` notification on success.
    pub waiters: Vec<OutputTx>,
    /// Instant forwarded in the `CompactionFinished` notification.
    // NOTE(review): presumably the time the task was created/scheduled — confirm at the construction site.
    pub start_time: Instant,
    /// Listener notified once the SST merge has finished.
    pub(crate) listener: WorkerListener,
    /// Performs the actual SST merge and the manifest update.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Picker result: its outputs' input files are flagged as compacting, and a
    /// clone of it is passed to the compactor's merge step.
    pub(crate) picker_output: PickerOutput,
}
53
54impl Debug for CompactionTaskImpl {
55 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("TwcsCompactionTask")
57 .field("region_id", &self.compaction_region.region_id)
58 .field("picker_output", &self.picker_output)
59 .field(
60 "append_mode",
61 &self.compaction_region.region_options.append_mode,
62 )
63 .finish()
64 }
65}
66
67impl Drop for CompactionTaskImpl {
68 fn drop(&mut self) {
69 self.mark_files_compacting(false)
70 }
71}
72
73impl CompactionTaskImpl {
74 fn mark_files_compacting(&self, compacting: bool) {
75 self.picker_output
76 .outputs
77 .iter()
78 .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
79 }
80
81 async fn handle_compaction(&mut self) -> error::Result<RegionEdit> {
82 self.mark_files_compacting(true);
83
84 let merge_timer = COMPACTION_STAGE_ELAPSED
85 .with_label_values(&["merge"])
86 .start_timer();
87
88 let compaction_result = match self
89 .compactor
90 .merge_ssts(&self.compaction_region, self.picker_output.clone())
91 .await
92 {
93 Ok(v) => v,
94 Err(e) => {
95 error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
96 merge_timer.stop_and_discard();
97 return Err(e);
98 }
99 };
100 let merge_time = merge_timer.stop_and_record();
101
102 metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
103 metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
104 info!(
105 "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
106 self.compaction_region.region_id,
107 compaction_result.files_to_remove,
108 compaction_result.files_to_add,
109 compaction_result.compaction_time_window,
110 self.waiters.len(),
111 merge_time,
112 );
113
114 self.listener
115 .on_merge_ssts_finished(self.compaction_region.region_id)
116 .await;
117
118 let _manifest_timer = COMPACTION_STAGE_ELAPSED
119 .with_label_values(&["write_manifest"])
120 .start_timer();
121
122 self.compactor
123 .update_manifest(&self.compaction_region, compaction_result)
124 .await
125 }
126
127 fn on_failure(&mut self, err: Arc<error::Error>) {
129 COMPACTION_FAILURE_COUNT.inc();
130 for waiter in self.waiters.drain(..) {
131 waiter.send(Err(err.clone()).context(CompactRegionSnafu {
132 region_id: self.compaction_region.region_id,
133 }));
134 }
135 }
136
137 async fn send_to_worker(&self, request: WorkerRequest) {
139 if let Err(e) = self
140 .request_sender
141 .send(WorkerRequestWithTime::new(request))
142 .await
143 {
144 error!(
145 "Failed to notify compaction job status for region {}, request: {:?}",
146 self.compaction_region.region_id, e.0
147 );
148 }
149 }
150}
151
152#[async_trait::async_trait]
153impl CompactionTask for CompactionTaskImpl {
154 async fn run(&mut self) {
155 let notify = match self.handle_compaction().await {
156 Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
157 region_id: self.compaction_region.region_id,
158 senders: std::mem::take(&mut self.waiters),
159 start_time: self.start_time,
160 edit,
161 }),
162 Err(e) => {
163 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
164 let err = Arc::new(e);
165 self.on_failure(err.clone());
167 BackgroundNotify::CompactionFailed(CompactionFailed {
168 region_id: self.compaction_region.region_id,
169 err,
170 })
171 }
172 };
173
174 self.send_to_worker(WorkerRequest::Background {
175 region_id: self.compaction_region.region_id,
176 notify,
177 })
178 .await;
179 }
180}