mito2/compaction/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::{error, info};
20use snafu::ResultExt;
21use tokio::sync::mpsc;
22
23use crate::compaction::compactor::{CompactionRegion, Compactor};
24use crate::compaction::picker::{CompactionTask, PickerOutput};
25use crate::error::CompactRegionSnafu;
26use crate::manifest::action::RegionEdit;
27use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
28use crate::request::{
29    BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
30};
31use crate::worker::WorkerListener;
32use crate::{error, metrics};
33
/// Maximum number of compaction tasks that may run in parallel.
///
/// NOTE(review): this constant is not referenced by the code visible in this
/// file; presumably the limit is enforced by whoever schedules the tasks —
/// confirm against the caller.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
36
/// A compaction task for a single region.
///
/// It carries the picker's decision (`picker_output`), the compactor that
/// executes it, and the channels used to report the outcome to the region
/// worker and to any waiters.
pub(crate) struct CompactionTaskImpl {
    /// Region being compacted; handed to the compactor both when merging SSTs
    /// and when updating the manifest.
    pub compaction_region: CompactionRegion,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
    /// Senders that are used to notify waiters waiting for pending compaction tasks.
    ///
    /// On failure each waiter receives the error directly; on success the
    /// senders are moved into the `CompactionFinished` notification.
    pub waiters: Vec<OutputTx>,
    /// Start time of compaction task; forwarded to the worker in the
    /// `CompactionFinished` notification.
    pub start_time: Instant,
    /// Event listener, invoked after the SST merge step has finished.
    pub(crate) listener: WorkerListener,
    /// Compactor to handle compaction.
    pub(crate) compactor: Arc<dyn Compactor>,
    /// Output of the picker. The input files of its `outputs` are the files
    /// flagged as compacting while this task is alive.
    pub(crate) picker_output: PickerOutput,
}
52
53impl Debug for CompactionTaskImpl {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("TwcsCompactionTask")
56            .field("region_id", &self.compaction_region.region_id)
57            .field("picker_output", &self.picker_output)
58            .field(
59                "append_mode",
60                &self.compaction_region.region_options.append_mode,
61            )
62            .finish()
63    }
64}
65
66impl Drop for CompactionTaskImpl {
67    fn drop(&mut self) {
68        self.mark_files_compacting(false)
69    }
70}
71
72impl CompactionTaskImpl {
73    fn mark_files_compacting(&self, compacting: bool) {
74        self.picker_output
75            .outputs
76            .iter()
77            .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
78    }
79
80    async fn handle_compaction(&mut self) -> error::Result<RegionEdit> {
81        self.mark_files_compacting(true);
82
83        let merge_timer = COMPACTION_STAGE_ELAPSED
84            .with_label_values(&["merge"])
85            .start_timer();
86
87        let compaction_result = match self
88            .compactor
89            .merge_ssts(&self.compaction_region, self.picker_output.clone())
90            .await
91        {
92            Ok(v) => v,
93            Err(e) => {
94                error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
95                merge_timer.stop_and_discard();
96                return Err(e);
97            }
98        };
99        let merge_time = merge_timer.stop_and_record();
100
101        metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
102        metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
103        info!(
104            "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
105            self.compaction_region.region_id,
106            compaction_result.files_to_remove,
107            compaction_result.files_to_add,
108            compaction_result.compaction_time_window,
109            self.waiters.len(),
110            merge_time,
111        );
112
113        self.listener
114            .on_merge_ssts_finished(self.compaction_region.region_id)
115            .await;
116
117        let _manifest_timer = COMPACTION_STAGE_ELAPSED
118            .with_label_values(&["write_manifest"])
119            .start_timer();
120
121        self.compactor
122            .update_manifest(&self.compaction_region, compaction_result)
123            .await
124    }
125
126    /// Handles compaction failure, notifies all waiters.
127    fn on_failure(&mut self, err: Arc<error::Error>) {
128        COMPACTION_FAILURE_COUNT.inc();
129        for waiter in self.waiters.drain(..) {
130            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
131                region_id: self.compaction_region.region_id,
132            }));
133        }
134    }
135
136    /// Notifies region worker to handle post-compaction tasks.
137    async fn send_to_worker(&self, request: WorkerRequest) {
138        if let Err(e) = self.request_sender.send(request).await {
139            error!(
140                "Failed to notify compaction job status for region {}, request: {:?}",
141                self.compaction_region.region_id, e.0
142            );
143        }
144    }
145}
146
147#[async_trait::async_trait]
148impl CompactionTask for CompactionTaskImpl {
149    async fn run(&mut self) {
150        let notify = match self.handle_compaction().await {
151            Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
152                region_id: self.compaction_region.region_id,
153                senders: std::mem::take(&mut self.waiters),
154                start_time: self.start_time,
155                edit,
156            }),
157            Err(e) => {
158                error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
159                let err = Arc::new(e);
160                // notify compaction waiters
161                self.on_failure(err.clone());
162                BackgroundNotify::CompactionFailed(CompactionFailed {
163                    region_id: self.compaction_region.region_id,
164                    err,
165                })
166            }
167        };
168
169        self.send_to_worker(WorkerRequest::Background {
170            region_id: self.compaction_region.region_id,
171            notify,
172        })
173        .await;
174    }
175}