// mito2/worker/handle_compaction.rs
use api::v1::region::compact_request;
use common_telemetry::{error, info, warn};
use store_api::region_request::RegionCompactRequest;
use store_api::storage::RegionId;
use crate::error::RegionNotFoundSnafu;
use crate::metrics::COMPACTION_REQUEST_COUNT;
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
    /// Handles a compaction request for the region `region_id`.
    ///
    /// The region is looked up through `writable_region_or`, which is also handed `sender`;
    /// when that lookup yields `None` this method returns without scheduling anything.
    /// Otherwise the compaction request counter is incremented and the task is submitted to
    /// the compaction scheduler together with `sender`. The scheduling outcome is only
    /// logged here.
    pub(crate) async fn handle_compaction_request(
        &mut self,
        region_id: RegionId,
        req: RegionCompactRequest,
        mut sender: OptionOutputTx,
    ) {
        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
            return;
        };
        // Count only requests that reached a region returned by the lookup above.
        COMPACTION_REQUEST_COUNT.inc();
        if let Err(e) = self
            .compaction_scheduler
            .schedule_compaction(
                region.region_id,
                req.options,
                &region.version_control,
                &region.access_layer,
                sender,
                &region.manifest_ctx,
                self.schema_metadata_manager.clone(),
                // NOTE(review): the meaning of this literal is not visible in this file;
                // confirm against the scheduler's signature.
                1,
            )
            .await
        {
            error!(e; "Failed to schedule compaction task for region: {}", region_id);
        } else {
            info!(
                "Successfully scheduled compaction task for region: {}",
                region_id
            );
        }
    }

    /// Handles the completion notification of a compaction task for `region_id`.
    ///
    /// If the region can no longer be found, the failure is reported through
    /// `request.on_failure` with a `RegionNotFound` error and nothing else happens.
    /// Otherwise the region's last compaction time is updated, the edit carried by `request`
    /// is applied to the region's version control, `request.on_success` is called, and
    /// finally the compaction scheduler is notified via `on_compaction_finished`.
    pub(crate) async fn handle_compaction_finished(
        &mut self,
        region_id: RegionId,
        mut request: CompactionFinished,
    ) {
        let Some(region) = self.regions.get_region(region_id) else {
            request.on_failure(RegionNotFoundSnafu { region_id }.build());
            return;
        };
        region.update_compaction_millis();
        region
            .version_control
            .apply_edit(request.edit.clone(), &[], region.file_purger.clone());
        // Report success before handing control back to the scheduler.
        request.on_success();
        self.compaction_scheduler
            .on_compaction_finished(
                region_id,
                &region.manifest_ctx,
                self.schema_metadata_manager.clone(),
            )
            .await;
    }

    /// Handles a failed compaction task: logs the error and forwards it to the compaction
    /// scheduler through `on_compaction_failed`.
    pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
        error!(req.err; "Failed to compact region: {}", req.region_id);
        self.compaction_scheduler
            .on_compaction_failed(req.region_id, req.err);
    }

    /// Schedules a regular compaction for `region` if enough time has passed.
    ///
    /// A compaction is scheduled only when the time elapsed since
    /// `region.last_compaction_millis()`, measured with the worker's time provider, is at
    /// least `config.min_compaction_interval`. The task uses default regular options and no
    /// output sender; a scheduling failure is logged as a warning and otherwise ignored.
    pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
        let now = self.time_provider.current_time_millis();
        // NOTE(review): `as i64` truncates if the configured interval in milliseconds
        // exceeds `i64::MAX`; presumably unreachable with realistic configs.
        if now - region.last_compaction_millis()
            >= self.config.min_compaction_interval.as_millis() as i64
        {
            if let Err(e) = self
                .compaction_scheduler
                .schedule_compaction(
                    region.region_id,
                    compact_request::Options::Regular(Default::default()),
                    &region.version_control,
                    &region.access_layer,
                    OptionOutputTx::none(),
                    &region.manifest_ctx,
                    self.schema_metadata_manager.clone(),
                    // NOTE(review): same opaque literal as in `handle_compaction_request`.
                    1,
                )
                .await
            {
                warn!(
                    "Failed to schedule compaction for region: {}, err: {}",
                    region.region_id, e
                );
            }
        }
    }
}