// Source: mito2/compaction/task.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_base::cancellation::CancellableFuture;
20use common_memory_manager::OnExhaustedPolicy;
21use common_telemetry::{error, info, warn};
22use itertools::Itertools;
23use snafu::ResultExt;
24use tokio::sync::mpsc;
25
26use crate::compaction::LocalCompactionState;
27use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput};
28use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
29use crate::compaction::picker::{CompactionTask, PickerOutput};
30use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
31use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
32use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
33use crate::region::RegionRoleState;
34use crate::request::{
35    BackgroundNotify, CompactionCancelled, CompactionFailed, CompactionFinished, OutputTx,
36    RegionEditResult, WorkerRequest, WorkerRequestWithTime,
37};
38use crate::sst::file::FileMeta;
39use crate::worker::WorkerListener;
40use crate::{error, metrics};
41
/// Maximum number of compaction tasks in parallel.
///
/// Currently `1`, i.e. schedulers consulting this constant run at most one
/// compaction task at a time.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
44
/// A concrete compaction task driven by a [`Compactor`] and the output of a
/// compaction picker.
///
/// Dropping the task clears the `compacting` flag on its input files so they
/// become eligible for future compactions.
pub(crate) struct CompactionTaskImpl {
    /// Shared local-compaction state for cooperative cancellation.
    pub(crate) state: LocalCompactionState,
    /// Region to compact, including its manifest context and options.
    pub compaction_region: CompactionRegion,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Senders that are used to notify waiters waiting for pending compaction tasks.
    pub waiters: Vec<OutputTx>,
    /// Start time of compaction task
    pub start_time: Instant,
    /// Event listener.
    pub(crate) listener: WorkerListener,
    /// Compactor to handle compaction.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Output of the picker.
    pub(crate) picker_output: PickerOutput,
    /// Memory manager to acquire memory budget.
    pub(crate) memory_manager: Arc<CompactionMemoryManager>,
    /// Policy when memory is exhausted.
    pub(crate) memory_policy: OnExhaustedPolicy,
    /// Estimated memory bytes needed for this compaction.
    pub(crate) estimated_memory_bytes: u64,
}
68
69impl Debug for CompactionTaskImpl {
70    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("TwcsCompactionTask")
72            .field("region_id", &self.compaction_region.region_id)
73            .field("picker_output", &self.picker_output)
74            .field(
75                "append_mode",
76                &self.compaction_region.region_options.append_mode,
77            )
78            .finish()
79    }
80}
81
impl Drop for CompactionTaskImpl {
    /// Clears the `compacting` flag on all input files when the task is
    /// dropped — regardless of whether it finished, failed, or was cancelled —
    /// so the files can be picked by later compactions.
    fn drop(&mut self) {
        self.mark_files_compacting(false)
    }
}
87
impl CompactionTaskImpl {
    /// Sets or clears the `compacting` flag on every input file of every
    /// picked output, so that concurrent pickers treat them accordingly.
    fn mark_files_compacting(&self, compacting: bool) {
        self.picker_output
            .outputs
            .iter()
            .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
    }

    /// Acquires memory budget based on the configured policy.
    ///
    /// Returns an error if memory cannot be acquired according to the policy.
    async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
        let region_id = self.compaction_region.region_id;
        let requested_bytes = self.estimated_memory_bytes;
        let policy = self.memory_policy;

