meta_srv/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod etcd;
16pub mod insert_forwarder;
17#[cfg(feature = "mysql_kvbackend")]
18pub mod mysql;
19#[cfg(feature = "pg_kvbackend")]
20pub mod postgres;
21
/// Defines a ticker type that periodically sends a fixed event through a
/// `tokio::sync::mpsc` channel.
///
/// The generated struct:
/// - implements `common_meta::leadership_notifier::LeadershipChangeListener`,
///   calling [`start`](#method.start) on `on_leader_start` and
///   [`stop`](#method.stop) on `on_leader_stop`;
/// - aborts its background task when dropped.
///
/// # Arguments
///
/// - optional attributes (including doc comments) that are forwarded to the
///   generated struct;
/// - `$name`: identifier of the generated struct;
/// - `event_type`: the message type carried by the channel;
/// - `event_value`: expression evaluated on every tick to produce the message.
///
/// # Example
///
/// ```ignore
/// define_ticker!(
///     /// Periodically emits `Event::Tick`.
///     MyTicker,
///     event_type = Event,
///     event_value = Event::Tick
/// );
/// ```
#[macro_export]
macro_rules! define_ticker {
    (
        $(#[$meta:meta])*
        $name:ident,
        event_type = $event_ty:ty,
        event_value = $event_val:expr
    ) => {
        $(#[$meta])*
        pub(crate) struct $name {
            /// Handle of the spawned tick task; `None` while the ticker is not running.
            pub(crate) tick_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
            /// Period between two consecutive events.
            pub(crate) tick_interval: std::time::Duration,
            /// Channel endpoint the events are sent through.
            pub(crate) sender: tokio::sync::mpsc::Sender<$event_ty>,
        }

        #[async_trait::async_trait]
        impl common_meta::leadership_notifier::LeadershipChangeListener for $name {
            fn name(&self) -> &'static str {
                stringify!($name)
            }

            /// Starts the tick loop.
            async fn on_leader_start(&self) -> common_meta::error::Result<()> {
                self.start();
                Ok(())
            }

            /// Stops the tick loop.
            async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
                self.stop();
                Ok(())
            }
        }

        impl $name {
            /// Creates a stopped ticker; no task is spawned until `start` is called.
            pub(crate) fn new(
                tick_interval: std::time::Duration,
                sender: tokio::sync::mpsc::Sender<$event_ty>,
            ) -> Self {
                Self {
                    tick_handle: std::sync::Mutex::new(None),
                    tick_interval,
                    sender,
                }
            }

            /// Spawns the tick loop if it is not already running.
            ///
            /// Calling this while a tick task is already registered does not
            /// spawn a second one.
            pub fn start(&self) {
                // The guarded value is a plain `Option`, which stays valid even if a
                // previous holder panicked, so a poisoned lock is safe to recover.
                let mut handle = self
                    .tick_handle
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if handle.is_none() {
                    let sender = self.sender.clone();
                    let tick_interval = self.tick_interval;
                    let ticker_loop = tokio::spawn(async move {
                        // The first tick is scheduled one full interval from now.
                        let mut interval = tokio::time::interval_at(
                            tokio::time::Instant::now() + tick_interval,
                            tick_interval,
                        );
                        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
                        loop {
                            interval.tick().await;
                            // A send error means the receiving side is gone; nothing
                            // is left to notify, so the loop terminates.
                            if sender.send($event_val).await.is_err() {
                                common_telemetry::info!("EventReceiver is dropped, tick loop is stopped");
                                break;
                            }
                        }
                    });
                    *handle = Some(ticker_loop);
                }
                common_telemetry::info!("{} started.", stringify!($name));
            }

            /// Aborts the tick loop if it is running.
            ///
            /// Safe to call repeatedly and when the ticker was never started.
            pub fn stop(&self) {
                // `stop` also runs from `Drop`; panicking on a poisoned lock there
                // could abort the process during unwinding, so recover the guard.
                let mut handle = self
                    .tick_handle
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if let Some(task) = handle.take() {
                    task.abort();
                }
                common_telemetry::info!("{} stopped.", stringify!($name));
            }
        }

        impl Drop for $name {
            /// Ensures the background task does not outlive the ticker.
            fn drop(&mut self) {
                self.stop();
            }
        }
    };
}