// mito2/schedule/remote_job_scheduler.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use common_telemetry::error;
use serde::{Deserialize, Serialize};
use snafu::{Location, ResultExt, Snafu};
use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;

use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::PickerOutput;
use crate::error::{CompactRegionSnafu, Error, ParseJobIdSnafu, Result};
use crate::manifest::action::RegionEdit;
use crate::metrics::{COMPACTION_FAILURE_COUNT, INFLIGHT_COMPACTION_COUNT};
use crate::request::{
    BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
34
35pub type RemoteJobSchedulerRef = Arc<dyn RemoteJobScheduler>;
36
37#[cfg_attr(doc, aquamarine::aquamarine)]
38/// RemoteJobScheduler is a trait that defines the API to schedule remote jobs.
39/// For example, a compaction job can be scheduled remotely as the following workflow:
40/// ```mermaid
41///   participant User
42///   participant MitoEngine
43///   participant CompactionScheduler
44///   participant Plugins
45///   participant RemoteJobScheduler
46///
47///   User->>MitoEngine: Initiates compaction
48///   MitoEngine->>CompactionScheduler: schedule_compaction()
49///   CompactionScheduler->>Plugins: Handle plugins
50///   CompactionScheduler->>RemoteJobScheduler: schedule(CompactionJob)
51///   RemoteJobScheduler-->>CompactionScheduler: Returns Job UUID
52///   CompactionScheduler-->>MitoEngine: Task scheduled with Job UUID
53///   MitoEngine-->>User: Compaction task scheduled
54/// ```
55#[async_trait::async_trait]
56pub trait RemoteJobScheduler: Send + Sync + 'static {
57    /// Sends a job to the scheduler and returns a UUID for the job.
58    async fn schedule(
59        &self,
60        job: RemoteJob,
61        notifier: Box<dyn Notifier>,
62    ) -> Result<JobId, RemoteJobSchedulerError>;
63}
64
65#[derive(Snafu, Debug)]
66#[snafu(display("Internal error occurred in remote job scheduler: {}", reason))]
67pub struct RemoteJobSchedulerError {
68    #[snafu(implicit)]
69    pub location: Location,
70    pub reason: String,
71    // Keep the waiters in the error so that we can notify them when fallback to the local compaction.
72    pub waiters: Vec<OutputTx>,
73}
74
75/// Notifier is used to notify the mito engine when a remote job is completed.
76#[async_trait::async_trait]
77pub trait Notifier: Send + Sync + 'static {
78    /// Notify the mito engine that a remote job is completed.
79    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>);
80}
81
82/// Unique id for a remote job.
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
84pub struct JobId(Uuid);
85
86impl JobId {
87    /// Parses job id from string.
88    pub fn parse_str(input: &str) -> Result<JobId> {
89        Uuid::parse_str(input).map(JobId).context(ParseJobIdSnafu)
90    }
91}
92
93impl fmt::Display for JobId {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        write!(f, "{}", self.0)
96    }
97}
98
99/// RemoteJob is a job that can be executed remotely. For example, a remote compaction job.
100#[allow(dead_code)]
101pub enum RemoteJob {
102    CompactionJob(CompactionJob),
103}
104
105/// CompactionJob is a remote job that compacts a set of files in a compaction service.
106#[allow(dead_code)]
107pub struct CompactionJob {
108    pub compaction_region: CompactionRegion,
109    pub picker_output: PickerOutput,
110    pub start_time: Instant,
111    /// Send the result of the compaction job to these waiters.
112    pub waiters: Vec<OutputTx>,
113}
114
115/// RemoteJobResult is the result of a remote job.
116#[allow(dead_code)]
117pub enum RemoteJobResult {
118    CompactionJobResult(CompactionJobResult),
119}
120
121/// CompactionJobResult is the result of a compaction job.
122#[allow(dead_code)]
123pub struct CompactionJobResult {
124    pub job_id: JobId,
125    pub region_id: RegionId,
126    pub start_time: Instant,
127    pub region_edit: Result<RegionEdit>,
128}
129
130/// DefaultNotifier is a default implementation of Notifier that sends WorkerRequest to the mito engine.
131pub(crate) struct DefaultNotifier {
132    /// The sender to send WorkerRequest to the mito engine. This is used to notify the mito engine when a remote job is completed.
133    pub(crate) request_sender: Sender<WorkerRequest>,
134}
135
136impl DefaultNotifier {
137    fn on_failure(&self, err: Arc<Error>, region_id: RegionId, mut waiters: Vec<OutputTx>) {
138        COMPACTION_FAILURE_COUNT.inc();
139        for waiter in waiters.drain(..) {
140            waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
141        }
142    }
143}
144
145#[async_trait::async_trait]
146impl Notifier for DefaultNotifier {
147    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>) {
148        INFLIGHT_COMPACTION_COUNT.dec();
149        match result {
150            RemoteJobResult::CompactionJobResult(result) => {
151                let notify = {
152                    match result.region_edit {
153                        Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
154                            region_id: result.region_id,
155                            senders: waiters,
156                            start_time: result.start_time,
157                            edit,
158                        }),
159                        Err(err) => {
160                            error!(
161                                "Compaction failed for region {}: {:?}",
162                                result.region_id, err
163                            );
164                            let err = Arc::new(err);
165                            self.on_failure(err.clone(), result.region_id, waiters);
166                            BackgroundNotify::CompactionFailed(CompactionFailed {
167                                region_id: result.region_id,
168                                err,
169                            })
170                        }
171                    }
172                };
173
174                if let Err(e) = self
175                    .request_sender
176                    .send(WorkerRequest::Background {
177                        region_id: result.region_id,
178                        notify,
179                    })
180                    .await
181                {
182                    error!(
183                        "Failed to notify compaction job status for region {}, error: {:?}",
184                        result.region_id, e
185                    );
186                }
187            }
188        }
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly generated UUID string round-trips through `JobId` unchanged.
    #[test]
    fn test_job_id() {
        let id = Uuid::new_v4().to_string();
        let job_id = JobId::parse_str(&id).unwrap();
        assert_eq!(job_id.to_string(), id);
        // Parsing the same string twice yields equal ids.
        assert_eq!(JobId::parse_str(&id).unwrap(), job_id);
    }

    /// Strings that are not UUIDs are rejected instead of panicking.
    #[test]
    fn test_job_id_parse_invalid() {
        assert!(JobId::parse_str("").is_err());
        assert!(JobId::parse_str("not-a-uuid").is_err());
    }
}