meta_client/client/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::heartbeat_client::HeartbeatClient;
20use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
21use common_grpc::channel_manager::ChannelManager;
22use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
23use common_meta::util;
24use common_telemetry::tracing_context::TracingContext;
25use common_telemetry::{info, warn};
26use snafu::{OptionExt, ResultExt, ensure};
27use tokio::sync::{RwLock, mpsc};
28use tokio_stream::wrappers::ReceiverStream;
29use tonic::Streaming;
30use tonic::codec::CompressionEncoding;
31use tonic::transport::Channel;
32
33use crate::client::ask_leader::AskLeader;
34use crate::client::{Id, LeaderProviderRef};
35use crate::error;
36use crate::error::{InvalidResponseHeaderSnafu, Result};
37
38/// Heartbeat configuration received from Metasrv during handshake.
/// Heartbeat timing settings handed out by Metasrv in its handshake response.
#[derive(Debug, Clone, Copy)]
pub struct HeartbeatConfig {
    /// Heartbeat interval (Metasrv's `heartbeat_interval_ms`).
    pub interval: Duration,
    /// Retry interval (Metasrv's `retry_interval_ms`).
    pub retry_interval: Duration,
}
44
45impl Default for HeartbeatConfig {
46    fn default() -> Self {
47        Self {
48            interval: BASE_HEARTBEAT_INTERVAL,
49            retry_interval: BASE_HEARTBEAT_INTERVAL,
50        }
51    }
52}
53
54impl fmt::Display for HeartbeatConfig {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        write!(
57            f,
58            "interval={:?}, retry={:?}",
59            self.interval, self.retry_interval
60        )
61    }
62}
63
64impl HeartbeatConfig {
65    /// Extract configuration from HeartbeatResponse.
66    pub fn from_response(res: &HeartbeatResponse) -> Self {
67        if let Some(cfg) = &res.heartbeat_config {
68            // Metasrv provided complete configuration
69            Self {
70                interval: Duration::from_millis(cfg.heartbeat_interval_ms),
71                retry_interval: Duration::from_millis(cfg.retry_interval_ms),
72            }
73        } else {
74            let fallback = Self::default();
75            warn!(
76                "Metasrv didn't provide heartbeat_config, using default: {}",
77                fallback
78            );
79            fallback
80        }
81    }
82}
83
84pub struct HeartbeatSender {
85    id: Id,
86    role: Role,
87    sender: mpsc::Sender<HeartbeatRequest>,
88}
89
90impl HeartbeatSender {
91    #[inline]
92    fn new(id: Id, role: Role, sender: mpsc::Sender<HeartbeatRequest>) -> Self {
93        Self { id, role, sender }
94    }
95
96    #[inline]
97    pub fn id(&self) -> Id {
98        self.id
99    }
100
101    #[inline]
102    pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> {
103        req.set_header(
104            self.id,
105            self.role,
106            TracingContext::from_current_span().to_w3c(),
107        );
108        self.sender.send(req).await.map_err(|e| {
109            error::SendHeartbeatSnafu {
110                err_msg: e.to_string(),
111            }
112            .build()
113        })
114    }
115}
116
117#[derive(Debug)]
118pub struct HeartbeatStream {
119    id: Id,
120    stream: Streaming<HeartbeatResponse>,
121}
122
123impl HeartbeatStream {
124    #[inline]
125    fn new(id: Id, stream: Streaming<HeartbeatResponse>) -> Self {
126        Self { id, stream }
127    }
128
129    #[inline]
130    pub fn id(&self) -> Id {
131        self.id
132    }
133
134    /// Fetch the next message from this stream.
135    #[inline]
136    pub async fn message(&mut self) -> Result<Option<HeartbeatResponse>> {
137        let res = self.stream.message().await.map_err(error::Error::from);
138        if let Ok(Some(heartbeat)) = &res {
139            util::check_response_header(heartbeat.header.as_ref())
140                .context(InvalidResponseHeaderSnafu)?;
141        }
142        res
143    }
144}
145
146#[derive(Clone, Debug)]
147pub struct Client {
148    inner: Arc<RwLock<Inner>>,
149}
150
151impl Client {
152    pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
153        let inner = Arc::new(RwLock::new(Inner::new(
154            id,
155            role,
156            channel_manager,
157            max_retry,
158        )));
159        Self { inner }
160    }
161
162    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
163    where
164        U: AsRef<str>,
165        A: AsRef<[U]>,
166    {
167        let mut inner = self.inner.write().await;
168        inner.start(urls)
169    }
170
171    /// Start the client with a [LeaderProvider].
172    pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
173        let mut inner = self.inner.write().await;
174        inner.start_with(leader_provider)
175    }
176
177    pub async fn ask_leader(&mut self) -> Result<String> {
178        let inner = self.inner.read().await;
179        inner.ask_leader().await
180    }
181
182    pub async fn heartbeat(
183        &mut self,
184    ) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
185        let inner = self.inner.read().await;
186        inner.ask_leader().await?;
187        inner.heartbeat().await
188    }
189}
190
191#[derive(Debug)]
192struct Inner {
193    id: Id,
194    role: Role,
195    channel_manager: ChannelManager,
196    leader_provider: Option<LeaderProviderRef>,
197    max_retry: usize,
198}
199
200impl Inner {
201    fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
202        Self {
203            id,
204            role,
205            channel_manager,
206            leader_provider: None,
207            max_retry,
208        }
209    }
210
211    fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
212        ensure!(
213            !self.is_started(),
214            error::IllegalGrpcClientStateSnafu {
215                err_msg: "Heartbeat client already started"
216            }
217        );
218        self.leader_provider = Some(leader_provider);
219        Ok(())
220    }
221
222    fn start<U, A>(&mut self, urls: A) -> Result<()>
223    where
224        U: AsRef<str>,
225        A: AsRef<[U]>,
226    {
227        let peers = urls
228            .as_ref()
229            .iter()
230            .map(|url| url.as_ref().to_string())
231            .collect::<Vec<_>>();
232        let ask_leader = AskLeader::new(
233            self.id,
234            self.role,
235            peers,
236            self.channel_manager.clone(),
237            self.max_retry,
238        );
239        self.start_with(Arc::new(ask_leader))
240    }
241
242    async fn ask_leader(&self) -> Result<String> {
243        let Some(leader_provider) = self.leader_provider.as_ref() else {
244            return error::IllegalGrpcClientStateSnafu {
245                err_msg: "not started",
246            }
247            .fail();
248        };
249        leader_provider.ask_leader().await
250    }
251
252    async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
253        ensure!(
254            self.is_started(),
255            error::IllegalGrpcClientStateSnafu {
256                err_msg: "Heartbeat client not start"
257            }
258        );
259
260        let leader_addr = self
261            .leader_provider
262            .as_ref()
263            .unwrap()
264            .leader()
265            .context(error::NoLeaderSnafu)?;
266        let mut leader = self.make_client(&leader_addr)?;
267
268        let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
269
270        let header = RequestHeader::new(
271            self.id,
272            self.role,
273            TracingContext::from_current_span().to_w3c(),
274        );
275        let handshake = HeartbeatRequest {
276            header: Some(header),
277            ..Default::default()
278        };
279        sender.send(handshake).await.map_err(|e| {
280            error::SendHeartbeatSnafu {
281                err_msg: e.to_string(),
282            }
283            .build()
284        })?;
285        let receiver = ReceiverStream::new(receiver);
286
287        let mut stream = leader
288            .heartbeat(receiver)
289            .await
290            .map_err(error::Error::from)?
291            .into_inner();
292
293        let res = stream
294            .message()
295            .await
296            .map_err(error::Error::from)?
297            .context(error::CreateHeartbeatStreamSnafu)?;
298
299        // Extract heartbeat configuration from handshake response
300        let config = HeartbeatConfig::from_response(&res);
301
302        info!(
303            "Handshake successful with Metasrv at {}, received config: {}",
304            leader_addr, config
305        );
306
307        Ok((
308            HeartbeatSender::new(self.id, self.role, sender),
309            HeartbeatStream::new(self.id, stream),
310            config,
311        ))
312    }
313
314    fn make_client(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
315        let channel = self
316            .channel_manager
317            .get(addr)
318            .context(error::CreateChannelSnafu)?;
319
320        Ok(HeartbeatClient::new(channel)
321            .accept_compressed(CompressionEncoding::Zstd)
322            .accept_compressed(CompressionEncoding::Gzip)
323            .send_compressed(CompressionEncoding::Zstd))
324    }
325
326    #[inline]
327    pub(crate) fn is_started(&self) -> bool {
328        self.leader_provider.is_some()
329    }
330}
331
#[cfg(test)]
mod test {
    use super::*;

