mito2/compaction/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_memory_manager::OnExhaustedPolicy;
20use common_telemetry::{error, info, warn};
21use itertools::Itertools;
22use snafu::ResultExt;
23use tokio::sync::mpsc;
24
25use crate::compaction::compactor::{CompactionRegion, Compactor};
26use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
27use crate::compaction::picker::{CompactionTask, PickerOutput};
28use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
29use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
30use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
31use crate::region::RegionRoleState;
32use crate::request::{
33    BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
34    WorkerRequest, WorkerRequestWithTime,
35};
36use crate::sst::file::FileMeta;
37use crate::worker::WorkerListener;
38use crate::{error, metrics};
39
/// Maximum number of compaction tasks in parallel.
///
/// NOTE(review): nothing in this file reads the constant; presumably the compaction
/// scheduler uses it to cap concurrency — confirm at the use site.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
42
43pub(crate) struct CompactionTaskImpl {
44    pub compaction_region: CompactionRegion,
45    /// Request sender to notify the worker.
46    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
47    /// Senders that are used to notify waiters waiting for pending compaction tasks.
48    pub waiters: Vec<OutputTx>,
49    /// Start time of compaction task
50    pub start_time: Instant,
51    /// Event listener.
52    pub(crate) listener: WorkerListener,
53    /// Compactor to handle compaction.
54    pub(crate) compactor: Arc<dyn Compactor>,
55    /// Output of the picker.
56    pub(crate) picker_output: PickerOutput,
57    /// Memory manager to acquire memory budget.
58    pub(crate) memory_manager: Arc<CompactionMemoryManager>,
59    /// Policy when memory is exhausted.
60    pub(crate) memory_policy: OnExhaustedPolicy,
61    /// Estimated memory bytes needed for this compaction.
62    pub(crate) estimated_memory_bytes: u64,
63}
64
65impl Debug for CompactionTaskImpl {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("TwcsCompactionTask")
68            .field("region_id", &self.compaction_region.region_id)
69            .field("picker_output", &self.picker_output)
70            .field(
71                "append_mode",
72                &self.compaction_region.region_options.append_mode,
73            )
74            .finish()
75    }
76}
77
78impl Drop for CompactionTaskImpl {
79    fn drop(&mut self) {
80        self.mark_files_compacting(false)
81    }
82}
83
84impl CompactionTaskImpl {
85    fn mark_files_compacting(&self, compacting: bool) {
86        self.picker_output
87            .outputs
88            .iter()
89            .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
90    }
91
92    /// Acquires memory budget based on the configured policy.
93    ///
94    /// Returns an error if memory cannot be acquired according to the policy.
95    async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
96        let region_id = self.compaction_region.region_id;
97        let requested_bytes = self.estimated_memory_bytes;
98        let policy = self.memory_policy;
99
100        let _timer = COMPACTION_MEMORY_WAIT.start_timer();
101        self.memory_manager
102            .acquire_with_policy(requested_bytes, policy)
103            .await
104            .context(CompactionMemoryExhaustedSnafu {
105                region_id,
106                policy: format!("{policy:?}"),
107            })
108    }
109
110    /// Remove expired ssts files, update manifest immediately
111    /// and apply the edit to region version.
112    ///
113    /// This function logs errors but does not stop the compaction process if removal fails.
114    async fn remove_expired(
115        &self,
116        compaction_region: &CompactionRegion,
117        expired_files: Vec<FileMeta>,
118    ) {
119        let region_id = compaction_region.region_id;
120        let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
121        let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
122        // Update manifest to remove expired SSTs
123        let edit = RegionEdit {
124            files_to_add: Vec::new(),
125            files_to_remove: expired_files,
126            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
127            compaction_time_window: None,
128            flushed_entry_id: None,
129            flushed_sequence: None,
130            committed_sequence: None,
131        };
132
133        // 1. Update manifest
134        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
135        let RegionRoleState::Leader(current_region_state) =
136            compaction_region.manifest_ctx.current_state()
137        else {
138            warn!(
139                "Region {} not in leader state, skip removing expired files",
140                region_id
141            );
142            return;
143        };
144        if let Err(e) = compaction_region
145            .manifest_ctx
146            .update_manifest(current_region_state, action_list, false)
147            .await
148        {
149            warn!(
150                e;
151                "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
152            );
153            return;
154        }
155
156        // 2. Notify region worker loop to remove expired files from region version.
157        self.send_to_worker(WorkerRequest::Background {
158            region_id,
159            notify: BackgroundNotify::RegionEdit(RegionEditResult {
160                region_id,
161                sender: expire_delete_sender,
162                edit,
163                result: Ok(()),
164                update_region_state: false,
165                is_staging: false,
166            }),
167        })
168        .await;
169
170        if let Err(e) = expire_delete_listener
171            .await
172            .context(error::RecvSnafu)
173            .flatten()
174        {
175            warn!(
176                e;
177                "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
178            );
179            return;
180        }
181
182        info!(
183            "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
184        );
185    }
186
187    async fn handle_expiration_and_compaction(&mut self) -> error::Result<RegionEdit> {
188        self.mark_files_compacting(true);
189
190        // 1. In case of local compaction, we can delete expired ssts in advance.
191        if !self.picker_output.expired_ssts.is_empty() {
192            let remove_timer = COMPACTION_STAGE_ELAPSED
193                .with_label_values(&["remove_expired"])
194                .start_timer();
195            let expired_ssts = self
196                .picker_output
197                .expired_ssts
198                .drain(..)
199                .map(|f| f.meta_ref().clone())
200                .collect();
201            // remove_expired logs errors but doesn't stop compaction
202            self.remove_expired(&self.compaction_region, expired_ssts)
203                .await;
204            remove_timer.observe_duration();
205        }
206
207        // 2. Merge inputs
208        let merge_timer = COMPACTION_STAGE_ELAPSED
209            .with_label_values(&["merge"])
210            .start_timer();
211
212        let compaction_result = match self
213            .compactor
214            .merge_ssts(&self.compaction_region, self.picker_output.clone())
215            .await
216        {
217            Ok(v) => v,
218            Err(e) => {
219                error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
220                merge_timer.stop_and_discard();
221                return Err(e);
222            }
223        };
224        let merge_time = merge_timer.stop_and_record();
225
226        metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
227        metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
228        info!(
229            "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
230            self.compaction_region.region_id,
231            compaction_result.files_to_remove,
232            compaction_result.files_to_add,
233            compaction_result.compaction_time_window,
234            self.waiters.len(),
235            merge_time,
236        );
237
238        self.listener
239            .on_merge_ssts_finished(self.compaction_region.region_id)
240            .await;
241
242        let _manifest_timer = COMPACTION_STAGE_ELAPSED
243            .with_label_values(&["write_manifest"])
244            .start_timer();
245
246        self.compactor
247            .update_manifest(&self.compaction_region, compaction_result)
248            .await
249    }
250
251    /// Handles compaction failure, notifies all waiters.
252    pub(crate) fn on_failure(&mut self, err: Arc<error::Error>) {
253        COMPACTION_FAILURE_COUNT.inc();
254        for waiter in self.waiters.drain(..) {
255            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
256                region_id: self.compaction_region.region_id,
257            }));
258        }
259    }
260
261    /// Notifies region worker to handle post-compaction tasks.
262    async fn send_to_worker(&self, request: WorkerRequest) {
263        if let Err(e) = self
264            .request_sender
265            .send(WorkerRequestWithTime::new(request))
266            .await
267        {
268            error!(
269                "Failed to notify compaction job status for region {}, request: {:?}",
270                self.compaction_region.region_id, e.0
271            );
272        }
273    }
274}
275
276#[async_trait::async_trait]
277impl CompactionTask for CompactionTaskImpl {
278    async fn run(&mut self) {
279        // Acquire memory budget before starting compaction
280        let _memory_guard = match self.acquire_memory_with_policy().await {
281            Ok(guard) => guard,
282            Err(e) => {
283                error!(e; "Failed to acquire memory for compaction, region id: {}", self.compaction_region.region_id);
284                let err = Arc::new(e);
285                self.on_failure(err.clone());
286                let notify = BackgroundNotify::CompactionFailed(CompactionFailed {
287                    region_id: self.compaction_region.region_id,
288                    err,
289                });
290                self.send_to_worker(WorkerRequest::Background {
291                    region_id: self.compaction_region.region_id,
292                    notify,
293                })
294                .await;
295                return;
296            }
297        };
298
299        let notify = match self.handle_expiration_and_compaction().await {
300            Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
301                region_id: self.compaction_region.region_id,
302                senders: std::mem::take(&mut self.waiters),
303                start_time: self.start_time,
304                edit,
305            }),
306            Err(e) => {
307                error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
308                let err = Arc::new(e);
309                // notify compaction waiters
310                self.on_failure(err.clone());
311                BackgroundNotify::CompactionFailed(CompactionFailed {
312                    region_id: self.compaction_region.region_id,
313                    err,
314                })
315            }
316        };
317
318        self.send_to_worker(WorkerRequest::Background {
319            region_id: self.compaction_region.region_id,
320            notify,
321        })
322        .await;
323    }
324}
325
#[cfg(test)]
mod tests {
    use store_api::storage::FileId;

