1pub mod insert_forwarder;
16
17#[macro_export]
18macro_rules! define_ticker {
19 (
20 $(#[$meta:meta])*
21 $name:ident,
22 event_type = $event_ty:ty,
23 event_value = $event_val:expr
24 ) => {
25 $(#[$meta])*
26 pub(crate) struct $name {
27 pub(crate) tick_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
28 pub(crate) tick_interval: std::time::Duration,
29 pub(crate) sender: tokio::sync::mpsc::Sender<$event_ty>,
30 }
31
32 #[async_trait::async_trait]
33 impl common_meta::leadership_notifier::LeadershipChangeListener for $name {
34 fn name(&self) -> &'static str {
35 stringify!($name)
36 }
37
38 async fn on_leader_start(&self) -> common_meta::error::Result<()> {
39 self.start();
40 Ok(())
41 }
42
43 async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
44 self.stop();
45 Ok(())
46 }
47 }
48
49 impl $name {
50 pub(crate) fn new(
51 tick_interval: std::time::Duration,
52 sender: tokio::sync::mpsc::Sender<$event_ty>,
53 ) -> Self {
54 Self {
55 tick_handle: std::sync::Mutex::new(None),
56 tick_interval,
57 sender,
58 }
59 }
60
61 pub fn start(&self) {
62 let mut handle = self.tick_handle.lock().unwrap();
63 if handle.is_none() {
64 let sender = self.sender.clone();
65 let tick_interval = self.tick_interval;
66 let ticker_loop = tokio::spawn(async move {
67 let mut interval = tokio::time::interval_at(
68 tokio::time::Instant::now() + tick_interval,
69 tick_interval,
70 );
71 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
72 loop {
73 interval.tick().await;
74 if sender.send($event_val).await.is_err() {
75 common_telemetry::info!("EventReceiver is dropped, tick loop is stopped");
76 break;
77 }
78 }
79 });
80 *handle = Some(ticker_loop);
81 }
82 common_telemetry::info!("{} started.", stringify!($name));
83 }
84
85 pub fn stop(&self) {
86 let mut handle = self.tick_handle.lock().unwrap();
87 if let Some(handle) = handle.take() {
88 handle.abort();
89 }
90 common_telemetry::info!("{} stopped.", stringify!($name));
91 }
92 }
93
94 impl Drop for $name {
95 fn drop(&mut self) {
96 self.stop();
97 }
98 }
99 };
100}