mito2/schedule/
remote_job_scheduler.rs1use std::fmt;
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::error;
20use serde::{Deserialize, Serialize};
21use snafu::{Location, ResultExt, Snafu};
22use store_api::storage::RegionId;
23use tokio::sync::mpsc::Sender;
24use uuid::Uuid;
25
26use crate::compaction::compactor::CompactionRegion;
27use crate::compaction::picker::PickerOutput;
28use crate::error::{CompactRegionSnafu, Error, ParseJobIdSnafu, Result};
29use crate::manifest::action::RegionEdit;
30use crate::metrics::{COMPACTION_FAILURE_COUNT, INFLIGHT_COMPACTION_COUNT};
31use crate::request::{
32 BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
33};
34
/// Shared, reference-counted handle to a [`RemoteJobScheduler`] trait object.
pub type RemoteJobSchedulerRef = Arc<dyn RemoteJobScheduler>;
36
#[cfg_attr(doc, aquamarine::aquamarine)]
#[async_trait::async_trait]
/// Interface for handing a [`RemoteJob`] over to a scheduler.
///
/// Implementations must be `Send + Sync + 'static`, so they can be shared
/// behind a [`RemoteJobSchedulerRef`].
pub trait RemoteJobScheduler: Send + Sync + 'static {
    /// Submits `job` to the scheduler.
    ///
    /// `notifier` is passed along with the job.
    /// NOTE(review): presumably the implementation calls [`Notifier::notify`]
    /// once the job has a result; confirm against the implementations.
    ///
    /// # Errors
    ///
    /// Returns a [`RemoteJobSchedulerError`] when the job cannot be scheduled.
    async fn schedule(
        &self,
        job: RemoteJob,
        notifier: Box<dyn Notifier>,
    ) -> Result<JobId, RemoteJobSchedulerError>;
}
64
/// Error returned by [`RemoteJobScheduler::schedule`].
///
/// Displayed as `Internal error occurred in remote job scheduler: <reason>`.
#[derive(Snafu, Debug)]
#[snafu(display("Internal error occurred in remote job scheduler: {}", reason))]
pub struct RemoteJobSchedulerError {
    /// Source location of the error, filled in implicitly by snafu.
    #[snafu(implicit)]
    pub location: Location,
    /// Human-readable description of what went wrong.
    pub reason: String,
    /// Output senders carried inside the error.
    // NOTE(review): presumably these are the waiters of the job that failed to
    // schedule, handed back so the caller can still answer them; confirm
    // against the call sites.
    pub waiters: Vec<OutputTx>,
}
74
/// Callback interface used to report the result of a remote job.
#[async_trait::async_trait]
pub trait Notifier: Send + Sync + 'static {
    /// Reports `result` and takes ownership of the `waiters` that are waiting
    /// for it.
    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>);
}
81
/// Identifier of a remote job; a newtype around [`Uuid`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct JobId(Uuid);
85
86impl JobId {
87 pub fn parse_str(input: &str) -> Result<JobId> {
89 Uuid::parse_str(input).map(JobId).context(ParseJobIdSnafu)
90 }
91}
92
93impl fmt::Display for JobId {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 write!(f, "{}", self.0)
96 }
97}
98
/// A job that can be submitted to a [`RemoteJobScheduler`].
#[allow(dead_code)]
pub enum RemoteJob {
    /// A compaction job, described by a [`CompactionJob`].
    CompactionJob(CompactionJob),
}
104
/// Payload of a [`RemoteJob::CompactionJob`].
#[allow(dead_code)]
pub struct CompactionJob {
    /// The region the compaction applies to.
    pub compaction_region: CompactionRegion,
    /// Output of the compaction picker for this job.
    pub picker_output: PickerOutput,
    /// Instant recorded as the start of the job.
    pub start_time: Instant,
    /// Output senders waiting for this job.
    // NOTE(review): presumably the same waiters later passed to
    // `Notifier::notify`; confirm against the scheduler implementation.
    pub waiters: Vec<OutputTx>,
}
114
/// Result of a remote job, as passed to [`Notifier::notify`].
#[allow(dead_code)]
pub enum RemoteJobResult {
    /// Result of a compaction job, described by a [`CompactionJobResult`].
    CompactionJobResult(CompactionJobResult),
}
120
/// Payload of a [`RemoteJobResult::CompactionJobResult`].
#[allow(dead_code)]
pub struct CompactionJobResult {
    /// Identifier of the job this result belongs to.
    pub job_id: JobId,
    /// Region the compaction ran for.
    pub region_id: RegionId,
    /// Start time of the job; `DefaultNotifier` forwards it in
    /// `CompactionFinished`.
    pub start_time: Instant,
    /// `Ok` holds the region edit produced by the compaction; `Err` holds the
    /// failure. `DefaultNotifier` maps these to `CompactionFinished` and
    /// `CompactionFailed` respectively.
    pub region_edit: Result<RegionEdit>,
}
129
/// [`Notifier`] implementation that forwards job results as
/// `WorkerRequest::Background` messages over a channel.
pub(crate) struct DefaultNotifier {
    /// Channel on which the background notifications are sent.
    pub(crate) request_sender: Sender<WorkerRequest>,
}
135
136impl DefaultNotifier {
137 fn on_failure(&self, err: Arc<Error>, region_id: RegionId, mut waiters: Vec<OutputTx>) {
138 COMPACTION_FAILURE_COUNT.inc();
139 for waiter in waiters.drain(..) {
140 waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
141 }
142 }
143}
144
145#[async_trait::async_trait]
146impl Notifier for DefaultNotifier {
147 async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>) {
148 INFLIGHT_COMPACTION_COUNT.dec();
149 match result {
150 RemoteJobResult::CompactionJobResult(result) => {
151 let notify = {
152 match result.region_edit {
153 Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
154 region_id: result.region_id,
155 senders: waiters,
156 start_time: result.start_time,
157 edit,
158 }),
159 Err(err) => {
160 error!(
161 "Compaction failed for region {}: {:?}",
162 result.region_id, err
163 );
164 let err = Arc::new(err);
165 self.on_failure(err.clone(), result.region_id, waiters);
166 BackgroundNotify::CompactionFailed(CompactionFailed {
167 region_id: result.region_id,
168 err,
169 })
170 }
171 }
172 };
173
174 if let Err(e) = self
175 .request_sender
176 .send(WorkerRequest::Background {
177 region_id: result.region_id,
178 notify,
179 })
180 .await
181 {
182 error!(
183 "Failed to notify compaction job status for region {}, error: {:?}",
184 result.region_id, e
185 );
186 }
187 }
188 }
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly generated UUID survives a parse/format round trip.
    #[test]
    fn test_job_id() {
        let id = Uuid::new_v4().to_string();
        let job_id = JobId::parse_str(&id).unwrap();
        assert_eq!(job_id.to_string(), id);
    }

    /// Malformed input is rejected with an error, not a panic or a bogus id.
    #[test]
    fn test_job_id_rejects_invalid_input() {
        assert!(JobId::parse_str("").is_err());
        assert!(JobId::parse_str("not-a-uuid").is_err());
    }
}