    use crate::compaction::picker::PickerOutput;
    use crate::compaction::test_util::new_file_handle;

    /// A `PickerOutput` built with expired SSTs carries them unchanged and in order.
    #[test]
    fn test_picker_output_with_expired_ssts() {
        // These are the files that handle_expiration_and_compaction later drains and
        // passes to remove_expired.
        let file_ids: Vec<_> = (0..3).map(|_| FileId::random()).collect();
        let expired_ssts = vec![
            new_file_handle(file_ids[0], 0, 999, 0),
            new_file_handle(file_ids[1], 1000, 1999, 0),
        ];

        let picker_output = PickerOutput {
            outputs: Vec::new(),
            expired_ssts: expired_ssts.clone(),
            time_window_size: 3600,
            max_file_size: None,
        };

        // Same count, same ids, same order.
        assert_eq!(picker_output.expired_ssts.len(), 2);
        for (actual, expected) in picker_output.expired_ssts.iter().zip(&expired_ssts) {
            assert_eq!(actual.file_id(), expected.file_id());
        }
    }

    /// A `PickerOutput` built without expired SSTs reports an empty list.
    #[test]
    fn test_picker_output_without_expired_ssts() {
        let picker_output = PickerOutput {
            outputs: Vec::new(),
            expired_ssts: Vec::new(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert!(picker_output.expired_ssts.is_empty());
    }

    // Note: remove_expired() is not unit-tested here because calling it directly
    // requires extensive mocking of:
    // - manifest_ctx (ManifestContext)
    // - request_sender (mpsc::Sender<WorkerRequestWithTime>)
    // - WorkerRequest handling
    //
    // Its behavior is covered indirectly by integration tests:
    // - remove_expired() logs errors but doesn't stop compaction
    // - handle_expiration_and_compaction() continues even if remove_expired() hits errors
    // - the function is designed to be non-blocking for compaction
}