    #[tokio::test]
    async fn test_already_start() {
        let mut client = Client::new(0, Role::Datanode, ChannelManager::default(), 3);
        client
            .start(&["127.0.0.1:1000", "127.0.0.1:1001"])
            .await
            .unwrap();
        let res = client.start(&["127.0.0.1:1002"]).await;
        assert!(res.is_err());
        assert!(matches!(
            res.err(),
            Some(error::Error::IllegalGrpcClientState { .. })
        ));
    }

    #[tokio::test]
    async fn test_heartbeat_stream() {
        const SENT: usize = 10;

        let (sender, mut receiver) = mpsc::channel::<HeartbeatRequest>(100);
        let sender = HeartbeatSender::new(8, Role::Datanode, sender);
        let handle = tokio::spawn(async move {
            for _ in 0..SENT {
                sender.send(HeartbeatRequest::default()).await.unwrap();
            }
        });

        let mut received = 0;
        while let Some(req) = receiver.recv().await {
            let header = req.header.unwrap();
            assert_eq!(8, header.member_id);
            received += 1;
        }

        // If the sender task panics, the channel closes early and the loop above exits
        // quietly. Joining surfaces that panic, and the count check catches lost messages.
        handle.await.unwrap();
        assert_eq!(SENT, received);
    }
}