1use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_base::cancellation::CancellableFuture;
20use common_memory_manager::OnExhaustedPolicy;
21use common_telemetry::{error, info, warn};
22use itertools::Itertools;
23use snafu::ResultExt;
24use store_api::ManifestVersion;
25use tokio::sync::mpsc;
26
27use crate::compaction::LocalCompactionState;
28use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput};
29use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
30use crate::compaction::picker::{CompactionTask, PickerOutput};
31use crate::engine::region_hook::{RegionHookRef, SstFileInfo};
32use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
33use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
34use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
35use crate::region::RegionRoleState;
36use crate::request::{
37 BackgroundNotify, CompactionCancelled, CompactionFailed, CompactionFinished, OutputTx,
38 RegionEditResult, Waiters, WorkerRequest, WorkerRequestWithTime,
39};
40use crate::sst::file::FileMeta;
41use crate::worker::WorkerListener;
42use crate::{error, metrics};
43
/// Limit on the number of compactions allowed to run in parallel.
///
/// NOTE(review): meaning inferred from the name only; nothing in this file
/// reads the constant — confirm against its users.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
46
/// A single compaction job for one region.
///
/// The task acquires a memory budget, removes expired SST files, merges the
/// picked input files, commits the result to the manifest and finally reports
/// the outcome to the region worker through `request_sender`.
pub(crate) struct CompactionTaskImpl {
    /// Local compaction state; provides the cancel handle used while merging
    /// and the `mark_commit_started` gate checked before the manifest update.
    pub(crate) state: LocalCompactionState,
    /// The region being compacted (region id, manifest context, options,
    /// metadata and plugins are all read from here).
    pub compaction_region: CompactionRegion,
    /// Channel used to send background notifications to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Senders waiting for the compaction result. They receive the error
    /// directly on failure, or are handed over to the worker inside the
    /// finished/cancelled notification.
    pub waiters: Vec<OutputTx>,
    /// Start time forwarded in the `CompactionFinished` notification.
    /// NOTE(review): presumably the instant the task was created — confirm
    /// against the code that builds the task.
    pub start_time: Instant,
    /// Listener notified once merging SST files has finished.
    pub(crate) listener: WorkerListener,
    /// Compactor that merges the SST files and updates the manifest.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Picker result: the outputs (with their input files) to merge and the
    /// expired SST files to remove.
    pub(crate) picker_output: PickerOutput,
    /// Memory manager the compaction memory budget is acquired from.
    pub(crate) memory_manager: Arc<CompactionMemoryManager>,
    /// Policy passed to the memory manager when acquiring the budget.
    /// NOTE(review): presumably decides what happens when memory is
    /// exhausted — see `OnExhaustedPolicy`.
    pub(crate) memory_policy: OnExhaustedPolicy,
    /// Number of bytes requested from the memory manager for this task.
    pub(crate) estimated_memory_bytes: u64,
}
70
71impl Debug for CompactionTaskImpl {
72 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("TwcsCompactionTask")
74 .field("region_id", &self.compaction_region.region_id)
75 .field("picker_output", &self.picker_output)
76 .field(
77 "append_mode",
78 &self.compaction_region.region_options.append_mode,
79 )
80 .finish()
81 }
82}
83
84impl Drop for CompactionTaskImpl {
85 fn drop(&mut self) {
86 self.mark_files_compacting(false)
87 }
88}
89
90impl CompactionTaskImpl {
91 fn mark_files_compacting(&self, compacting: bool) {
92 self.picker_output
93 .outputs
94 .iter()
95 .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
96 }
97
98 async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
102 let region_id = self.compaction_region.region_id;
103 let requested_bytes = self.estimated_memory_bytes;
104 let policy = self.memory_policy;
105
106 let _timer = COMPACTION_MEMORY_WAIT.start_timer();
107 self.memory_manager
108 .acquire_with_policy(requested_bytes, policy)
109 .await
110 .context(CompactionMemoryExhaustedSnafu {
111 region_id,
112 policy: format!("{policy:?}"),
113 })
114 }
115
116 async fn remove_expired(
121 &self,
122 compaction_region: &CompactionRegion,
123 expired_files: Vec<FileMeta>,
124 ) {
125 let region_id = compaction_region.region_id;
126 let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
127 let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
128 let edit = RegionEdit {
130 files_to_add: Vec::new(),
131 files_to_remove: expired_files,
132 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
133 compaction_time_window: None,
134 flushed_entry_id: None,
135 flushed_sequence: None,
136 committed_sequence: None,
137 };
138
139 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
141 let RegionRoleState::Leader(current_region_state) =
142 compaction_region.manifest_ctx.current_state()
143 else {
144 warn!(
145 "Region {} not in leader state, skip removing expired files",
146 region_id
147 );
148 return;
149 };
150 if let Err(e) = compaction_region
151 .manifest_ctx
152 .update_manifest(current_region_state, action_list, false)
153 .await
154 {
155 warn!(
156 e;
157 "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
158 );
159 return;
160 }
161
162 self.send_to_worker(WorkerRequest::Background {
164 region_id,
165 notify: BackgroundNotify::RegionEdit(RegionEditResult {
166 region_id,
167 waiters: Waiters::one(expire_delete_sender),
168 edit,
169 result: Ok(()),
170 update_region_state: false,
171 is_staging: false,
172 }),
173 })
174 .await;
175
176 if let Err(e) = expire_delete_listener
177 .await
178 .context(error::RecvSnafu)
179 .flatten()
180 {
181 warn!(
182 e;
183 "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
184 );
185 return;
186 }
187
188 info!(
189 "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
190 );
191 }
192
193 async fn handle_expiration(&mut self) {
194 if !self.picker_output.expired_ssts.is_empty() {
196 let remove_timer = COMPACTION_STAGE_ELAPSED
197 .with_label_values(&["remove_expired"])
198 .start_timer();
199 let expired_ssts = self
200 .picker_output
201 .expired_ssts
202 .drain(..)
203 .map(|f| f.meta_ref().clone())
204 .collect();
205 self.remove_expired(&self.compaction_region, expired_ssts)
207 .await;
208 remove_timer.observe_duration();
209 }
210 }
211
212 async fn handle_compaction(&mut self) -> error::Result<MergeOutput> {
213 let merge_timer = COMPACTION_STAGE_ELAPSED
215 .with_label_values(&["merge"])
216 .start_timer();
217
218 let compaction_result = match self
219 .compactor
220 .merge_ssts(&self.compaction_region, self.picker_output.clone())
221 .await
222 {
223 Ok(v) => v,
224 Err(e) => {
225 error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
226 merge_timer.stop_and_discard();
227 return Err(e);
228 }
229 };
230 let merge_time = merge_timer.stop_and_record();
231
232 metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
233 metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
234 info!(
235 "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
236 self.compaction_region.region_id,
237 compaction_result.files_to_remove,
238 compaction_result.files_to_add,
239 compaction_result.compaction_time_window,
240 self.waiters.len(),
241 merge_time,
242 );
243
244 self.listener
245 .on_merge_ssts_finished(self.compaction_region.region_id)
246 .await;
247
248 Ok(compaction_result)
249 }
250
251 async fn update_manifest(
252 &self,
253 compaction_result: crate::compaction::compactor::MergeOutput,
254 ) -> error::Result<(RegionEdit, ManifestVersion)> {
255 let _manifest_timer = COMPACTION_STAGE_ELAPSED
256 .with_label_values(&["write_manifest"])
257 .start_timer();
258
259 self.compactor
260 .update_manifest(&self.compaction_region, compaction_result)
261 .await
262 }
263
264 pub(crate) fn on_failure(&mut self, err: Arc<error::Error>) {
266 COMPACTION_FAILURE_COUNT.inc();
267 for waiter in self.waiters.drain(..) {
268 waiter.send(Err(err.clone()).context(CompactRegionSnafu {
269 region_id: self.compaction_region.region_id,
270 }));
271 }
272 }
273
274 async fn send_to_worker(&self, request: WorkerRequest) {
276 if let Err(e) = self
277 .request_sender
278 .send(WorkerRequestWithTime::new(request))
279 .await
280 {
281 error!(
282 "Failed to notify compaction job status for region {}, request: {:?}",
283 self.compaction_region.region_id, e.0
284 );
285 }
286 }
287
288 async fn invoke_sst_hook(&self, merge_output: &MergeOutput) {
289 let hook: Option<RegionHookRef> = self.compaction_region.plugins.get();
290 if let Some(hook) = hook {
291 let files: Vec<SstFileInfo<'_>> = merge_output
292 .sst_infos
293 .iter()
294 .zip(merge_output.files_to_add.iter())
295 .map(|(info, meta)| SstFileInfo {
296 sst_info_ref: info,
297 file_meta: meta,
298 })
299 .collect();
300 hook.on_sst_files_written(
301 self.compaction_region.region_id,
302 &self.compaction_region.region_metadata,
303 &files,
304 )
305 .await;
306 }
307 }
308}
309
310#[async_trait::async_trait]
311impl CompactionTask for CompactionTaskImpl {
312 async fn run(&mut self) {
313 let _memory_guard = match self.acquire_memory_with_policy().await {
315 Ok(guard) => guard,
316 Err(e) => {
317 error!(e; "Failed to acquire memory for compaction, region id: {}", self.compaction_region.region_id);
318 let err = Arc::new(e);
319 self.on_failure(err.clone());
320 let notify = BackgroundNotify::CompactionFailed(CompactionFailed {
321 region_id: self.compaction_region.region_id,
322 err,
323 });
324 self.send_to_worker(WorkerRequest::Background {
325 region_id: self.compaction_region.region_id,
326 notify,
327 })
328 .await;
329 return;
330 }
331 };
332
333 self.mark_files_compacting(true);
335 self.handle_expiration().await;
336
337 let cancel_handle = self.state.cancel_handle();
338 let notify = match CancellableFuture::new(
340 async { self.handle_compaction().await },
341 cancel_handle,
342 )
343 .await
344 {
345 Ok(Ok(merge_output)) => {
346 self.invoke_sst_hook(&merge_output).await;
347 if !self.state.mark_commit_started() {
349 let senders = std::mem::take(&mut self.waiters);
350 BackgroundNotify::CompactionCancelled(CompactionCancelled {
351 region_id: self.compaction_region.region_id,
352 senders,
353 })
354 } else {
355 match self.update_manifest(merge_output).await {
356 Ok((edit, _manifest_version)) => {
357 let senders = std::mem::take(&mut self.waiters);
358 BackgroundNotify::CompactionFinished(CompactionFinished {
359 region_id: self.compaction_region.region_id,
360 senders,
361 start_time: self.start_time,
362 edit,
363 })
364 }
365 Err(e) => {
366 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
367 let err = Arc::new(e);
368 self.on_failure(err.clone());
369 BackgroundNotify::CompactionFailed(CompactionFailed {
370 region_id: self.compaction_region.region_id,
371 err,
372 })
373 }
374 }
375 }
376 }
377 Err(_) => {
378 info!(
379 "Compaction cancelled, region id: {}",
380 self.compaction_region.region_id
381 );
382 let senders = std::mem::take(&mut self.waiters);
383 BackgroundNotify::CompactionCancelled(CompactionCancelled {
384 region_id: self.compaction_region.region_id,
385 senders,
386 })
387 }
388 Ok(Err(e)) => {
389 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
390 let err = Arc::new(e);
391 self.on_failure(err.clone());
393 BackgroundNotify::CompactionFailed(CompactionFailed {
394 region_id: self.compaction_region.region_id,
395 err,
396 })
397 }
398 };
399
400 self.send_to_worker(WorkerRequest::Background {
401 region_id: self.compaction_region.region_id,
402 notify,
403 })
404 .await;
405 }
406}
407
#[cfg(test)]
mod tests {
    use store_api::storage::FileId;

    use crate::compaction::picker::PickerOutput;
    use crate::compaction::test_util::new_file_handle;

    /// A picker output built with expired SSTs keeps them, in order.
    #[test]
    fn test_picker_output_with_expired_ssts() {
        let file_ids: Vec<_> = (0..3).map(|_| FileId::random()).collect();
        let expired_ssts = vec![
            new_file_handle(file_ids[0], 0, 999, 0),
            new_file_handle(file_ids[1], 1000, 1999, 0),
        ];

        let picker_output = PickerOutput {
            outputs: vec![],
            expired_ssts: expired_ssts.clone(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert_eq!(picker_output.expired_ssts.len(), 2);
        // The length is checked above, so the zip covers both entries.
        for (actual, expected) in picker_output.expired_ssts.iter().zip(expired_ssts.iter()) {
            assert_eq!(actual.file_id(), expected.file_id());
        }
    }

    /// A picker output built without expired SSTs reports none.
    #[test]
    fn test_picker_output_without_expired_ssts() {
        let picker_output = PickerOutput {
            outputs: vec![],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        assert_eq!(picker_output.expired_ssts.len(), 0);
    }
}