        // Measures how long the task waited on the memory manager.
        let _timer = COMPACTION_MEMORY_WAIT.start_timer();
        self.memory_manager
            .acquire_with_policy(requested_bytes, policy)
            .await
            .context(CompactionMemoryExhaustedSnafu {
                region_id,
                policy: format!("{policy:?}"),
            })
    }

    /// Remove expired ssts files, update manifest immediately
    /// and apply the edit to region version.
    ///
    /// This function logs errors but does not stop the compaction process if removal fails.
    async fn remove_expired(
        &self,
        compaction_region: &CompactionRegion,
        expired_files: Vec<FileMeta>,
    ) {
        let region_id = compaction_region.region_id;
        // Pre-render the file id list once for all log messages below.
        let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
        // Oneshot channel: the worker loop reports back whether the region
        // version was updated with this edit.
        let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
        // Update manifest to remove expired SSTs
        let edit = RegionEdit {
            files_to_add: Vec::new(),
            files_to_remove: expired_files,
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };

        // 1. Update manifest
        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
        // Only a leader may mutate the manifest; bail out quietly otherwise.
        let RegionRoleState::Leader(current_region_state) =
            compaction_region.manifest_ctx.current_state()
        else {
            warn!(
                "Region {} not in leader state, skip removing expired files",
                region_id
            );
            return;
        };
        if let Err(e) = compaction_region
            .manifest_ctx
            .update_manifest(current_region_state, action_list, false)
            .await
        {
            // Best-effort: a failed removal must not abort the compaction.
            warn!(
                e;
                "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
            );
            return;
        }

        // 2. Notify region worker loop to remove expired files from region version.
        self.send_to_worker(WorkerRequest::Background {
            region_id,
            notify: BackgroundNotify::RegionEdit(RegionEditResult {
                region_id,
                sender: expire_delete_sender,
                edit,
                result: Ok(()),
                update_region_state: false,
                is_staging: false,
            }),
        })
        .await;

        // Wait for the worker's acknowledgement; `flatten` collapses the
        // channel-recv error and the worker-side result into one `Result`.
        if let Err(e) = expire_delete_listener
            .await
            .context(error::RecvSnafu)
            .flatten()
        {
            warn!(
                e;
                "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
            );
            return;
        }

        info!(
            "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
        );
    }

    /// Drains `picker_output.expired_ssts` and removes them via
    /// [`Self::remove_expired`], recording the elapsed time under the
    /// `remove_expired` stage metric. No-op when there are no expired SSTs.
    async fn handle_expiration(&mut self) {
        // 1. In case of local compaction, we can delete expired ssts in advance.
        if !self.picker_output.expired_ssts.is_empty() {
            let remove_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["remove_expired"])
                .start_timer();
            let expired_ssts = self
                .picker_output
                .expired_ssts
                .drain(..)
                .map(|f| f.meta_ref().clone())
                .collect();
            // remove_expired logs errors but doesn't stop compaction
            self.remove_expired(&self.compaction_region, expired_ssts)
                .await;
            remove_timer.observe_duration();
        }
    }

    /// Runs the merge phase: delegates to the compactor's `merge_ssts`,
    /// records merge-stage metrics and input/output byte counters, and
    /// notifies the listener when merging finishes.
    ///
    /// On failure, the merge timer sample is discarded so the metric only
    /// reflects successful merges.
    async fn handle_compaction(&mut self) -> error::Result<MergeOutput> {
        // 2. Merge inputs
        let merge_timer = COMPACTION_STAGE_ELAPSED
            .with_label_values(&["merge"])
            .start_timer();

        let compaction_result = match self
            .compactor
            .merge_ssts(&self.compaction_region, self.picker_output.clone())
            .await
        {
            Ok(v) => v,
            Err(e) => {
                error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
                // Don't record a duration sample for a failed merge.
                merge_timer.stop_and_discard();
                return Err(e);
            }
        };
        let merge_time = merge_timer.stop_and_record();

        metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
        metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
        info!(
            "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
            self.compaction_region.region_id,
            compaction_result.files_to_remove,
            compaction_result.files_to_add,
            compaction_result.compaction_time_window,
            self.waiters.len(),
            merge_time,
        );

        self.listener
            .on_merge_ssts_finished(self.compaction_region.region_id)
            .await;

        Ok(compaction_result)
    }

    /// Writes the merge result to the region manifest via the compactor,
    /// timing the write under the `write_manifest` stage metric, and returns
    /// the resulting [`RegionEdit`].
    async fn update_manifest(
        &self,
        compaction_result: crate::compaction::compactor::MergeOutput,
    ) -> error::Result<RegionEdit> {
        let _manifest_timer = COMPACTION_STAGE_ELAPSED
            .with_label_values(&["write_manifest"])
            .start_timer();

        self.compactor
            .update_manifest(&self.compaction_region, compaction_result)
            .await
    }

    /// Handles compaction failure, notifies all waiters.
    pub(crate) fn on_failure(&mut self, err: Arc<error::Error>) {
        COMPACTION_FAILURE_COUNT.inc();
        // Drain so each waiter is notified exactly once.
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.compaction_region.region_id,
            }));
        }
    }

    /// Notifies region worker to handle post-compaction tasks.
    ///
    /// Send failures are only logged: if the worker's channel is closed there
    /// is no one left to notify.
    async fn send_to_worker(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify compaction job status for region {}, request: {:?}",
                self.compaction_region.region_id, e.0
            );
        }
    }
}
286
#[async_trait::async_trait]
impl CompactionTask for CompactionTaskImpl {
    /// Drives the whole task: acquire memory, remove expired SSTs, merge
    /// inputs under cooperative cancellation, commit the manifest edit, and
    /// finally report the outcome to the region worker.
    async fn run(&mut self) {
        // Acquire memory budget before starting compaction
        let _memory_guard = match self.acquire_memory_with_policy().await {
            Ok(guard) => guard,
            Err(e) => {
                // Could not get memory: fail the task up-front and tell both
                // the waiters and the worker loop.
                error!(e; "Failed to acquire memory for compaction, region id: {}", self.compaction_region.region_id);
                let err = Arc::new(e);
                self.on_failure(err.clone());
                let notify = BackgroundNotify::CompactionFailed(CompactionFailed {
                    region_id: self.compaction_region.region_id,
                    err,
                });
                self.send_to_worker(WorkerRequest::Background {
                    region_id: self.compaction_region.region_id,
                    notify,
                })
                .await;
                return;
            }
        };

        // Marks files compacting before compaction and unmark after compaction (even if compaction is cancelled or failed), so that they won't be picked by other compaction tasks.
        self.mark_files_compacting(true);
        self.handle_expiration().await;

        let cancel_handle = self.state.cancel_handle();
        // Run compaction with cooperative cancellation.
        let notify = match CancellableFuture::new(
            async { self.handle_compaction().await },
            cancel_handle,
        )
        .await
        {
            // Merge finished; decide whether we are still allowed to commit.
            Ok(Ok(merge_output)) => {
                // Stop accepting cancellation once we are about to publish the compaction edit.
                if !self.state.mark_commit_started() {
                    // Cancellation won the race before the commit point:
                    // discard the merge output and report cancellation.
                    let senders = std::mem::take(&mut self.waiters);
                    BackgroundNotify::CompactionCancelled(CompactionCancelled {
                        region_id: self.compaction_region.region_id,
                        senders,
                    })
                } else {
                    match self.update_manifest(merge_output).await {
                        Ok(edit) => {
                            // Success: hand the edit and the waiters to the
                            // worker, which applies the edit and replies.
                            let senders = std::mem::take(&mut self.waiters);
                            BackgroundNotify::CompactionFinished(CompactionFinished {
                                region_id: self.compaction_region.region_id,
                                senders,
                                start_time: self.start_time,
                                edit,
                            })
                        }
                        Err(e) => {
                            // Manifest write failed after a successful merge.
                            error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
                            let err = Arc::new(e);
                            self.on_failure(err.clone());
                            BackgroundNotify::CompactionFailed(CompactionFailed {
                                region_id: self.compaction_region.region_id,
                                err,
                            })
                        }
                    }
                }
            }
            // The future was cancelled while merging.
            Err(_) => {
                info!(
                    "Compaction cancelled, region id: {}",
                    self.compaction_region.region_id
                );
                let senders = std::mem::take(&mut self.waiters);
                BackgroundNotify::CompactionCancelled(CompactionCancelled {
                    region_id: self.compaction_region.region_id,
                    senders,
                })
            }
            // The merge itself returned an error.
            Ok(Err(e)) => {
                error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
                let err = Arc::new(e);
                // notify compaction waiters
                self.on_failure(err.clone());
                BackgroundNotify::CompactionFailed(CompactionFailed {
                    region_id: self.compaction_region.region_id,
                    err,
                })
            }
        };

        self.send_to_worker(WorkerRequest::Background {
            region_id: self.compaction_region.region_id,
            notify,
        })
        .await;
    }
}
383
#[cfg(test)]
mod tests {
    use store_api::storage::FileId;

