mito2/worker/
handle_compaction.rs1use api::v1::region::compact_request;
16use common_telemetry::{error, info, warn};
17use store_api::region_request::RegionCompactRequest;
18use store_api::storage::RegionId;
19
20use crate::error::RegionNotFoundSnafu;
21use crate::metrics::COMPACTION_REQUEST_COUNT;
22use crate::region::MitoRegionRef;
23use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
24use crate::worker::RegionWorkerLoop;
25
26impl<S> RegionWorkerLoop<S> {
27 pub(crate) async fn handle_compaction_request(
29 &mut self,
30 region_id: RegionId,
31 req: RegionCompactRequest,
32 mut sender: OptionOutputTx,
33 ) {
34 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
35 return;
36 };
37 COMPACTION_REQUEST_COUNT.inc();
38 if let Err(e) = self
39 .compaction_scheduler
40 .schedule_compaction(
41 region.region_id,
42 req.options,
43 ®ion.version_control,
44 ®ion.access_layer,
45 sender,
46 ®ion.manifest_ctx,
47 self.schema_metadata_manager.clone(),
48 1,
50 )
51 .await
52 {
53 error!(e; "Failed to schedule compaction task for region: {}", region_id);
54 } else {
55 info!(
56 "Successfully scheduled compaction task for region: {}",
57 region_id
58 );
59 }
60 }
61
62 pub(crate) async fn handle_compaction_finished(
64 &mut self,
65 region_id: RegionId,
66 mut request: CompactionFinished,
67 ) {
68 let region = match self.regions.get_region(region_id) {
69 Some(region) => region,
70 None => {
71 request.on_failure(RegionNotFoundSnafu { region_id }.build());
72 return;
73 }
74 };
75 region.update_compaction_millis();
76
77 region
78 .version_control
79 .apply_edit(request.edit.clone(), &[], region.file_purger.clone());
80
81 request.on_success();
83
84 self.compaction_scheduler
86 .on_compaction_finished(
87 region_id,
88 ®ion.manifest_ctx,
89 self.schema_metadata_manager.clone(),
90 )
91 .await;
92 }
93
94 pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
96 error!(req.err; "Failed to compact region: {}", req.region_id);
97
98 self.compaction_scheduler
99 .on_compaction_failed(req.region_id, req.err);
100 }
101
102 pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
104 let now = self.time_provider.current_time_millis();
105 if now - region.last_compaction_millis()
106 >= self.config.min_compaction_interval.as_millis() as i64
107 {
108 if let Err(e) = self
109 .compaction_scheduler
110 .schedule_compaction(
111 region.region_id,
112 compact_request::Options::Regular(Default::default()),
113 ®ion.version_control,
114 ®ion.access_layer,
115 OptionOutputTx::none(),
116 ®ion.manifest_ctx,
117 self.schema_metadata_manager.clone(),
118 1,
119 )
120 .await
121 {
122 warn!(
123 "Failed to schedule compaction for region: {}, err: {}",
124 region.region_id, e
125 );
126 }
127 }
128 }
129}