meta_client/client/
heartbeat.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::heartbeat_client::HeartbeatClient;
20use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
21use common_grpc::channel_manager::ChannelManager;
22use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
23use common_meta::util;
24use common_telemetry::tracing_context::TracingContext;
25use common_telemetry::{info, warn};
26use snafu::{OptionExt, ResultExt, ensure};
27use tokio::sync::{RwLock, mpsc};
28use tokio_stream::wrappers::ReceiverStream;
29use tonic::Streaming;
30use tonic::codec::CompressionEncoding;
31use tonic::transport::Channel;
32
33use crate::client::{Id, LeaderProviderRef};
34use crate::error;
35use crate::error::{InvalidResponseHeaderSnafu, Result};
36
37/// Heartbeat configuration received from Metasrv during handshake.
38#[derive(Debug, Clone, Copy)]
39pub struct HeartbeatConfig {
40    pub interval: Duration,
41    pub retry_interval: Duration,
42}
43
44impl Default for HeartbeatConfig {
45    fn default() -> Self {
46        Self {
47            interval: BASE_HEARTBEAT_INTERVAL,
48            retry_interval: BASE_HEARTBEAT_INTERVAL,
49        }
50    }
51}
52
53impl fmt::Display for HeartbeatConfig {
54    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55        write!(
56            f,
57            "interval={:?}, retry={:?}",
58            self.interval, self.retry_interval
59        )
60    }
61}
62
63impl HeartbeatConfig {
64    /// Extract configuration from HeartbeatResponse.
65    pub fn from_response(res: &HeartbeatResponse) -> Self {
66        if let Some(cfg) = &res.heartbeat_config {
67            // Metasrv provided complete configuration
68            Self {
69                interval: Duration::from_millis(cfg.heartbeat_interval_ms),
70                retry_interval: Duration::from_millis(cfg.retry_interval_ms),
71            }
72        } else {
73            let fallback = Self::default();
74            warn!(
75                "Metasrv didn't provide heartbeat_config, using default: {}",
76                fallback
77            );
78            fallback
79        }
80    }
81}
82
83pub struct HeartbeatSender {
84    id: Id,
85    role: Role,
86    sender: mpsc::Sender<HeartbeatRequest>,
87}
88
89impl HeartbeatSender {
90    #[inline]
91    fn new(id: Id, role: Role, sender: mpsc::Sender<HeartbeatRequest>) -> Self {
92        Self { id, role, sender }
93    }
94
95    #[inline]
96    pub fn id(&self) -> Id {
97        self.id
98    }
99
100    #[inline]
101    pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> {
102        req.set_header(
103            self.id,
104            self.role,
105            TracingContext::from_current_span().to_w3c(),
106        );
107        self.sender.send(req).await.map_err(|e| {
108            error::SendHeartbeatSnafu {
109                err_msg: e.to_string(),
110            }
111            .build()
112        })
113    }
114}
115
116#[derive(Debug)]
117pub struct HeartbeatStream {
118    id: Id,
119    stream: Streaming<HeartbeatResponse>,
120}
121
122impl HeartbeatStream {
123    #[inline]
124    fn new(id: Id, stream: Streaming<HeartbeatResponse>) -> Self {
125        Self { id, stream }
126    }
127
128    #[inline]
129    pub fn id(&self) -> Id {
130        self.id
131    }
132
133    /// Fetch the next message from this stream.
134    #[inline]
135    pub async fn message(&mut self) -> Result<Option<HeartbeatResponse>> {
136        let res = self.stream.message().await.map_err(error::Error::from);
137        if let Ok(Some(heartbeat)) = &res {
138            util::check_response_header(heartbeat.header.as_ref())
139                .context(InvalidResponseHeaderSnafu)?;
140        }
141        res
142    }
143}
144
145#[derive(Clone, Debug)]
146pub struct Client {
147    inner: Arc<RwLock<Inner>>,
148}
149
150impl Client {
151    pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
152        let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
153        Self { inner }
154    }
155
156    /// Start the client with a [LeaderProvider].
157    pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
158        let mut inner = self.inner.write().await;
159        inner.start_with(leader_provider)
160    }
161
162    pub async fn ask_leader(&mut self) -> Result<String> {
163        let inner = self.inner.read().await;
164        inner.ask_leader().await
165    }
166
167    pub async fn heartbeat(
168        &mut self,
169    ) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
170        let inner = self.inner.read().await;
171        inner.ask_leader().await?;
172        inner.heartbeat().await
173    }
174}
175
176#[derive(Debug)]
177struct Inner {
178    id: Id,
179    role: Role,
180    channel_manager: ChannelManager,
181    leader_provider: Option<LeaderProviderRef>,
182}
183
184impl Inner {
185    fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
186        Self {
187            id,
188            role,
189            channel_manager,
190            leader_provider: None,
191        }
192    }
193
194    fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
195        ensure!(
196            !self.is_started(),
197            error::IllegalGrpcClientStateSnafu {
198                err_msg: "Heartbeat client already started"
199            }
200        );
201        self.leader_provider = Some(leader_provider);
202        Ok(())
203    }
204
205    async fn ask_leader(&self) -> Result<String> {
206        let Some(leader_provider) = self.leader_provider.as_ref() else {
207            return error::IllegalGrpcClientStateSnafu {
208                err_msg: "not started",
209            }
210            .fail();
211        };
212        leader_provider.ask_leader().await
213    }
214
215    async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
216        ensure!(
217            self.is_started(),
218            error::IllegalGrpcClientStateSnafu {
219                err_msg: "Heartbeat client not start"
220            }
221        );
222
223        let leader_addr = self
224            .leader_provider
225            .as_ref()
226            .unwrap()
227            .leader()
228            .context(error::NoLeaderSnafu)?;
229        let mut leader = self.make_client(&leader_addr)?;
230
231        let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
232
233        let header = RequestHeader::new(
234            self.id,
235            self.role,
236            TracingContext::from_current_span().to_w3c(),
237        );
238        let handshake = HeartbeatRequest {
239            header: Some(header),
240            ..Default::default()
241        };
242        sender.send(handshake).await.map_err(|e| {
243            error::SendHeartbeatSnafu {
244                err_msg: e.to_string(),
245            }
246            .build()
247        })?;
248        let receiver = ReceiverStream::new(receiver);
249
250        let mut stream = leader
251            .heartbeat(receiver)
252            .await
253            .map_err(error::Error::from)?
254            .into_inner();
255
256        let res = stream
257            .message()
258            .await
259            .map_err(error::Error::from)?
260            .context(error::CreateHeartbeatStreamSnafu)?;
261
262        // Extract heartbeat configuration from handshake response
263        let config = HeartbeatConfig::from_response(&res);
264
265        info!(
266            "Handshake successful with Metasrv at {}, received config: {}",
267            leader_addr, config
268        );
269
270        Ok((
271            HeartbeatSender::new(self.id, self.role, sender),
272            HeartbeatStream::new(self.id, stream),
273            config,
274        ))
275    }
276
277    fn make_client(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
278        let channel = self
279            .channel_manager
280            .get(addr)
281            .context(error::CreateChannelSnafu)?;
282
283        Ok(HeartbeatClient::new(channel)
284            .accept_compressed(CompressionEncoding::Zstd)
285            .accept_compressed(CompressionEncoding::Gzip)
286            .send_compressed(CompressionEncoding::Zstd))
287    }
288
289    #[inline]
290    pub(crate) fn is_started(&self) -> bool {
291        self.leader_provider.is_some()
292    }
293}
294
#[cfg(test)]
mod test {
    use super::*;
    use crate::client::AskLeader;

