mito2/worker/
handle_compaction.rs1use api::v1::region::compact_request;
16use common_telemetry::{error, info, warn};
17use store_api::logstore::LogStore;
18use store_api::region_request::RegionCompactRequest;
19use store_api::storage::RegionId;
20
21use crate::config::IndexBuildMode;
22use crate::error::RegionNotFoundSnafu;
23use crate::metrics::COMPACTION_REQUEST_COUNT;
24use crate::region::MitoRegionRef;
25use crate::request::{
26 BuildIndexRequest, CompactionCancelled, CompactionFailed, CompactionFinished, OnFailure,
27 OptionOutputTx,
28};
29use crate::sst::index::IndexBuildType;
30use crate::worker::RegionWorkerLoop;
31
32impl<S> RegionWorkerLoop<S> {
33 pub(crate) async fn handle_compaction_request(
35 &mut self,
36 region_id: RegionId,
37 req: RegionCompactRequest,
38 mut sender: OptionOutputTx,
39 ) {
40 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
41 return;
42 };
43 COMPACTION_REQUEST_COUNT.inc();
44 let parallelism = req.parallelism.unwrap_or(1) as usize;
45 if let Err(e) = self
46 .compaction_scheduler
47 .schedule_compaction(
48 region.region_id,
49 req.options,
50 ®ion.version_control,
51 ®ion.access_layer,
52 sender,
53 ®ion.manifest_ctx,
54 self.schema_metadata_manager.clone(),
55 parallelism,
56 )
57 .await
58 {
59 error!(e; "Failed to schedule compaction task for region: {}", region_id);
60 } else {
61 info!(
62 "Successfully scheduled compaction task for region: {}",
63 region_id
64 );
65 }
66 }
67
68 pub(crate) async fn handle_compaction_finished(
70 &mut self,
71 region_id: RegionId,
72 mut request: CompactionFinished,
73 ) where
74 S: LogStore,
75 {
76 let region = match self.regions.get_region(region_id) {
77 Some(region) => region,
78 None => {
79 request.on_failure(RegionNotFoundSnafu { region_id }.build());
80 return;
81 }
82 };
83 region.update_compaction_millis();
84
85 region.version_control.apply_edit(
86 Some(request.edit.clone()),
87 &[],
88 region.file_purger.clone(),
89 );
90
91 let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
92
93 request.on_success();
95
96 if self.config.index.build_mode == IndexBuildMode::Async
98 && !index_build_file_metas.is_empty()
99 {
100 self.handle_rebuild_index(
101 BuildIndexRequest {
102 region_id,
103 build_type: IndexBuildType::Compact,
104 file_metas: index_build_file_metas,
105 },
106 OptionOutputTx::new(None),
107 )
108 .await;
109 }
110
111 let mut pending_ddls = self
113 .compaction_scheduler
114 .on_compaction_finished(
115 region_id,
116 ®ion.manifest_ctx,
117 self.schema_metadata_manager.clone(),
118 )
119 .await;
120 self.handle_ddl_requests(&mut pending_ddls).await;
121 }
122
123 pub(crate) async fn handle_compaction_cancelled(
124 &mut self,
125 region_id: RegionId,
126 request: CompactionCancelled,
127 ) where
128 S: LogStore,
129 {
130 request.on_success();
131
132 let mut pending_ddls = match self.regions.get_region(region_id) {
134 Some(_) => {
135 self.compaction_scheduler
136 .on_compaction_cancelled(region_id)
137 .await
138 }
139 None => Vec::new(),
140 };
141
142 self.handle_ddl_requests(&mut pending_ddls).await;
143 }
144
145 pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
147 error!(req.err; "Failed to compact region: {}", req.region_id);
148
149 self.compaction_scheduler
150 .on_compaction_failed(req.region_id, req.err);
151 }
152
153 pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
155 if region.is_staging() || region.is_enter_staging() {
156 info!(
157 "Region {} is staging or entering staging, skip compaction",
158 region.region_id
159 );
160 return;
161 }
162 let now = self.time_provider.current_time_millis();
163 if now - region.last_compaction_millis()
164 >= self.config.min_compaction_interval.as_millis() as i64
165 && let Err(e) = self
166 .compaction_scheduler
167 .schedule_compaction(
168 region.region_id,
169 compact_request::Options::Regular(Default::default()),
170 ®ion.version_control,
171 ®ion.access_layer,
172 OptionOutputTx::none(),
173 ®ion.manifest_ctx,
174 self.schema_metadata_manager.clone(),
175 1, )
177 .await
178 {
179 warn!(
180 "Failed to schedule compaction for region: {}, err: {}",
181 region.region_id, e
182 );
183 }
184 }
185}