mito2/worker/
handle_compaction.rs1use api::v1::region::compact_request;
16use common_telemetry::{error, info, warn};
17use store_api::region_request::RegionCompactRequest;
18use store_api::storage::RegionId;
19
20use crate::config::IndexBuildMode;
21use crate::error::RegionNotFoundSnafu;
22use crate::metrics::COMPACTION_REQUEST_COUNT;
23use crate::region::MitoRegionRef;
24use crate::request::{
25 BuildIndexRequest, CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx,
26};
27use crate::sst::index::IndexBuildType;
28use crate::worker::RegionWorkerLoop;
29
30impl<S> RegionWorkerLoop<S> {
31 pub(crate) async fn handle_compaction_request(
33 &mut self,
34 region_id: RegionId,
35 req: RegionCompactRequest,
36 mut sender: OptionOutputTx,
37 ) {
38 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
39 return;
40 };
41 COMPACTION_REQUEST_COUNT.inc();
42 let parallelism = req.parallelism.unwrap_or(1) as usize;
43 if let Err(e) = self
44 .compaction_scheduler
45 .schedule_compaction(
46 region.region_id,
47 req.options,
48 ®ion.version_control,
49 ®ion.access_layer,
50 sender,
51 ®ion.manifest_ctx,
52 self.schema_metadata_manager.clone(),
53 parallelism,
54 )
55 .await
56 {
57 error!(e; "Failed to schedule compaction task for region: {}", region_id);
58 } else {
59 info!(
60 "Successfully scheduled compaction task for region: {}",
61 region_id
62 );
63 }
64 }
65
66 pub(crate) async fn handle_compaction_finished(
68 &mut self,
69 region_id: RegionId,
70 mut request: CompactionFinished,
71 ) {
72 let region = match self.regions.get_region(region_id) {
73 Some(region) => region,
74 None => {
75 request.on_failure(RegionNotFoundSnafu { region_id }.build());
76 return;
77 }
78 };
79 region.update_compaction_millis();
80
81 region.version_control.apply_edit(
82 Some(request.edit.clone()),
83 &[],
84 region.file_purger.clone(),
85 );
86
87 let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
88
89 request.on_success();
91
92 if self.config.index.build_mode == IndexBuildMode::Async
94 && !index_build_file_metas.is_empty()
95 {
96 self.handle_rebuild_index(
97 BuildIndexRequest {
98 region_id,
99 build_type: IndexBuildType::Compact,
100 file_metas: index_build_file_metas,
101 },
102 OptionOutputTx::new(None),
103 )
104 .await;
105 }
106
107 self.compaction_scheduler
109 .on_compaction_finished(
110 region_id,
111 ®ion.manifest_ctx,
112 self.schema_metadata_manager.clone(),
113 )
114 .await;
115 }
116
117 pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
119 error!(req.err; "Failed to compact region: {}", req.region_id);
120
121 self.compaction_scheduler
122 .on_compaction_failed(req.region_id, req.err);
123 }
124
125 pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
127 let now = self.time_provider.current_time_millis();
128 if now - region.last_compaction_millis()
129 >= self.config.min_compaction_interval.as_millis() as i64
130 && let Err(e) = self
131 .compaction_scheduler
132 .schedule_compaction(
133 region.region_id,
134 compact_request::Options::Regular(Default::default()),
135 ®ion.version_control,
136 ®ion.access_layer,
137 OptionOutputTx::none(),
138 ®ion.manifest_ctx,
139 self.schema_metadata_manager.clone(),
140 1, )
142 .await
143 {
144 warn!(
145 "Failed to schedule compaction for region: {}, err: {}",
146 region.region_id, e
147 );
148 }
149 }
150}