mito2/compaction/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::{error, info};
20use snafu::ResultExt;
21use tokio::sync::mpsc;
22
23use crate::compaction::compactor::{CompactionRegion, Compactor};
24use crate::compaction::picker::{CompactionTask, PickerOutput};
25use crate::error::CompactRegionSnafu;
26use crate::manifest::action::RegionEdit;
27use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
28use crate::request::{
29    BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
30    WorkerRequestWithTime,
31};
32use crate::worker::WorkerListener;
33use crate::{error, metrics};
34
/// Upper bound on the number of compaction tasks that may run in parallel.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
37
38pub(crate) struct CompactionTaskImpl {
39    pub compaction_region: CompactionRegion,
40    /// Request sender to notify the worker.
41    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
42    /// Senders that are used to notify waiters waiting for pending compaction tasks.
43    pub waiters: Vec<OutputTx>,
44    /// Start time of compaction task
45    pub start_time: Instant,
46    /// Event listener.
47    pub(crate) listener: WorkerListener,
48    /// Compactor to handle compaction.
49    pub(crate) compactor: Arc<dyn Compactor>,
50    /// Output of the picker.
51    pub(crate) picker_output: PickerOutput,
52}
53
54impl Debug for CompactionTaskImpl {
55    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("TwcsCompactionTask")
57            .field("region_id", &self.compaction_region.region_id)
58            .field("picker_output", &self.picker_output)
59            .field(
60                "append_mode",
61                &self.compaction_region.region_options.append_mode,
62            )
63            .finish()
64    }
65}
66
67impl Drop for CompactionTaskImpl {
68    fn drop(&mut self) {
69        self.mark_files_compacting(false)
70    }
71}
72
73impl CompactionTaskImpl {
74    fn mark_files_compacting(&self, compacting: bool) {
75        self.picker_output
76            .outputs
77            .iter()
78            .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
79    }
80
81    async fn handle_compaction(&mut self) -> error::Result<RegionEdit> {
82        self.mark_files_compacting(true);
83
84        let merge_timer = COMPACTION_STAGE_ELAPSED
85            .with_label_values(&["merge"])
86            .start_timer();
87
88        let compaction_result = match self
89            .compactor
90            .merge_ssts(&self.compaction_region, self.picker_output.clone())
91            .await
92        {
93            Ok(v) => v,
94            Err(e) => {
95                error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
96                merge_timer.stop_and_discard();
97                return Err(e);
98            }
99        };
100        let merge_time = merge_timer.stop_and_record();
101
102        metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
103        metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
104        info!(
105            "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
106            self.compaction_region.region_id,
107            compaction_result.files_to_remove,
108            compaction_result.files_to_add,
109            compaction_result.compaction_time_window,
110            self.waiters.len(),
111            merge_time,
112        );
113
114        self.listener
115            .on_merge_ssts_finished(self.compaction_region.region_id)
116            .await;
117
118        let _manifest_timer = COMPACTION_STAGE_ELAPSED
119            .with_label_values(&["write_manifest"])
120            .start_timer();
121
122        self.compactor
123            .update_manifest(&self.compaction_region, compaction_result)
124            .await
125    }
126
127    /// Handles compaction failure, notifies all waiters.
128    fn on_failure(&mut self, err: Arc<error::Error>) {
129        COMPACTION_FAILURE_COUNT.inc();
130        for waiter in self.waiters.drain(..) {
131            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
132                region_id: self.compaction_region.region_id,
133            }));
134        }
135    }
136
137    /// Notifies region worker to handle post-compaction tasks.
138    async fn send_to_worker(&self, request: WorkerRequest) {
139        if let Err(e) = self
140            .request_sender
141            .send(WorkerRequestWithTime::new(request))
142            .await
143        {
144            error!(
145                "Failed to notify compaction job status for region {}, request: {:?}",
146                self.compaction_region.region_id, e.0
147            );
148        }
149    }
150}
151
152#[async_trait::async_trait]
153impl CompactionTask for CompactionTaskImpl {
154    async fn run(&mut self) {
155        let notify = match self.handle_compaction().await {
156            Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
157                region_id: self.compaction_region.region_id,
158                senders: std::mem::take(&mut self.waiters),
159                start_time: self.start_time,
160                edit,
161            }),
162            Err(e) => {
163                error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
164                let err = Arc::new(e);
165                // notify compaction waiters
166                self.on_failure(err.clone());
167                BackgroundNotify::CompactionFailed(CompactionFailed {
168                    region_id: self.compaction_region.region_id,
169                    err,
170                })
171            }
172        };
173
174        self.send_to_worker(WorkerRequest::Background {
175            region_id: self.compaction_region.region_id,
176            notify,
177        })
178        .await;
179    }
180}