mito2/compaction/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::{error, info, warn};
20use itertools::Itertools;
21use snafu::ResultExt;
22use tokio::sync::mpsc;
23
24use crate::compaction::compactor::{CompactionRegion, Compactor};
25use crate::compaction::picker::{CompactionTask, PickerOutput};
26use crate::error::CompactRegionSnafu;
27use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
28use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
29use crate::region::RegionRoleState;
30use crate::request::{
31    BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
32    WorkerRequest, WorkerRequestWithTime,
33};
34use crate::sst::file::FileMeta;
35use crate::worker::WorkerListener;
36use crate::{error, metrics};
37
/// Maximum number of compaction tasks in parallel.
///
/// NOTE(review): this constant is not referenced anywhere in this file; the limit is
/// presumably enforced by whatever schedules compaction tasks — confirm at the use site.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
40
/// A compaction job for a single region.
///
/// The task removes the expired SSTs reported by the picker, merges the picked input
/// files through the [`Compactor`], and reports the outcome to the region worker through
/// `request_sender`. Dropping the task clears the compacting flag on its input files.
pub(crate) struct CompactionTaskImpl {
    /// Region being compacted; handed to the compactor for merging and manifest updates.
    pub compaction_region: CompactionRegion,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Senders that are used to notify waiters waiting for pending compaction tasks.
    ///
    /// On success they are moved into the `CompactionFinished` notification; on failure
    /// each of them receives the error directly.
    pub waiters: Vec<OutputTx>,
    /// Start time of compaction task, forwarded in the `CompactionFinished` notification.
    pub start_time: Instant,
    /// Event listener, called back once the SST merge has finished.
    pub(crate) listener: WorkerListener,
    /// Compactor to handle compaction.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Output of the picker: the outputs (with their input files) to merge and the
    /// expired SSTs to remove.
    pub(crate) picker_output: PickerOutput,
}
56
57impl Debug for CompactionTaskImpl {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("TwcsCompactionTask")
60            .field("region_id", &self.compaction_region.region_id)
61            .field("picker_output", &self.picker_output)
62            .field(
63                "append_mode",
64                &self.compaction_region.region_options.append_mode,
65            )
66            .finish()
67    }
68}
69
70impl Drop for CompactionTaskImpl {
71    fn drop(&mut self) {
72        self.mark_files_compacting(false)
73    }
74}
75
76impl CompactionTaskImpl {
77    fn mark_files_compacting(&self, compacting: bool) {
78        self.picker_output
79            .outputs
80            .iter()
81            .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
82    }
83
84    /// Remove expired ssts files, update manifest immediately
85    /// and apply the edit to region version.
86    ///
87    /// This function logs errors but does not stop the compaction process if removal fails.
88    async fn remove_expired(
89        &self,
90        compaction_region: &CompactionRegion,
91        expired_files: Vec<FileMeta>,
92    ) {
93        let region_id = compaction_region.region_id;
94        let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
95        let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
96        // Update manifest to remove expired SSTs
97        let edit = RegionEdit {
98            files_to_add: Vec::new(),
99            files_to_remove: expired_files,
100            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
101            compaction_time_window: None,
102            flushed_entry_id: None,
103            flushed_sequence: None,
104            committed_sequence: None,
105        };
106
107        // 1. Update manifest
108        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
109        let RegionRoleState::Leader(current_region_state) =
110            compaction_region.manifest_ctx.current_state()
111        else {
112            warn!(
113                "Region {} not in leader state, skip removing expired files",
114                region_id
115            );
116            return;
117        };
118        if let Err(e) = compaction_region
119            .manifest_ctx
120            .update_manifest(current_region_state, action_list, false)
121            .await
122        {
123            warn!(
124                e;
125                "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
126            );
127            return;
128        }
129
130        // 2. Notify region worker loop to remove expired files from region version.
131        self.send_to_worker(WorkerRequest::Background {
132            region_id,
133            notify: BackgroundNotify::RegionEdit(RegionEditResult {
134                region_id,
135                sender: expire_delete_sender,
136                edit,
137                result: Ok(()),
138                update_region_state: false,
139            }),
140        })
141        .await;
142
143        if let Err(e) = expire_delete_listener
144            .await
145            .context(error::RecvSnafu)
146            .flatten()
147        {
148            warn!(
149                e;
150                "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
151            );
152            return;
153        }
154
155        info!(
156            "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
157        );
158    }
159
160    async fn handle_expiration_and_compaction(&mut self) -> error::Result<RegionEdit> {
161        self.mark_files_compacting(true);
162
163        // 1. In case of local compaction, we can delete expired ssts in advance.
164        if !self.picker_output.expired_ssts.is_empty() {
165            let remove_timer = COMPACTION_STAGE_ELAPSED
166                .with_label_values(&["remove_expired"])
167                .start_timer();
168            let expired_ssts = self
169                .picker_output
170                .expired_ssts
171                .drain(..)
172                .map(|f| f.meta_ref().clone())
173                .collect();
174            // remove_expired logs errors but doesn't stop compaction
175            self.remove_expired(&self.compaction_region, expired_ssts)
176                .await;
177            remove_timer.observe_duration();
178        }
179
180        // 2. Merge inputs
181        let merge_timer = COMPACTION_STAGE_ELAPSED
182            .with_label_values(&["merge"])
183            .start_timer();
184
185        let compaction_result = match self
186            .compactor
187            .merge_ssts(&self.compaction_region, self.picker_output.clone())
188            .await
189        {
190            Ok(v) => v,
191            Err(e) => {
192                error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
193                merge_timer.stop_and_discard();
194                return Err(e);
195            }
196        };
197        let merge_time = merge_timer.stop_and_record();
198
199        metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
200        metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
201        info!(
202            "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
203            self.compaction_region.region_id,
204            compaction_result.files_to_remove,
205            compaction_result.files_to_add,
206            compaction_result.compaction_time_window,
207            self.waiters.len(),
208            merge_time,
209        );
210
211        self.listener
212            .on_merge_ssts_finished(self.compaction_region.region_id)
213            .await;
214
215        let _manifest_timer = COMPACTION_STAGE_ELAPSED
216            .with_label_values(&["write_manifest"])
217            .start_timer();
218
219        self.compactor
220            .update_manifest(&self.compaction_region, compaction_result)
221            .await
222    }
223
224    /// Handles compaction failure, notifies all waiters.
225    fn on_failure(&mut self, err: Arc<error::Error>) {
226        COMPACTION_FAILURE_COUNT.inc();
227        for waiter in self.waiters.drain(..) {
228            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
229                region_id: self.compaction_region.region_id,
230            }));
231        }
232    }
233
234    /// Notifies region worker to handle post-compaction tasks.
235    async fn send_to_worker(&self, request: WorkerRequest) {
236        if let Err(e) = self
237            .request_sender
238            .send(WorkerRequestWithTime::new(request))
239            .await
240        {
241            error!(
242                "Failed to notify compaction job status for region {}, request: {:?}",
243                self.compaction_region.region_id, e.0
244            );
245        }
246    }
247}
248
249#[async_trait::async_trait]
250impl CompactionTask for CompactionTaskImpl {
251    async fn run(&mut self) {
252        let notify = match self.handle_expiration_and_compaction().await {
253            Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
254                region_id: self.compaction_region.region_id,
255                senders: std::mem::take(&mut self.waiters),
256                start_time: self.start_time,
257                edit,
258            }),
259            Err(e) => {
260                error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
261                let err = Arc::new(e);
262                // notify compaction waiters
263                self.on_failure(err.clone());
264                BackgroundNotify::CompactionFailed(CompactionFailed {
265                    region_id: self.compaction_region.region_id,
266                    err,
267                })
268            }
269        };
270
271        self.send_to_worker(WorkerRequest::Background {
272            region_id: self.compaction_region.region_id,
273            notify,
274        })
275        .await;
276    }
277}
278
#[cfg(test)]
mod tests {
    use store_api::storage::FileId;

