1pub mod database;
16pub mod etcd;
17pub mod insert_forwarder;
18#[cfg(feature = "mysql_kvbackend")]
19pub mod mysql;
20#[cfg(feature = "pg_kvbackend")]
21pub mod postgres;
22
23#[macro_export]
24macro_rules! define_ticker {
25 (
26 $(#[$meta:meta])*
27 $name:ident,
28 event_type = $event_ty:ty,
29 event_value = $event_val:expr
30 ) => {
31 $(#[$meta])*
32 pub struct $name {
33 pub tick_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
34 pub tick_interval: std::time::Duration,
35 pub sender: tokio::sync::mpsc::Sender<$event_ty>,
36 }
37
38 #[async_trait::async_trait]
39 impl common_meta::leadership_notifier::LeadershipChangeListener for $name {
40 fn name(&self) -> &'static str {
41 stringify!($name)
42 }
43
44 async fn on_leader_start(&self) -> common_meta::error::Result<()> {
45 self.start();
46 Ok(())
47 }
48
49 async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
50 self.stop();
51 Ok(())
52 }
53 }
54
55 impl $name {
56 pub(crate) fn new(
57 tick_interval: std::time::Duration,
58 sender: tokio::sync::mpsc::Sender<$event_ty>,
59 ) -> Self {
60 Self {
61 tick_handle: std::sync::Mutex::new(None),
62 tick_interval,
63 sender,
64 }
65 }
66
67 pub fn start(&self) {
68 let mut handle = self.tick_handle.lock().unwrap();
69 if handle.is_none() {
70 let sender = self.sender.clone();
71 let tick_interval = self.tick_interval;
72 let ticker_loop = tokio::spawn(async move {
73 let mut interval = tokio::time::interval_at(
74 tokio::time::Instant::now() + tick_interval,
75 tick_interval,
76 );
77 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
78 loop {
79 interval.tick().await;
80 if sender.send($event_val).await.is_err() {
81 common_telemetry::info!("EventReceiver is dropped, tick loop is stopped");
82 break;
83 }
84 }
85 });
86 *handle = Some(ticker_loop);
87 }
88 common_telemetry::info!("{} started.", stringify!($name));
89 }
90
91 pub fn stop(&self) {
92 let mut handle = self.tick_handle.lock().unwrap();
93 if let Some(handle) = handle.take() {
94 handle.abort();
95 }
96 common_telemetry::info!("{} stopped.", stringify!($name));
97 }
98 }
99
100 impl Drop for $name {
101 fn drop(&mut self) {
102 self.stop();
103 }
104 }
105 };
106}