mito2/schedule/remote_job_scheduler.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use common_telemetry::error;
use serde::{Deserialize, Serialize};
use snafu::{Location, ResultExt, Snafu};
use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;

use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::PickerOutput;
use crate::error::{CompactRegionSnafu, Error, ParseJobIdSnafu, Result};
use crate::manifest::action::RegionEdit;
use crate::metrics::{COMPACTION_FAILURE_COUNT, INFLIGHT_COMPACTION_COUNT};
use crate::request::{
    BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};

pub type RemoteJobSchedulerRef = Arc<dyn RemoteJobScheduler>;

#[cfg_attr(doc, aquamarine::aquamarine)]
/// RemoteJobScheduler is a trait that defines the API to schedule remote jobs.
/// For example, a compaction job can be scheduled remotely as shown in the following workflow:
/// ```mermaid
/// sequenceDiagram
///     participant User
///     participant MitoEngine
///     participant CompactionScheduler
///     participant Plugins
///     participant RemoteJobScheduler
///
///     User->>MitoEngine: Initiates compaction
///     MitoEngine->>CompactionScheduler: schedule_compaction()
///     CompactionScheduler->>Plugins: Handle plugins
///     CompactionScheduler->>RemoteJobScheduler: schedule(CompactionJob)
///     RemoteJobScheduler-->>CompactionScheduler: Returns Job UUID
///     CompactionScheduler-->>MitoEngine: Task scheduled with Job UUID
///     MitoEngine-->>User: Compaction task scheduled
/// ```
#[async_trait::async_trait]
pub trait RemoteJobScheduler: Send + Sync + 'static {
    /// Sends a job to the scheduler and returns a UUID for the job.
    async fn schedule(
        &self,
        job: RemoteJob,
        notifier: Box<dyn Notifier>,
    ) -> Result<JobId, RemoteJobSchedulerError>;
}
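
// A minimal illustrative sketch, not part of the upstream implementation: an
// in-memory scheduler that assigns a `JobId` and hands the job, together with
// its notifier, to an external worker over a channel. The type name and the
// `queue` field are hypothetical; a real scheduler would talk to a remote
// compaction service instead.
#[allow(dead_code)]
pub(crate) struct ChannelRemoteJobScheduler {
    queue: Sender<(JobId, RemoteJob, Box<dyn Notifier>)>,
}

#[async_trait::async_trait]
impl RemoteJobScheduler for ChannelRemoteJobScheduler {
    async fn schedule(
        &self,
        job: RemoteJob,
        notifier: Box<dyn Notifier>,
    ) -> Result<JobId, RemoteJobSchedulerError> {
        let job_id = JobId(Uuid::new_v4());
        // The worker draining `queue` is expected to execute the job and call
        // `notifier.notify(...)` with the result once it finishes.
        if self.queue.send((job_id, job, notifier)).await.is_err() {
            return Err(RemoteJobSchedulerError {
                location: Location::default(),
                reason: "remote job worker is unavailable".to_string(),
                // A production implementation would move the waiters out of the
                // job here so the caller can fall back to local compaction.
                waiters: Vec::new(),
            });
        }
        Ok(job_id)
    }
}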

#[derive(Snafu, Debug)]
#[snafu(display("Internal error occurred in remote job scheduler: {}", reason))]
pub struct RemoteJobSchedulerError {
    #[snafu(implicit)]
    pub location: Location,
    pub reason: String,
    // Keep the waiters in the error so that we can notify them when falling back to local compaction.
    pub waiters: Vec<OutputTx>,
}

/// Notifier is used to notify the mito engine when a remote job is completed.
#[async_trait::async_trait]
pub trait Notifier: Send + Sync + 'static {
    /// Notifies the mito engine that a remote job is completed.
    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>);
}

/// Unique id for a remote job.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct JobId(Uuid);

impl JobId {
    /// Parses a job id from a string.
    pub fn parse_str(input: &str) -> Result<JobId> {
        Uuid::parse_str(input).map(JobId).context(ParseJobIdSnafu)
    }
}

impl fmt::Display for JobId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// RemoteJob is a job that can be executed remotely. For example, a remote compaction job.
#[allow(dead_code)]
pub enum RemoteJob {
    CompactionJob(CompactionJob),
}

/// CompactionJob is a remote job that compacts a set of files in a compaction service.
#[allow(dead_code)]
pub struct CompactionJob {
    pub compaction_region: CompactionRegion,
    pub picker_output: PickerOutput,
    pub start_time: Instant,
    /// Waiters to notify with the result of the compaction job.
    pub waiters: Vec<OutputTx>,
}

/// RemoteJobResult is the result of a remote job.
#[allow(dead_code)]
pub enum RemoteJobResult {
    CompactionJobResult(CompactionJobResult),
}

/// CompactionJobResult is the result of a compaction job.
#[allow(dead_code)]
pub struct CompactionJobResult {
    pub job_id: JobId,
    pub region_id: RegionId,
    pub start_time: Instant,
    pub region_edit: Result<RegionEdit>,
}

/// DefaultNotifier is a default implementation of Notifier that sends WorkerRequest to the mito engine.
pub(crate) struct DefaultNotifier {
    /// The sender used to deliver WorkerRequest to the mito engine, notifying it when a remote job is completed.
    pub(crate) request_sender: Sender<WorkerRequest>,
}

impl DefaultNotifier {
    fn on_failure(&self, err: Arc<Error>, region_id: RegionId, mut waiters: Vec<OutputTx>) {
        COMPACTION_FAILURE_COUNT.inc();
        for waiter in waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
        }
    }
}

#[async_trait::async_trait]
impl Notifier for DefaultNotifier {
    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>) {
        INFLIGHT_COMPACTION_COUNT.dec();
        match result {
            RemoteJobResult::CompactionJobResult(result) => {
                let notify = {
                    match result.region_edit {
                        Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
                            region_id: result.region_id,
                            senders: waiters,
                            start_time: result.start_time,
                            edit,
                        }),
                        Err(err) => {
                            error!(
                                "Compaction failed for region {}: {:?}",
                                result.region_id, err
                            );
                            let err = Arc::new(err);
                            self.on_failure(err.clone(), result.region_id, waiters);
                            BackgroundNotify::CompactionFailed(CompactionFailed {
                                region_id: result.region_id,
                                err,
                            })
                        }
                    }
                };

                if let Err(e) = self
                    .request_sender
                    .send(WorkerRequest::Background {
                        region_id: result.region_id,
                        notify,
                    })
                    .await
                {
                    error!(
                        "Failed to notify compaction job status for region {}, error: {:?}",
                        result.region_id, e
                    );
                }
            }
        }
    }
}
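
// Hypothetical helper, shown only as a usage sketch of `Notifier`: a remote
// worker that has finished (or failed) a compaction could report the outcome
// back to the engine like this. The function name and signature are not part
// of the upstream crate; the caller supplies the `RegionEdit` (or error)
// produced by the remote run.
#[allow(dead_code)]
pub(crate) async fn report_compaction_result(
    notifier: &dyn Notifier,
    job_id: JobId,
    region_id: RegionId,
    start_time: Instant,
    region_edit: Result<RegionEdit>,
    waiters: Vec<OutputTx>,
) {
    // Wrap the outcome in the result type understood by the notifier; the
    // `DefaultNotifier` above turns it into a `WorkerRequest::Background`.
    let result = RemoteJobResult::CompactionJobResult(CompactionJobResult {
        job_id,
        region_id,
        start_time,
        region_edit,
    });
    notifier.notify(result, waiters).await;
}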

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_job_id() {
        let id = Uuid::new_v4().to_string();
        let job_id = JobId::parse_str(&id).unwrap();
        assert_eq!(job_id.to_string(), id);
    }
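
    // A small additional check, not in the upstream tests: parsing a string
    // that is not a valid UUID should fail.
    #[test]
    fn test_job_id_invalid_input() {
        assert!(JobId::parse_str("not-a-uuid").is_err());
    }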
}