mito2/compaction/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::{error, info, warn};
20use itertools::Itertools;
21use snafu::ResultExt;
22use tokio::sync::mpsc;
23
24use crate::compaction::compactor::{CompactionRegion, Compactor};
25use crate::compaction::picker::{CompactionTask, PickerOutput};
26use crate::error::CompactRegionSnafu;
27use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
28use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
29use crate::region::RegionLeaderState;
30use crate::request::{
31    BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
32    WorkerRequest, WorkerRequestWithTime,
33};
34use crate::sst::file::FileMeta;
35use crate::worker::WorkerListener;
36use crate::{error, metrics};
37
/// Upper bound on how many compaction tasks may run in parallel.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
40
/// A compaction task for one region: removes expired SSTs and merges the input
/// files chosen by the picker, then reports the outcome to the region worker.
///
/// Dropping the task clears the `compacting` flag of every picked input file.
pub(crate) struct CompactionTaskImpl {
    /// Region being compacted. Used to merge SSTs, to update its manifest and as the
    /// source of the region id in logs and worker notifications.
    pub compaction_region: CompactionRegion,
    /// Request sender used to notify the region worker (region edits for expired
    /// files and the final compaction finished/failed notification).
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Senders used to notify callers waiting on pending compaction tasks.
    /// On success they are handed to the worker in `CompactionFinished`; on failure
    /// each one receives the compaction error.
    pub waiters: Vec<OutputTx>,
    /// Start time of the compaction task; forwarded to the worker in
    /// `CompactionFinished`.
    pub start_time: Instant,
    /// Event listener; notified once the SST merge has finished.
    pub(crate) listener: WorkerListener,
    /// Compactor that merges SSTs and writes the resulting manifest edit.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Output of the picker: the merge inputs/outputs and the expired SSTs.
    /// Expired SSTs are drained from it when the task runs.
    pub(crate) picker_output: PickerOutput,
}
56
57impl Debug for CompactionTaskImpl {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("TwcsCompactionTask")
60            .field("region_id", &self.compaction_region.region_id)
61            .field("picker_output", &self.picker_output)
62            .field(
63                "append_mode",
64                &self.compaction_region.region_options.append_mode,
65            )
66            .finish()
67    }
68}
69
70impl Drop for CompactionTaskImpl {
71    fn drop(&mut self) {
72        self.mark_files_compacting(false)
73    }
74}
75
76impl CompactionTaskImpl {
77    fn mark_files_compacting(&self, compacting: bool) {
78        self.picker_output
79            .outputs
80            .iter()
81            .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
82    }
83
84    /// Remove expired ssts files, update manifest immediately
85    /// and apply the edit to region version.
86    ///
87    /// This function logs errors but does not stop the compaction process if removal fails.
88    async fn remove_expired(
89        &self,
90        compaction_region: &CompactionRegion,
91        expired_files: Vec<FileMeta>,
92    ) {
93        let region_id = compaction_region.region_id;
94        let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
95        let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
96        // Update manifest to remove expired SSTs
97        let edit = RegionEdit {
98            files_to_add: Vec::new(),
99            files_to_remove: expired_files,
100            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
101            compaction_time_window: None,
102            flushed_entry_id: None,
103            flushed_sequence: None,
104            committed_sequence: None,
105        };
106
107        // 1. Update manifest
108        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
109        if let Err(e) = compaction_region
110            .manifest_ctx
111            .update_manifest(RegionLeaderState::Writable, action_list)
112            .await
113        {
114            error!(
115                e;
116                "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
117            );
118            return;
119        }
120
121        // 2. Notify region worker loop to remove expired files from region version.
122        self.send_to_worker(WorkerRequest::Background {
123            region_id,
124            notify: BackgroundNotify::RegionEdit(RegionEditResult {
125                region_id,
126                sender: expire_delete_sender,
127                edit,
128                result: Ok(()),
129            }),
130        })
131        .await;
132
133        if let Err(e) = expire_delete_listener
134            .await
135            .context(error::RecvSnafu)
136            .flatten()
137        {
138            warn!(
139                e;
140                "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
141            );
142            return;
143        }
144
145        info!(
146            "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
147        );
148    }
149
150    async fn handle_expiration_and_compaction(&mut self) -> error::Result<RegionEdit> {
151        self.mark_files_compacting(true);
152
153        // 1. In case of local compaction, we can delete expired ssts in advance.
154        if !self.picker_output.expired_ssts.is_empty() {
155            let remove_timer = COMPACTION_STAGE_ELAPSED
156                .with_label_values(&["remove_expired"])
157                .start_timer();
158            let expired_ssts = self
159                .picker_output
160                .expired_ssts
161                .drain(..)
162                .map(|f| f.meta_ref().clone())
163                .collect();
164            // remove_expired logs errors but doesn't stop compaction
165            self.remove_expired(&self.compaction_region, expired_ssts)
166                .await;
167            remove_timer.observe_duration();
168        }
169
170        // 2. Merge inputs
171        let merge_timer = COMPACTION_STAGE_ELAPSED
172            .with_label_values(&["merge"])
173            .start_timer();
174
175        let compaction_result = match self
176            .compactor
177            .merge_ssts(&self.compaction_region, self.picker_output.clone())
178            .await
179        {
180            Ok(v) => v,
181            Err(e) => {
182                error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
183                merge_timer.stop_and_discard();
184                return Err(e);
185            }
186        };
187        let merge_time = merge_timer.stop_and_record();
188
189        metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
190        metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
191        info!(
192            "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
193            self.compaction_region.region_id,
194            compaction_result.files_to_remove,
195            compaction_result.files_to_add,
196            compaction_result.compaction_time_window,
197            self.waiters.len(),
198            merge_time,
199        );
200
201        self.listener
202            .on_merge_ssts_finished(self.compaction_region.region_id)
203            .await;
204
205        let _manifest_timer = COMPACTION_STAGE_ELAPSED
206            .with_label_values(&["write_manifest"])
207            .start_timer();
208
209        self.compactor
210            .update_manifest(&self.compaction_region, compaction_result)
211            .await
212    }
213
214    /// Handles compaction failure, notifies all waiters.
215    fn on_failure(&mut self, err: Arc<error::Error>) {
216        COMPACTION_FAILURE_COUNT.inc();
217        for waiter in self.waiters.drain(..) {
218            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
219                region_id: self.compaction_region.region_id,
220            }));
221        }
222    }
223
224    /// Notifies region worker to handle post-compaction tasks.
225    async fn send_to_worker(&self, request: WorkerRequest) {
226        if let Err(e) = self
227            .request_sender
228            .send(WorkerRequestWithTime::new(request))
229            .await
230        {
231            error!(
232                "Failed to notify compaction job status for region {}, request: {:?}",
233                self.compaction_region.region_id, e.0
234            );
235        }
236    }
237}
238
239#[async_trait::async_trait]
240impl CompactionTask for CompactionTaskImpl {
241    async fn run(&mut self) {
242        let notify = match self.handle_expiration_and_compaction().await {
243            Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
244                region_id: self.compaction_region.region_id,
245                senders: std::mem::take(&mut self.waiters),
246                start_time: self.start_time,
247                edit,
248            }),
249            Err(e) => {
250                error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
251                let err = Arc::new(e);
252                // notify compaction waiters
253                self.on_failure(err.clone());
254                BackgroundNotify::CompactionFailed(CompactionFailed {
255                    region_id: self.compaction_region.region_id,
256                    err,
257                })
258            }
259        };
260
261        self.send_to_worker(WorkerRequest::Background {
262            region_id: self.compaction_region.region_id,
263            notify,
264        })
265        .await;
266    }
267}
268
#[cfg(test)]
mod tests {
    use store_api::storage::FileId;

