1use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_memory_manager::OnExhaustedPolicy;
20use common_telemetry::{error, info, warn};
21use itertools::Itertools;
22use snafu::ResultExt;
23use tokio::sync::mpsc;
24
25use crate::compaction::compactor::{CompactionRegion, Compactor};
26use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
27use crate::compaction::picker::{CompactionTask, PickerOutput};
28use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
29use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
30use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
31use crate::region::RegionRoleState;
32use crate::request::{
33 BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
34 WorkerRequest, WorkerRequestWithTime,
35};
36use crate::sst::file::FileMeta;
37use crate::worker::WorkerListener;
38use crate::{error, metrics};
39
/// Maximum parallelism for compaction; fixed at 1.
///
/// NOTE(review): not referenced in this chunk — presumably caps how many compaction
/// tasks run concurrently; confirm at the use site.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
42
/// A compaction job for a single region, built from a [`PickerOutput`].
///
/// Running it reserves memory from `memory_manager`, removes expired SSTs, merges the
/// picked inputs through `compactor`, and reports the outcome to the region worker
/// over `request_sender`.
pub(crate) struct CompactionTaskImpl {
    /// The region being compacted.
    pub compaction_region: CompactionRegion,
    /// Channel used to send background notifications to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Pending result senders: failed via `on_failure`, or handed to the worker in
    /// `CompactionFinished` on success.
    pub waiters: Vec<OutputTx>,
    /// Start time forwarded to the worker in `CompactionFinished`.
    pub start_time: Instant,
    /// Listener notified once the SST merge has finished.
    pub(crate) listener: WorkerListener,
    /// Performs the SST merge and the final manifest update.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Files picked for compaction: per-output inputs plus expired SSTs.
    pub(crate) picker_output: PickerOutput,
    /// Manager from which this task's memory budget is acquired before running.
    pub(crate) memory_manager: Arc<CompactionMemoryManager>,
    /// Policy passed to `acquire_with_policy` when requesting memory.
    pub(crate) memory_policy: OnExhaustedPolicy,
    /// Estimated memory for this task, in bytes; the amount requested from `memory_manager`.
    pub(crate) estimated_memory_bytes: u64,
}
64
65impl Debug for CompactionTaskImpl {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 f.debug_struct("TwcsCompactionTask")
68 .field("region_id", &self.compaction_region.region_id)
69 .field("picker_output", &self.picker_output)
70 .field(
71 "append_mode",
72 &self.compaction_region.region_options.append_mode,
73 )
74 .finish()
75 }
76}
77
78impl Drop for CompactionTaskImpl {
79 fn drop(&mut self) {
80 self.mark_files_compacting(false)
81 }
82}
83
84impl CompactionTaskImpl {
85 fn mark_files_compacting(&self, compacting: bool) {
86 self.picker_output
87 .outputs
88 .iter()
89 .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
90 }
91
92 async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
96 let region_id = self.compaction_region.region_id;
97 let requested_bytes = self.estimated_memory_bytes;
98 let policy = self.memory_policy;
99
100 let _timer = COMPACTION_MEMORY_WAIT.start_timer();
101 self.memory_manager
102 .acquire_with_policy(requested_bytes, policy)
103 .await
104 .context(CompactionMemoryExhaustedSnafu {
105 region_id,
106 policy: format!("{policy:?}"),
107 })
108 }
109
110 async fn remove_expired(
115 &self,
116 compaction_region: &CompactionRegion,
117 expired_files: Vec<FileMeta>,
118 ) {
119 let region_id = compaction_region.region_id;
120 let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
121 let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
122 let edit = RegionEdit {
124 files_to_add: Vec::new(),
125 files_to_remove: expired_files,
126 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
127 compaction_time_window: None,
128 flushed_entry_id: None,
129 flushed_sequence: None,
130 committed_sequence: None,
131 };
132
133 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
135 let RegionRoleState::Leader(current_region_state) =
136 compaction_region.manifest_ctx.current_state()
137 else {
138 warn!(
139 "Region {} not in leader state, skip removing expired files",
140 region_id
141 );
142 return;
143 };
144 if let Err(e) = compaction_region
145 .manifest_ctx
146 .update_manifest(current_region_state, action_list, false)
147 .await
148 {
149 warn!(
150 e;
151 "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
152 );
153 return;
154 }
155
156 self.send_to_worker(WorkerRequest::Background {
158 region_id,
159 notify: BackgroundNotify::RegionEdit(RegionEditResult {
160 region_id,
161 sender: expire_delete_sender,
162 edit,
163 result: Ok(()),
164 update_region_state: false,
165 is_staging: false,
166 }),
167 })
168 .await;
169
170 if let Err(e) = expire_delete_listener
171 .await
172 .context(error::RecvSnafu)
173 .flatten()
174 {
175 warn!(
176 e;
177 "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
178 );
179 return;
180 }
181
182 info!(
183 "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
184 );
185 }
186
187 async fn handle_expiration_and_compaction(&mut self) -> error::Result<RegionEdit> {
188 self.mark_files_compacting(true);
189
190 if !self.picker_output.expired_ssts.is_empty() {
192 let remove_timer = COMPACTION_STAGE_ELAPSED
193 .with_label_values(&["remove_expired"])
194 .start_timer();
195 let expired_ssts = self
196 .picker_output
197 .expired_ssts
198 .drain(..)
199 .map(|f| f.meta_ref().clone())
200 .collect();
201 self.remove_expired(&self.compaction_region, expired_ssts)
203 .await;
204 remove_timer.observe_duration();
205 }
206
207 let merge_timer = COMPACTION_STAGE_ELAPSED
209 .with_label_values(&["merge"])
210 .start_timer();
211
212 let compaction_result = match self
213 .compactor
214 .merge_ssts(&self.compaction_region, self.picker_output.clone())
215 .await
216 {
217 Ok(v) => v,
218 Err(e) => {
219 error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
220 merge_timer.stop_and_discard();
221 return Err(e);
222 }
223 };
224 let merge_time = merge_timer.stop_and_record();
225
226 metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
227 metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
228 info!(
229 "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
230 self.compaction_region.region_id,
231 compaction_result.files_to_remove,
232 compaction_result.files_to_add,
233 compaction_result.compaction_time_window,
234 self.waiters.len(),
235 merge_time,
236 );
237
238 self.listener
239 .on_merge_ssts_finished(self.compaction_region.region_id)
240 .await;
241
242 let _manifest_timer = COMPACTION_STAGE_ELAPSED
243 .with_label_values(&["write_manifest"])
244 .start_timer();
245
246 self.compactor
247 .update_manifest(&self.compaction_region, compaction_result)
248 .await
249 }
250
251 pub(crate) fn on_failure(&mut self, err: Arc<error::Error>) {
253 COMPACTION_FAILURE_COUNT.inc();
254 for waiter in self.waiters.drain(..) {
255 waiter.send(Err(err.clone()).context(CompactRegionSnafu {
256 region_id: self.compaction_region.region_id,
257 }));
258 }
259 }
260
261 async fn send_to_worker(&self, request: WorkerRequest) {
263 if let Err(e) = self
264 .request_sender
265 .send(WorkerRequestWithTime::new(request))
266 .await
267 {
268 error!(
269 "Failed to notify compaction job status for region {}, request: {:?}",
270 self.compaction_region.region_id, e.0
271 );
272 }
273 }
274}
275
276#[async_trait::async_trait]
277impl CompactionTask for CompactionTaskImpl {
278 async fn run(&mut self) {
279 let _memory_guard = match self.acquire_memory_with_policy().await {
281 Ok(guard) => guard,
282 Err(e) => {
283 error!(e; "Failed to acquire memory for compaction, region id: {}", self.compaction_region.region_id);
284 let err = Arc::new(e);
285 self.on_failure(err.clone());
286 let notify = BackgroundNotify::CompactionFailed(CompactionFailed {
287 region_id: self.compaction_region.region_id,
288 err,
289 });
290 self.send_to_worker(WorkerRequest::Background {
291 region_id: self.compaction_region.region_id,
292 notify,
293 })
294 .await;
295 return;
296 }
297 };
298
299 let notify = match self.handle_expiration_and_compaction().await {
300 Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
301 region_id: self.compaction_region.region_id,
302 senders: std::mem::take(&mut self.waiters),
303 start_time: self.start_time,
304 edit,
305 }),
306 Err(e) => {
307 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
308 let err = Arc::new(e);
309 self.on_failure(err.clone());
311 BackgroundNotify::CompactionFailed(CompactionFailed {
312 region_id: self.compaction_region.region_id,
313 err,
314 })
315 }
316 };
317
318 self.send_to_worker(WorkerRequest::Background {
319 region_id: self.compaction_region.region_id,
320 notify,
321 })
322 .await;
323 }
324}
325
#[cfg(test)]
mod tests {
    use store_api::storage::FileId;

    use crate::compaction::picker::PickerOutput;
    use crate::compaction::test_util::new_file_handle;

    /// A `PickerOutput` built with expired SSTs exposes exactly those SSTs, in order.
    #[test]
    fn test_picker_output_with_expired_ssts() {
        let ids: Vec<FileId> = (0..3).map(|_| FileId::random()).collect();
        // NOTE(review): the middle arguments look like a time range and the last one
        // a level — confirm against `new_file_handle`.
        let expired = vec![
            new_file_handle(ids[0], 0, 999, 0),
            new_file_handle(ids[1], 1000, 1999, 0),
        ];

        let output = PickerOutput {
            outputs: vec![],
            expired_ssts: expired.clone(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert_eq!(output.expired_ssts.len(), 2);
        for (got, want) in output.expired_ssts.iter().zip(expired.iter()) {
            assert_eq!(got.file_id(), want.file_id());
        }
    }

    /// A `PickerOutput` built without expired SSTs reports none.
    #[test]
    fn test_picker_output_without_expired_ssts() {
        let output = PickerOutput {
            outputs: Vec::new(),
            expired_ssts: Vec::new(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert!(output.expired_ssts.is_empty());
    }
}