Skip to main content

meta_srv/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod database;
16pub mod etcd;
17pub mod insert_forwarder;
18#[cfg(feature = "mysql_kvbackend")]
19pub mod mysql;
20#[cfg(feature = "pg_kvbackend")]
21pub mod postgres;
22
23#[macro_export]
24macro_rules! define_ticker {
25    (
26        $(#[$meta:meta])*
27        $name:ident,
28        event_type = $event_ty:ty,
29        event_value = $event_val:expr
30    ) => {
31        $(#[$meta])*
32        pub struct $name {
33            pub tick_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
34            pub tick_interval: std::time::Duration,
35            pub sender: tokio::sync::mpsc::Sender<$event_ty>,
36        }
37
38        #[async_trait::async_trait]
39        impl common_meta::leadership_notifier::LeadershipChangeListener for $name {
40            fn name(&self) -> &'static str {
41                stringify!($name)
42            }
43
44            async fn on_leader_start(&self) -> common_meta::error::Result<()> {
45                self.start();
46                Ok(())
47            }
48
49            async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
50                self.stop();
51                Ok(())
52            }
53        }
54
55        impl $name {
56            pub(crate) fn new(
57                tick_interval: std::time::Duration,
58                sender: tokio::sync::mpsc::Sender<$event_ty>,
59            ) -> Self {
60                Self {
61                    tick_handle: std::sync::Mutex::new(None),
62                    tick_interval,
63                    sender,
64                }
65            }
66
67            pub fn start(&self) {
68                let mut handle = self.tick_handle.lock().unwrap();
69                if handle.is_none() {
70                    let sender = self.sender.clone();
71                    let tick_interval = self.tick_interval;
72                    let ticker_loop = tokio::spawn(async move {
73                        let mut interval = tokio::time::interval_at(
74                            tokio::time::Instant::now() + tick_interval,
75                            tick_interval,
76                        );
77                        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
78                        loop {
79                            interval.tick().await;
80                            if sender.send($event_val).await.is_err() {
81                                common_telemetry::info!("EventReceiver is dropped, tick loop is stopped");
82                                break;
83                            }
84                        }
85                    });
86                    *handle = Some(ticker_loop);
87                }
88                common_telemetry::info!("{} started.", stringify!($name));
89            }
90
91            pub fn stop(&self) {
92                let mut handle = self.tick_handle.lock().unwrap();
93                if let Some(handle) = handle.take() {
94                    handle.abort();
95                }
96                common_telemetry::info!("{} stopped.", stringify!($name));
97            }
98        }
99
100        impl Drop for $name {
101            fn drop(&mut self) {
102                self.stop();
103            }
104        }
105    };
106}