    use crate::compaction::picker::PickerOutput;
    use crate::compaction::test_util::new_file_handle;

    /// A `PickerOutput` keeps the expired SSTs it was built with, in order.
    ///
    /// These are the files `handle_expiration_and_compaction` later drains and removes.
    #[test]
    fn test_picker_output_with_expired_ssts() {
        let ids: Vec<FileId> = (0..3).map(|_| FileId::random()).collect();
        let expired = vec![
            new_file_handle(ids[0], 0, 999, 0),
            new_file_handle(ids[1], 1000, 1999, 0),
        ];

        let output = PickerOutput {
            outputs: Vec::new(),
            expired_ssts: expired.clone(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert_eq!(output.expired_ssts.len(), 2);
        for (got, want) in output.expired_ssts.iter().zip(expired.iter()) {
            assert_eq!(got.file_id(), want.file_id());
        }
    }

    /// A `PickerOutput` built without expired SSTs reports none.
    #[test]
    fn test_picker_output_without_expired_ssts() {
        let output = PickerOutput {
            outputs: Vec::new(),
            expired_ssts: Vec::new(),
            time_window_size: 3600,
            max_file_size: None,
        };

        assert!(output.expired_ssts.is_empty());
    }

    // `remove_expired()` is not unit-tested here: exercising it needs a real
    // `ManifestContext`, a `mpsc::Sender<WorkerRequestWithTime>`, and a worker loop
    // that answers the `RegionEdit` background notification.
    //
    // Its contract is that it logs failures and never stops the compaction, so
    // `handle_expiration_and_compaction()` proceeds to the merge even when expired-file
    // removal fails. NOTE(review): this is presumably covered by integration tests —
    // confirm which ones.
}