mito2/worker/
handle_compaction.rs1use api::v1::region::compact_request;
16use common_telemetry::{error, info, warn};
17use store_api::logstore::LogStore;
18use store_api::region_request::RegionCompactRequest;
19use store_api::storage::RegionId;
20
21use crate::config::IndexBuildMode;
22use crate::error::RegionNotFoundSnafu;
23use crate::metrics::COMPACTION_REQUEST_COUNT;
24use crate::region::MitoRegionRef;
25use crate::request::{
26 BuildIndexRequest, CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx,
27};
28use crate::sst::index::IndexBuildType;
29use crate::worker::RegionWorkerLoop;
30
31impl<S> RegionWorkerLoop<S> {
32 pub(crate) async fn handle_compaction_request(
34 &mut self,
35 region_id: RegionId,
36 req: RegionCompactRequest,
37 mut sender: OptionOutputTx,
38 ) {
39 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
40 return;
41 };
42 COMPACTION_REQUEST_COUNT.inc();
43 let parallelism = req.parallelism.unwrap_or(1) as usize;
44 if let Err(e) = self
45 .compaction_scheduler
46 .schedule_compaction(
47 region.region_id,
48 req.options,
49 ®ion.version_control,
50 ®ion.access_layer,
51 sender,
52 ®ion.manifest_ctx,
53 self.schema_metadata_manager.clone(),
54 parallelism,
55 )
56 .await
57 {
58 error!(e; "Failed to schedule compaction task for region: {}", region_id);
59 } else {
60 info!(
61 "Successfully scheduled compaction task for region: {}",
62 region_id
63 );
64 }
65 }
66
67 pub(crate) async fn handle_compaction_finished(
69 &mut self,
70 region_id: RegionId,
71 mut request: CompactionFinished,
72 ) where
73 S: LogStore,
74 {
75 let region = match self.regions.get_region(region_id) {
76 Some(region) => region,
77 None => {
78 request.on_failure(RegionNotFoundSnafu { region_id }.build());
79 return;
80 }
81 };
82 region.update_compaction_millis();
83
84 region.version_control.apply_edit(
85 Some(request.edit.clone()),
86 &[],
87 region.file_purger.clone(),
88 );
89
90 let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
91
92 request.on_success();
94
95 if self.config.index.build_mode == IndexBuildMode::Async
97 && !index_build_file_metas.is_empty()
98 {
99 self.handle_rebuild_index(
100 BuildIndexRequest {
101 region_id,
102 build_type: IndexBuildType::Compact,
103 file_metas: index_build_file_metas,
104 },
105 OptionOutputTx::new(None),
106 )
107 .await;
108 }
109
110 let mut pending_ddls = self
112 .compaction_scheduler
113 .on_compaction_finished(
114 region_id,
115 ®ion.manifest_ctx,
116 self.schema_metadata_manager.clone(),
117 )
118 .await;
119 self.handle_ddl_requests(&mut pending_ddls).await;
120 }
121
122 pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
124 error!(req.err; "Failed to compact region: {}", req.region_id);
125
126 self.compaction_scheduler
127 .on_compaction_failed(req.region_id, req.err);
128 }
129
130 pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
132 if region.is_staging() || region.is_enter_staging() {
133 info!(
134 "Region {} is staging or entering staging, skip compaction",
135 region.region_id
136 );
137 return;
138 }
139 let now = self.time_provider.current_time_millis();
140 if now - region.last_compaction_millis()
141 >= self.config.min_compaction_interval.as_millis() as i64
142 && let Err(e) = self
143 .compaction_scheduler
144 .schedule_compaction(
145 region.region_id,
146 compact_request::Options::Regular(Default::default()),
147 ®ion.version_control,
148 ®ion.access_layer,
149 OptionOutputTx::none(),
150 ®ion.manifest_ctx,
151 self.schema_metadata_manager.clone(),
152 1, )
154 .await
155 {
156 warn!(
157 "Failed to schedule compaction for region: {}, err: {}",
158 region.region_id, e
159 );
160 }
161 }
162}