common_meta/heartbeat/handler/
suspend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17
18use async_trait::async_trait;
19use common_telemetry::{info, warn};
20
21use crate::error::Result;
22use crate::heartbeat::handler::{
23    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
24};
25use crate::instruction::Instruction;
26
27/// A heartbeat response handler that handles special "suspend" error.
28/// It will simply set or clear (if previously set) the inner suspend atomic state.
29pub struct SuspendHandler {
30    suspend: Arc<AtomicBool>,
31}
32
33impl SuspendHandler {
34    pub fn new(suspend: Arc<AtomicBool>) -> Self {
35        Self { suspend }
36    }
37}
38
39#[async_trait]
40impl HeartbeatResponseHandler for SuspendHandler {
41    fn is_acceptable(&self, context: &HeartbeatResponseHandlerContext) -> bool {
42        matches!(
43            context.incoming_message,
44            Some((_, Instruction::Suspend)) | None
45        )
46    }
47
48    async fn handle(&self, context: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
49        let flip_state = |expect: bool| {
50            self.suspend
51                .compare_exchange(expect, !expect, Ordering::Relaxed, Ordering::Relaxed)
52                .is_ok()
53        };
54
55        if let Some((_, Instruction::Suspend)) = context.incoming_message.take() {
56            if flip_state(false) {
57                warn!("Suspend instruction received from meta, entering suspension state");
58            }
59        } else {
60            // Suspended components are made always tried to get rid of this state, we don't want
61            // an "un-suspend" instruction to resume them running. That can be error-prone.
62            // So if the "suspend" instruction is not found in the heartbeat, just unset the state.
63            if flip_state(true) {
64                info!("clear suspend state");
65            }
66        }
67        Ok(HandleControl::Continue)
68    }
69}