    use crate::compaction::picker::PickerOutput;
    use crate::compaction::test_util::new_file_handle;

    /// Expired SSTs identified by the picker must be carried through
    /// `PickerOutput` unchanged, since `handle_expiration()` later drains
    /// them from there.
    #[test]
    fn test_picker_output_with_expired_ssts() {
        let ids: Vec<FileId> = (0..3).map(|_| FileId::random()).collect();
        let handles = vec![
            new_file_handle(ids[0], 0, 999, 0),
            new_file_handle(ids[1], 1000, 1999, 0),
        ];

        let output = PickerOutput {
            outputs: vec![],
            expired_ssts: handles.clone(),
            time_window_size: 3600,
            max_file_size: None,
        };

        // Every expired handle survives, in order, with its file id intact.
        assert_eq!(2, output.expired_ssts.len());
        for (kept, original) in output.expired_ssts.iter().zip(handles.iter()) {
            assert_eq!(kept.file_id(), original.file_id());
        }
    }

    /// A `PickerOutput` built without expired SSTs keeps an empty list.
    #[test]
    fn test_picker_output_without_expired_ssts() {
        let output = PickerOutput {
            outputs: vec![],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        assert!(output.expired_ssts.is_empty());
    }

    // Note: Testing remove_expired() directly requires extensive mocking of:
    // - manifest_ctx (ManifestContext)
    // - request_sender (mpsc::Sender<WorkerRequestWithTime>)
    // - WorkerRequest handling
    //
    // The behavior is tested indirectly through integration tests:
    // - remove_expired() logs errors but doesn't stop compaction
    // - handle_expiration() continues even if remove_expired() encounters errors
    // - The expiration stage is designed to be non-blocking for compaction
}