mito2/schedule/
remote_job_scheduler.rs1use std::fmt;
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::error;
20use common_time::TimeToLive;
21use serde::{Deserialize, Serialize};
22use snafu::{Location, ResultExt, Snafu};
23use store_api::storage::RegionId;
24use tokio::sync::mpsc::Sender;
25use uuid::Uuid;
26
27use crate::compaction::compactor::CompactionRegion;
28use crate::compaction::picker::PickerOutput;
29use crate::error::{CompactRegionSnafu, Error, ParseJobIdSnafu, Result};
30use crate::manifest::action::RegionEdit;
31use crate::metrics::{COMPACTION_FAILURE_COUNT, INFLIGHT_COMPACTION_COUNT};
32use crate::request::{
33 BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
34 WorkerRequestWithTime,
35};
36
37pub type RemoteJobSchedulerRef = Arc<dyn RemoteJobScheduler>;
38
39#[cfg_attr(doc, aquamarine::aquamarine)]
40#[async_trait::async_trait]
58pub trait RemoteJobScheduler: Send + Sync + 'static {
59 async fn schedule(
61 &self,
62 job: RemoteJob,
63 notifier: Box<dyn Notifier>,
64 ) -> Result<JobId, RemoteJobSchedulerError>;
65}
66
67#[derive(Snafu, Debug)]
68#[snafu(display("Internal error occurred in remote job scheduler: {}", reason))]
69pub struct RemoteJobSchedulerError {
70 #[snafu(implicit)]
71 pub location: Location,
72 pub reason: String,
73 pub waiters: Vec<OutputTx>,
75}
76
77#[async_trait::async_trait]
79pub trait Notifier: Send + Sync + 'static {
80 async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>);
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
86pub struct JobId(Uuid);
87
88impl JobId {
89 pub fn parse_str(input: &str) -> Result<JobId> {
91 Uuid::parse_str(input).map(JobId).context(ParseJobIdSnafu)
92 }
93}
94
95impl fmt::Display for JobId {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 write!(f, "{}", self.0)
98 }
99}
100
101#[allow(dead_code)]
103pub enum RemoteJob {
104 CompactionJob(CompactionJob),
105}
106
107#[allow(dead_code)]
109pub struct CompactionJob {
110 pub compaction_region: CompactionRegion,
111 pub picker_output: PickerOutput,
112 pub start_time: Instant,
113 pub ttl: TimeToLive,
114 pub waiters: Vec<OutputTx>,
116}
117
118#[allow(dead_code)]
120pub enum RemoteJobResult {
121 CompactionJobResult(CompactionJobResult),
122}
123
124#[allow(dead_code)]
126pub struct CompactionJobResult {
127 pub job_id: JobId,
128 pub region_id: RegionId,
129 pub start_time: Instant,
130 pub region_edit: Result<RegionEdit>,
131}
132
133pub(crate) struct DefaultNotifier {
135 pub(crate) request_sender: Sender<WorkerRequestWithTime>,
137}
138
139impl DefaultNotifier {
140 fn on_failure(&self, err: Arc<Error>, region_id: RegionId, mut waiters: Vec<OutputTx>) {
141 COMPACTION_FAILURE_COUNT.inc();
142 for waiter in waiters.drain(..) {
143 waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
144 }
145 }
146}
147
148#[async_trait::async_trait]
149impl Notifier for DefaultNotifier {
150 async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>) {
151 INFLIGHT_COMPACTION_COUNT.dec();
152 match result {
153 RemoteJobResult::CompactionJobResult(result) => {
154 let notify = {
155 match result.region_edit {
156 Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
157 region_id: result.region_id,
158 senders: waiters,
159 start_time: result.start_time,
160 edit,
161 }),
162 Err(err) => {
163 error!(
164 "Compaction failed for region {}: {:?}",
165 result.region_id, err
166 );
167 let err = Arc::new(err);
168 self.on_failure(err.clone(), result.region_id, waiters);
169 BackgroundNotify::CompactionFailed(CompactionFailed {
170 region_id: result.region_id,
171 err,
172 })
173 }
174 }
175 };
176
177 if let Err(e) = self
178 .request_sender
179 .send(WorkerRequestWithTime::new(WorkerRequest::Background {
180 region_id: result.region_id,
181 notify,
182 }))
183 .await
184 {
185 error!(
186 "Failed to notify compaction job status for region {}, error: {:?}",
187 result.region_id, e
188 );
189 }
190 }
191 }
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly generated UUID must survive a parse/format round trip.
    #[test]
    fn test_job_id() {
        let id = Uuid::new_v4().to_string();
        let job_id = JobId::parse_str(&id).unwrap();
        assert_eq!(job_id.to_string(), id);
    }

    /// Input that is not a UUID must be rejected instead of panicking.
    #[test]
    fn test_job_id_rejects_invalid_input() {
        assert!(JobId::parse_str("not-a-uuid").is_err());
    }
}