common_meta/heartbeat/handler/
suspend.rs1use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17
18use async_trait::async_trait;
19use common_telemetry::{info, warn};
20
21use crate::error::Result;
22use crate::heartbeat::handler::{
23 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
24};
25use crate::instruction::Instruction;
26
/// Handler state for suspension tracking: a single shared boolean flag.
///
/// The flag lives behind an [`Arc`], so the code that constructs the handler can keep its own
/// clone and observe every change made through this handler.
#[derive(Debug)]
pub struct SuspendHandler {
    /// Shared suspension flag; `true` while the suspension state is active.
    suspend: Arc<AtomicBool>,
}

impl SuspendHandler {
    /// Creates a handler that operates on the given shared `suspend` flag.
    ///
    /// The flag is stored as-is (not copied), so the caller and the handler refer to the same
    /// underlying [`AtomicBool`].
    pub fn new(suspend: Arc<AtomicBool>) -> Self {
        Self { suspend }
    }
}
38
39#[async_trait]
40impl HeartbeatResponseHandler for SuspendHandler {
41 fn is_acceptable(&self, context: &HeartbeatResponseHandlerContext) -> bool {
42 matches!(
43 context.incoming_message,
44 Some((_, Instruction::Suspend)) | None
45 )
46 }
47
48 async fn handle(&self, context: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
49 let flip_state = |expect: bool| {
50 self.suspend
51 .compare_exchange(expect, !expect, Ordering::Relaxed, Ordering::Relaxed)
52 .is_ok()
53 };
54
55 if let Some((_, Instruction::Suspend)) = context.incoming_message.take() {
56 if flip_state(false) {
57 warn!("Suspend instruction received from meta, entering suspension state");
58 }
59 } else {
60 if flip_state(true) {
64 info!("clear suspend state");
65 }
66 }
67 Ok(HandleControl::Continue)
68 }
69}