    use crate::compaction::picker::PickerOutput;
    use crate::compaction::test_util::new_file_handle;

    /// A `PickerOutput` built with expired SSTs exposes exactly those files, in order.
    /// These are the files that `handle_expiration_and_compaction` drains and removes.
    #[test]
    fn test_picker_output_with_expired_ssts() {
        let file_ids: Vec<_> = std::iter::repeat_with(FileId::random).take(3).collect();
        let expired_ssts = vec![
            new_file_handle(file_ids[0], 0, 999, 0),
            new_file_handle(file_ids[1], 1000, 1999, 0),
        ];

        let picker_output = PickerOutput {
            outputs: Vec::new(),
            expired_ssts: expired_ssts.clone(),
            time_window_size: 3600,
            max_file_size: None,
        };

        // Same count, and the same file ids position by position.
        assert_eq!(picker_output.expired_ssts.len(), 2);
        for (actual, expected) in picker_output.expired_ssts.iter().zip(&expired_ssts) {
            assert_eq!(actual.file_id(), expected.file_id());
        }
    }

    /// A `PickerOutput` built without expired SSTs reports an empty list.
    #[test]
    fn test_picker_output_without_expired_ssts() {
        let picker_output = PickerOutput {
            outputs: Vec::new(),
            expired_ssts: Vec::new(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert!(picker_output.expired_ssts.is_empty());
    }

    // Note: `remove_expired()` is not unit-tested here, because calling it directly
    // requires extensive mocking of:
    // - manifest_ctx (ManifestContext)
    // - request_sender (mpsc::Sender<WorkerRequestWithTime>)
    // - WorkerRequest handling
    //
    // Its behavior is covered indirectly by integration tests:
    // - remove_expired() logs errors but doesn't stop compaction
    // - handle_expiration_and_compaction() continues even if remove_expired() encounters errors
    // - The function is designed to be non-blocking for compaction
}