Skip to main content

mito2/compaction/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_base::cancellation::CancellableFuture;
20use common_memory_manager::OnExhaustedPolicy;
21use common_telemetry::{error, info, warn};
22use itertools::Itertools;
23use snafu::ResultExt;
24use store_api::ManifestVersion;
25use tokio::sync::mpsc;
26
27use crate::compaction::LocalCompactionState;
28use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput};
29use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
30use crate::compaction::picker::{CompactionTask, PickerOutput};
31use crate::engine::region_hook::{RegionHookRef, SstFileInfo};
32use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
33use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
34use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
35use crate::region::RegionRoleState;
36use crate::request::{
37    BackgroundNotify, CompactionCancelled, CompactionFailed, CompactionFinished, OutputTx,
38    RegionEditResult, Waiters, WorkerRequest, WorkerRequestWithTime,
39};
40use crate::sst::file::FileMeta;
41use crate::worker::WorkerListener;
42use crate::{error, metrics};
43
/// Maximum number of compaction tasks in parallel.
///
/// NOTE(review): this constant is not referenced anywhere in this file; presumably the
/// compaction scheduler reads it to cap concurrency — confirm against the callers.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
46
/// A local compaction task for a single region.
///
/// Bundles everything the task needs to run: the picker output describing which files to
/// merge or expire, the compactor that performs the merge and manifest update, the memory
/// budget settings, and the channels used to report the outcome to the region worker and
/// to any waiters.
pub(crate) struct CompactionTaskImpl {
    /// Shared local-compaction state for cooperative cancellation.
    pub(crate) state: LocalCompactionState,
    /// The region this task operates on. The task reads its id, options, metadata,
    /// manifest context and plugins from here.
    pub compaction_region: CompactionRegion,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Senders that are used to notify waiters waiting for pending compaction tasks.
    pub waiters: Vec<OutputTx>,
    /// Start time of compaction task. Forwarded to the worker in `CompactionFinished`.
    pub start_time: Instant,
    /// Event listener.
    pub(crate) listener: WorkerListener,
    /// Compactor to handle compaction.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Output of the picker.
    pub(crate) picker_output: PickerOutput,
    /// Memory manager to acquire memory budget.
    pub(crate) memory_manager: Arc<CompactionMemoryManager>,
    /// Policy when memory is exhausted.
    pub(crate) memory_policy: OnExhaustedPolicy,
    /// Estimated memory bytes needed for this compaction.
    pub(crate) estimated_memory_bytes: u64,
}
70
71impl Debug for CompactionTaskImpl {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("TwcsCompactionTask")
74            .field("region_id", &self.compaction_region.region_id)
75            .field("picker_output", &self.picker_output)
76            .field(
77                "append_mode",
78                &self.compaction_region.region_options.append_mode,
79            )
80            .finish()
81    }
82}
83
84impl Drop for CompactionTaskImpl {
85    fn drop(&mut self) {
86        self.mark_files_compacting(false)
87    }
88}
89
90impl CompactionTaskImpl {
91    fn mark_files_compacting(&self, compacting: bool) {
92        self.picker_output
93            .outputs
94            .iter()
95            .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
96    }
97
98    /// Acquires memory budget based on the configured policy.
99    ///
100    /// Returns an error if memory cannot be acquired according to the policy.
101    async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
102        let region_id = self.compaction_region.region_id;
103        let requested_bytes = self.estimated_memory_bytes;
104        let policy = self.memory_policy;
105
106        let _timer = COMPACTION_MEMORY_WAIT.start_timer();
107        self.memory_manager
108            .acquire_with_policy(requested_bytes, policy)
109            .await
110            .context(CompactionMemoryExhaustedSnafu {
111                region_id,
112                policy: format!("{policy:?}"),
113            })
114    }
115
116    /// Remove expired ssts files, update manifest immediately
117    /// and apply the edit to region version.
118    ///
119    /// This function logs errors but does not stop the compaction process if removal fails.
120    async fn remove_expired(
121        &self,
122        compaction_region: &CompactionRegion,
123        expired_files: Vec<FileMeta>,
124    ) {
125        let region_id = compaction_region.region_id;
126        let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
127        let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
128        // Update manifest to remove expired SSTs
129        let edit = RegionEdit {
130            files_to_add: Vec::new(),
131            files_to_remove: expired_files,
132            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
133            compaction_time_window: None,
134            flushed_entry_id: None,
135            flushed_sequence: None,
136            committed_sequence: None,
137        };
138
139        // 1. Update manifest
140        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
141        let RegionRoleState::Leader(current_region_state) =
142            compaction_region.manifest_ctx.current_state()
143        else {
144            warn!(
145                "Region {} not in leader state, skip removing expired files",
146                region_id
147            );
148            return;
149        };
150        if let Err(e) = compaction_region
151            .manifest_ctx
152            .update_manifest(current_region_state, action_list, false)
153            .await
154        {
155            warn!(
156                e;
157                "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
158            );
159            return;
160        }
161
162        // 2. Notify region worker loop to remove expired files from region version.
163        self.send_to_worker(WorkerRequest::Background {
164            region_id,
165            notify: BackgroundNotify::RegionEdit(RegionEditResult {
166                region_id,
167                waiters: Waiters::one(expire_delete_sender),
168                edit,
169                result: Ok(()),
170                update_region_state: false,
171                is_staging: false,
172            }),
173        })
174        .await;
175
176        if let Err(e) = expire_delete_listener
177            .await
178            .context(error::RecvSnafu)
179            .flatten()
180        {
181            warn!(
182                e;
183                "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
184            );
185            return;
186        }
187
188        info!(
189            "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
190        );
191    }
192
193    async fn handle_expiration(&mut self) {
194        // 1. In case of local compaction, we can delete expired ssts in advance.
195        if !self.picker_output.expired_ssts.is_empty() {
196            let remove_timer = COMPACTION_STAGE_ELAPSED
197                .with_label_values(&["remove_expired"])
198                .start_timer();
199            let expired_ssts = self
200                .picker_output
201                .expired_ssts
202                .drain(..)
203                .map(|f| f.meta_ref().clone())
204                .collect();
205            // remove_expired logs errors but doesn't stop compaction
206            self.remove_expired(&self.compaction_region, expired_ssts)
207                .await;
208            remove_timer.observe_duration();
209        }
210    }
211
212    async fn handle_compaction(&mut self) -> error::Result<MergeOutput> {
213        // 2. Merge inputs
214        let merge_timer = COMPACTION_STAGE_ELAPSED
215            .with_label_values(&["merge"])
216            .start_timer();
217
218        let compaction_result = match self
219            .compactor
220            .merge_ssts(&self.compaction_region, self.picker_output.clone())
221            .await
222        {
223            Ok(v) => v,
224            Err(e) => {
225                error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
226                merge_timer.stop_and_discard();
227                return Err(e);
228            }
229        };
230        let merge_time = merge_timer.stop_and_record();
231
232        metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
233        metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
234        info!(
235            "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
236            self.compaction_region.region_id,
237            compaction_result.files_to_remove,
238            compaction_result.files_to_add,
239            compaction_result.compaction_time_window,
240            self.waiters.len(),
241            merge_time,
242        );
243
244        self.listener
245            .on_merge_ssts_finished(self.compaction_region.region_id)
246            .await;
247
248        Ok(compaction_result)
249    }
250
251    async fn update_manifest(
252        &self,
253        compaction_result: crate::compaction::compactor::MergeOutput,
254    ) -> error::Result<(RegionEdit, ManifestVersion)> {
255        let _manifest_timer = COMPACTION_STAGE_ELAPSED
256            .with_label_values(&["write_manifest"])
257            .start_timer();
258
259        self.compactor
260            .update_manifest(&self.compaction_region, compaction_result)
261            .await
262    }
263
264    /// Handles compaction failure, notifies all waiters.
265    pub(crate) fn on_failure(&mut self, err: Arc<error::Error>) {
266        COMPACTION_FAILURE_COUNT.inc();
267        for waiter in self.waiters.drain(..) {
268            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
269                region_id: self.compaction_region.region_id,
270            }));
271        }
272    }
273
274    /// Notifies region worker to handle post-compaction tasks.
275    async fn send_to_worker(&self, request: WorkerRequest) {
276        if let Err(e) = self
277            .request_sender
278            .send(WorkerRequestWithTime::new(request))
279            .await
280        {
281            error!(
282                "Failed to notify compaction job status for region {}, request: {:?}",
283                self.compaction_region.region_id, e.0
284            );
285        }
286    }
287
288    async fn invoke_sst_hook(&self, merge_output: &MergeOutput) {
289        let hook: Option<RegionHookRef> = self.compaction_region.plugins.get();
290        if let Some(hook) = hook {
291            let files: Vec<SstFileInfo<'_>> = merge_output
292                .sst_infos
293                .iter()
294                .zip(merge_output.files_to_add.iter())
295                .map(|(info, meta)| SstFileInfo {
296                    sst_info_ref: info,
297                    file_meta: meta,
298                })
299                .collect();
300            hook.on_sst_files_written(
301                self.compaction_region.region_id,
302                &self.compaction_region.region_metadata,
303                &files,
304            )
305            .await;
306        }
307    }
308}
309
310#[async_trait::async_trait]
311impl CompactionTask for CompactionTaskImpl {
312    async fn run(&mut self) {
313        // Acquire memory budget before starting compaction
314        let _memory_guard = match self.acquire_memory_with_policy().await {
315            Ok(guard) => guard,
316            Err(e) => {
317                error!(e; "Failed to acquire memory for compaction, region id: {}", self.compaction_region.region_id);
318                let err = Arc::new(e);
319                self.on_failure(err.clone());
320                let notify = BackgroundNotify::CompactionFailed(CompactionFailed {
321                    region_id: self.compaction_region.region_id,
322                    err,
323                });
324                self.send_to_worker(WorkerRequest::Background {
325                    region_id: self.compaction_region.region_id,
326                    notify,
327                })
328                .await;
329                return;
330            }
331        };
332
333        // Marks files compacting before compaction and unmark after compaction (even if compaction is cancelled or failed), so that they won't be picked by other compaction tasks.
334        self.mark_files_compacting(true);
335        self.handle_expiration().await;
336
337        let cancel_handle = self.state.cancel_handle();
338        // Run compaction with cooperative cancellation.
339        let notify = match CancellableFuture::new(
340            async { self.handle_compaction().await },
341            cancel_handle,
342        )
343        .await
344        {
345            Ok(Ok(merge_output)) => {
346                self.invoke_sst_hook(&merge_output).await;
347                // Stop accepting cancellation once we are about to publish the compaction edit.
348                if !self.state.mark_commit_started() {
349                    let senders = std::mem::take(&mut self.waiters);
350                    BackgroundNotify::CompactionCancelled(CompactionCancelled {
351                        region_id: self.compaction_region.region_id,
352                        senders,
353                    })
354                } else {
355                    match self.update_manifest(merge_output).await {
356                        Ok((edit, _manifest_version)) => {
357                            let senders = std::mem::take(&mut self.waiters);
358                            BackgroundNotify::CompactionFinished(CompactionFinished {
359                                region_id: self.compaction_region.region_id,
360                                senders,
361                                start_time: self.start_time,
362                                edit,
363                            })
364                        }
365                        Err(e) => {
366                            error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
367                            let err = Arc::new(e);
368                            self.on_failure(err.clone());
369                            BackgroundNotify::CompactionFailed(CompactionFailed {
370                                region_id: self.compaction_region.region_id,
371                                err,
372                            })
373                        }
374                    }
375                }
376            }
377            Err(_) => {
378                info!(
379                    "Compaction cancelled, region id: {}",
380                    self.compaction_region.region_id
381                );
382                let senders = std::mem::take(&mut self.waiters);
383                BackgroundNotify::CompactionCancelled(CompactionCancelled {
384                    region_id: self.compaction_region.region_id,
385                    senders,
386                })
387            }
388            Ok(Err(e)) => {
389                error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
390                let err = Arc::new(e);
391                // notify compaction waiters
392                self.on_failure(err.clone());
393                BackgroundNotify::CompactionFailed(CompactionFailed {
394                    region_id: self.compaction_region.region_id,
395                    err,
396                })
397            }
398        };
399
400        self.send_to_worker(WorkerRequest::Background {
401            region_id: self.compaction_region.region_id,
402            notify,
403        })
404        .await;
405    }
406}
407
#[cfg(test)]
mod tests {
    use store_api::storage::FileId;

