mito2/worker/
handle_compaction.rs1use api::v1::region::compact_request;
16use common_telemetry::{error, info, warn};
17use store_api::region_request::RegionCompactRequest;
18use store_api::storage::RegionId;
19
20use crate::error::RegionNotFoundSnafu;
21use crate::metrics::COMPACTION_REQUEST_COUNT;
22use crate::region::MitoRegionRef;
23use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
24use crate::worker::RegionWorkerLoop;
25
26impl<S> RegionWorkerLoop<S> {
27 pub(crate) async fn handle_compaction_request(
29 &mut self,
30 region_id: RegionId,
31 req: RegionCompactRequest,
32 mut sender: OptionOutputTx,
33 ) {
34 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
35 return;
36 };
37 COMPACTION_REQUEST_COUNT.inc();
38 if let Err(e) = self
39 .compaction_scheduler
40 .schedule_compaction(
41 region.region_id,
42 req.options,
43 ®ion.version_control,
44 ®ion.access_layer,
45 sender,
46 ®ion.manifest_ctx,
47 self.schema_metadata_manager.clone(),
48 1,
50 )
51 .await
52 {
53 error!(e; "Failed to schedule compaction task for region: {}", region_id);
54 } else {
55 info!(
56 "Successfully scheduled compaction task for region: {}",
57 region_id
58 );
59 }
60 }
61
62 pub(crate) async fn handle_compaction_finished(
64 &mut self,
65 region_id: RegionId,
66 mut request: CompactionFinished,
67 ) {
68 let region = match self.regions.get_region(region_id) {
69 Some(region) => region,
70 None => {
71 request.on_failure(RegionNotFoundSnafu { region_id }.build());
72 return;
73 }
74 };
75 region.update_compaction_millis();
76
77 region.version_control.apply_edit(
78 Some(request.edit.clone()),
79 &[],
80 region.file_purger.clone(),
81 );
82
83 request.on_success();
85
86 self.compaction_scheduler
88 .on_compaction_finished(
89 region_id,
90 ®ion.manifest_ctx,
91 self.schema_metadata_manager.clone(),
92 )
93 .await;
94 }
95
96 pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
98 error!(req.err; "Failed to compact region: {}", req.region_id);
99
100 self.compaction_scheduler
101 .on_compaction_failed(req.region_id, req.err);
102 }
103
104 pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
106 let now = self.time_provider.current_time_millis();
107 if now - region.last_compaction_millis()
108 >= self.config.min_compaction_interval.as_millis() as i64
109 && let Err(e) = self
110 .compaction_scheduler
111 .schedule_compaction(
112 region.region_id,
113 compact_request::Options::Regular(Default::default()),
114 ®ion.version_control,
115 ®ion.access_layer,
116 OptionOutputTx::none(),
117 ®ion.manifest_ctx,
118 self.schema_metadata_manager.clone(),
119 1,
120 )
121 .await
122 {
123 warn!(
124 "Failed to schedule compaction for region: {}, err: {}",
125 region.region_id, e
126 );
127 }
128 }
129}