1pub mod etcd;
16pub mod insert_forwarder;
17#[cfg(feature = "mysql_kvbackend")]
18pub mod mysql;
19#[cfg(feature = "pg_kvbackend")]
20pub mod postgres;
21
22#[macro_export]
23macro_rules! define_ticker {
24 (
25 $(#[$meta:meta])*
26 $name:ident,
27 event_type = $event_ty:ty,
28 event_value = $event_val:expr
29 ) => {
30 $(#[$meta])*
31 pub(crate) struct $name {
32 pub(crate) tick_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
33 pub(crate) tick_interval: std::time::Duration,
34 pub(crate) sender: tokio::sync::mpsc::Sender<$event_ty>,
35 }
36
37 #[async_trait::async_trait]
38 impl common_meta::leadership_notifier::LeadershipChangeListener for $name {
39 fn name(&self) -> &'static str {
40 stringify!($name)
41 }
42
43 async fn on_leader_start(&self) -> common_meta::error::Result<()> {
44 self.start();
45 Ok(())
46 }
47
48 async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
49 self.stop();
50 Ok(())
51 }
52 }
53
54 impl $name {
55 pub(crate) fn new(
56 tick_interval: std::time::Duration,
57 sender: tokio::sync::mpsc::Sender<$event_ty>,
58 ) -> Self {
59 Self {
60 tick_handle: std::sync::Mutex::new(None),
61 tick_interval,
62 sender,
63 }
64 }
65
66 pub fn start(&self) {
67 let mut handle = self.tick_handle.lock().unwrap();
68 if handle.is_none() {
69 let sender = self.sender.clone();
70 let tick_interval = self.tick_interval;
71 let ticker_loop = tokio::spawn(async move {
72 let mut interval = tokio::time::interval_at(
73 tokio::time::Instant::now() + tick_interval,
74 tick_interval,
75 );
76 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
77 loop {
78 interval.tick().await;
79 if sender.send($event_val).await.is_err() {
80 common_telemetry::info!("EventReceiver is dropped, tick loop is stopped");
81 break;
82 }
83 }
84 });
85 *handle = Some(ticker_loop);
86 }
87 common_telemetry::info!("{} started.", stringify!($name));
88 }
89
90 pub fn stop(&self) {
91 let mut handle = self.tick_handle.lock().unwrap();
92 if let Some(handle) = handle.take() {
93 handle.abort();
94 }
95 common_telemetry::info!("{} stopped.", stringify!($name));
96 }
97 }
98
99 impl Drop for $name {
100 fn drop(&mut self) {
101 self.stop();
102 }
103 }
104 };
105}