meta_srv/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod insert_forwarder;
16
/// Generates a leadership-aware ticker type that periodically pushes a fixed
/// event into a `tokio::sync::mpsc` channel.
///
/// # Usage
///
/// ```ignore
/// define_ticker!(
///     /// Attributes and docs placed here are attached to the generated struct.
///     MyTicker,
///     event_type = MyEvent,
///     event_value = MyEvent::Tick
/// );
/// ```
///
/// # What is generated
///
/// - A `pub(crate)` struct `$name` holding the join handle of the tick task,
///   the tick interval and the channel sender.
/// - An implementation of
///   `common_meta::leadership_notifier::LeadershipChangeListener` whose
///   `on_leader_start` / `on_leader_stop` call `start` / `stop`.
/// - `new`, `start` and `stop` inherent methods.
/// - A `Drop` implementation that calls `stop`, so the tick task never
///   outlives the ticker.
///
/// `$event_val` is expanded inside the tick loop, so the expression is
/// evaluated anew for every tick.
///
/// # Notes
///
/// - `start` calls `tokio::spawn` and therefore has to run inside a Tokio
///   runtime context.
/// - Per the Tokio documentation `tokio::time::interval_at` panics on a zero
///   period; that panic would happen inside the spawned task, so callers
///   should pass a non-zero `tick_interval`.
#[macro_export]
macro_rules! define_ticker {
    (
        $(#[$meta:meta])*
        $name:ident,
        event_type = $event_ty:ty,
        event_value = $event_val:expr
    ) => {
        $(#[$meta])*
        pub(crate) struct $name {
            /// Join handle of the running tick task; `None` while stopped.
            pub(crate) tick_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
            /// Delay before the first tick and period between later ticks.
            pub(crate) tick_interval: std::time::Duration,
            /// Channel end the tick task sends `$event_val` into.
            pub(crate) sender: tokio::sync::mpsc::Sender<$event_ty>,
        }

        #[async_trait::async_trait]
        impl common_meta::leadership_notifier::LeadershipChangeListener for $name {
            fn name(&self) -> &'static str {
                stringify!($name)
            }

            /// Starts ticking once this node becomes the leader.
            async fn on_leader_start(&self) -> common_meta::error::Result<()> {
                self.start();
                Ok(())
            }

            /// Stops ticking once this node is no longer the leader.
            async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
                self.stop();
                Ok(())
            }
        }

        impl $name {
            /// Creates a stopped ticker; no task is spawned until `start` is called.
            pub(crate) fn new(
                tick_interval: std::time::Duration,
                sender: tokio::sync::mpsc::Sender<$event_ty>,
            ) -> Self {
                Self {
                    tick_handle: std::sync::Mutex::new(None),
                    tick_interval,
                    sender,
                }
            }

            /// Spawns the tick task unless a handle is already stored.
            ///
            /// The first event is sent one `tick_interval` after this call and
            /// then once per `tick_interval`. The task ends by itself when
            /// sending fails, i.e. when the receiving side has been dropped.
            pub fn start(&self) {
                // A poisoned lock is recovered instead of unwrapped: the guarded
                // `Option<JoinHandle>` cannot be left half-updated by a panic, and
                // panicking here would cascade into `Drop` (see `stop`).
                let mut handle = self
                    .tick_handle
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if handle.is_none() {
                    let sender = self.sender.clone();
                    let tick_interval = self.tick_interval;
                    let ticker_loop = tokio::spawn(async move {
                        // `interval_at` (rather than `interval`) delays the first
                        // tick by one full period instead of firing immediately.
                        let mut interval = tokio::time::interval_at(
                            tokio::time::Instant::now() + tick_interval,
                            tick_interval,
                        );
                        // Do not burst to catch up when the receiver is slow.
                        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
                        loop {
                            interval.tick().await;
                            if sender.send($event_val).await.is_err() {
                                common_telemetry::info!("EventReceiver is dropped, tick loop is stopped");
                                break;
                            }
                        }
                    });
                    *handle = Some(ticker_loop);
                }
                common_telemetry::info!("{} started.", stringify!($name));
            }

            /// Aborts the tick task, if any, and clears the stored handle.
            ///
            /// Calling this on an already stopped ticker is a no-op apart from
            /// the log line.
            pub fn stop(&self) {
                // `stop` also runs from `Drop`. If `start` panicked while holding
                // the lock (e.g. `tokio::spawn` outside a runtime) the mutex is
                // poisoned; unwrapping here would panic during unwinding and
                // abort the process, so the guard is recovered instead.
                let mut handle = self
                    .tick_handle
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if let Some(handle) = handle.take() {
                    handle.abort();
                }
                common_telemetry::info!("{} stopped.", stringify!($name));
            }
        }

        impl Drop for $name {
            /// Ensures the background tick task is aborted together with the ticker.
            fn drop(&mut self) {
                self.stop();
            }
        }
    };
}