mito2/schedule/
remote_job_scheduler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17use std::time::Instant;
18
19use common_telemetry::error;
20use common_time::TimeToLive;
21use serde::{Deserialize, Serialize};
22use snafu::{Location, ResultExt, Snafu};
23use store_api::storage::RegionId;
24use tokio::sync::mpsc::Sender;
25use uuid::Uuid;
26
27use crate::compaction::compactor::CompactionRegion;
28use crate::compaction::picker::PickerOutput;
29use crate::error::{CompactRegionSnafu, Error, ParseJobIdSnafu, Result};
30use crate::manifest::action::RegionEdit;
31use crate::metrics::{COMPACTION_FAILURE_COUNT, INFLIGHT_COMPACTION_COUNT};
32use crate::request::{
33    BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
34    WorkerRequestWithTime,
35};
36
37pub type RemoteJobSchedulerRef = Arc<dyn RemoteJobScheduler>;
38
39#[cfg_attr(doc, aquamarine::aquamarine)]
40/// RemoteJobScheduler is a trait that defines the API to schedule remote jobs.
41/// For example, a compaction job can be scheduled remotely as the following workflow:
42/// ```mermaid
43///   participant User
44///   participant MitoEngine
45///   participant CompactionScheduler
46///   participant Plugins
47///   participant RemoteJobScheduler
48///
49///   User->>MitoEngine: Initiates compaction
50///   MitoEngine->>CompactionScheduler: schedule_compaction()
51///   CompactionScheduler->>Plugins: Handle plugins
52///   CompactionScheduler->>RemoteJobScheduler: schedule(CompactionJob)
53///   RemoteJobScheduler-->>CompactionScheduler: Returns Job UUID
54///   CompactionScheduler-->>MitoEngine: Task scheduled with Job UUID
55///   MitoEngine-->>User: Compaction task scheduled
56/// ```
57#[async_trait::async_trait]
58pub trait RemoteJobScheduler: Send + Sync + 'static {
59    /// Sends a job to the scheduler and returns a UUID for the job.
60    async fn schedule(
61        &self,
62        job: RemoteJob,
63        notifier: Box<dyn Notifier>,
64    ) -> Result<JobId, RemoteJobSchedulerError>;
65}
66
67#[derive(Snafu, Debug)]
68#[snafu(display("Internal error occurred in remote job scheduler: {}", reason))]
69pub struct RemoteJobSchedulerError {
70    #[snafu(implicit)]
71    pub location: Location,
72    pub reason: String,
73    // Keep the waiters in the error so that we can notify them when fallback to the local compaction.
74    pub waiters: Vec<OutputTx>,
75}
76
77/// Notifier is used to notify the mito engine when a remote job is completed.
78#[async_trait::async_trait]
79pub trait Notifier: Send + Sync + 'static {
80    /// Notify the mito engine that a remote job is completed.
81    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>);
82}
83
84/// Unique id for a remote job.
85#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
86pub struct JobId(Uuid);
87
88impl JobId {
89    /// Parses job id from string.
90    pub fn parse_str(input: &str) -> Result<JobId> {
91        Uuid::parse_str(input).map(JobId).context(ParseJobIdSnafu)
92    }
93}
94
95impl fmt::Display for JobId {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        write!(f, "{}", self.0)
98    }
99}
100
101/// RemoteJob is a job that can be executed remotely. For example, a remote compaction job.
102#[allow(dead_code)]
103pub enum RemoteJob {
104    CompactionJob(CompactionJob),
105}
106
107/// CompactionJob is a remote job that compacts a set of files in a compaction service.
108#[allow(dead_code)]
109pub struct CompactionJob {
110    pub compaction_region: CompactionRegion,
111    pub picker_output: PickerOutput,
112    pub start_time: Instant,
113    pub ttl: TimeToLive,
114    /// Send the result of the compaction job to these waiters.
115    pub waiters: Vec<OutputTx>,
116}
117
118/// RemoteJobResult is the result of a remote job.
119#[allow(dead_code)]
120pub enum RemoteJobResult {
121    CompactionJobResult(CompactionJobResult),
122}
123
124/// CompactionJobResult is the result of a compaction job.
125#[allow(dead_code)]
126pub struct CompactionJobResult {
127    pub job_id: JobId,
128    pub region_id: RegionId,
129    pub start_time: Instant,
130    pub region_edit: Result<RegionEdit>,
131}
132
133/// DefaultNotifier is a default implementation of Notifier that sends WorkerRequest to the mito engine.
134pub(crate) struct DefaultNotifier {
135    /// The sender to send WorkerRequest to the mito engine. This is used to notify the mito engine when a remote job is completed.
136    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
137}
138
139impl DefaultNotifier {
140    fn on_failure(&self, err: Arc<Error>, region_id: RegionId, mut waiters: Vec<OutputTx>) {
141        COMPACTION_FAILURE_COUNT.inc();
142        for waiter in waiters.drain(..) {
143            waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
144        }
145    }
146}
147
148#[async_trait::async_trait]
149impl Notifier for DefaultNotifier {
150    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>) {
151        INFLIGHT_COMPACTION_COUNT.dec();
152        match result {
153            RemoteJobResult::CompactionJobResult(result) => {
154                let notify = {
155                    match result.region_edit {
156                        Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
157                            region_id: result.region_id,
158                            senders: waiters,
159                            start_time: result.start_time,
160                            edit,
161                        }),
162                        Err(err) => {
163                            error!(
164                                "Compaction failed for region {}: {:?}",
165                                result.region_id, err
166                            );
167                            let err = Arc::new(err);
168                            self.on_failure(err.clone(), result.region_id, waiters);
169                            BackgroundNotify::CompactionFailed(CompactionFailed {
170                                region_id: result.region_id,
171                                err,
172                            })
173                        }
174                    }
175                };
176
177                if let Err(e) = self
178                    .request_sender
179                    .send(WorkerRequestWithTime::new(WorkerRequest::Background {
180                        region_id: result.region_id,
181                        notify,
182                    }))
183                    .await
184                {
185                    error!(
186                        "Failed to notify compaction job status for region {}, error: {:?}",
187                        result.region_id, e
188                    );
189                }
190            }
191        }
192    }
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly generated UUID string must survive a parse/format round trip via `JobId`.
    #[test]
    fn test_job_id() {
        let text = Uuid::new_v4().to_string();
        let parsed = JobId::parse_str(&text).unwrap();
        let rendered = parsed.to_string();
        assert_eq!(rendered, text);
    }
}