    use crate::compaction::picker::PickerOutput;
    use crate::compaction::test_util::new_file_handle;

    /// A `PickerOutput` built with expired SSTs keeps them, in order. These are the files
    /// that `handle_expiration()` later removes.
    #[test]
    fn test_picker_output_with_expired_ssts() {
        let file_ids: Vec<FileId> = (0..3).map(|_| FileId::random()).collect();
        let expired_ssts = vec![
            new_file_handle(file_ids[0], 0, 999, 0),
            new_file_handle(file_ids[1], 1000, 1999, 0),
        ];

        let picker_output = PickerOutput {
            outputs: Vec::new(),
            expired_ssts: expired_ssts.clone(),
            time_window_size: 3600,
            max_file_size: None,
        };

        // Both expired SSTs must be present and keep their file ids.
        assert_eq!(picker_output.expired_ssts.len(), 2);
        for (actual, expected) in picker_output.expired_ssts.iter().zip(expired_ssts.iter()) {
            assert_eq!(actual.file_id(), expected.file_id());
        }
    }

    /// A `PickerOutput` built without expired SSTs reports none.
    #[test]
    fn test_picker_output_without_expired_ssts() {
        let picker_output = PickerOutput {
            outputs: Vec::new(),
            expired_ssts: Vec::new(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert!(picker_output.expired_ssts.is_empty());
    }

    // Note: Testing remove_expired() directly requires extensive mocking of:
    // - manifest_ctx (ManifestContext)
    // - request_sender (mpsc::Sender<WorkerRequestWithTime>)
    // - WorkerRequest handling
    //
    // The behavior is tested indirectly through integration tests:
    // - remove_expired() logs errors but doesn't stop compaction
    // - handle_expiration() continues even if remove_expired() encounters errors
    // - The expiration stage is designed to be non-blocking for compaction
}