    /// Starting a client twice must fail with `IllegalGrpcClientState`.
    #[tokio::test]
    async fn test_already_start() {
        let client = Client::new(0, Role::Datanode, ChannelManager::default());
        let leader_provider = Arc::new(AskLeader::new(
            0,
            Role::Datanode,
            vec!["127.0.0.1:1000".to_string(), "127.0.0.1:1001".to_string()],
            ChannelManager::default(),
            3,
        ));
        client.start_with(leader_provider.clone()).await.unwrap();
        let res = client.start_with(leader_provider).await;
        assert!(res.is_err());
        assert!(matches!(
            res.err(),
            Some(error::Error::IllegalGrpcClientState { .. })
        ));
    }

    /// Every request sent through a `HeartbeatSender` arrives with its header
    /// stamped with the sender's member id.
    #[tokio::test]
    async fn test_heartbeat_stream() {
        const SENT: usize = 10;

        let (sender, mut receiver) = mpsc::channel::<HeartbeatRequest>(100);
        let sender = HeartbeatSender::new(8, Role::Datanode, sender);
        let handle = tokio::spawn(async move {
            for _ in 0..SENT {
                sender.send(HeartbeatRequest::default()).await.unwrap();
            }
        });

        let mut received = 0;
        while let Some(req) = receiver.recv().await {
            let header = req.header.unwrap();
            assert_eq!(8, header.member_id);
            received += 1;
        }

        // A panic in the producer only closes the channel and ends the loop
        // above, so join the task to surface it instead of passing silently.
        handle.await.unwrap();
        // Guard against the loop passing vacuously when nothing was delivered.
        assert_eq!(SENT, received);
    }
}