1use std::fmt;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::heartbeat_client::HeartbeatClient;
20use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
21use common_grpc::channel_manager::ChannelManager;
22use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
23use common_meta::util;
24use common_telemetry::tracing_context::TracingContext;
25use common_telemetry::{info, warn};
26use snafu::{OptionExt, ResultExt, ensure};
27use tokio::sync::{RwLock, mpsc};
28use tokio_stream::wrappers::ReceiverStream;
29use tonic::Streaming;
30use tonic::codec::CompressionEncoding;
31use tonic::transport::Channel;
32
33use crate::client::{Id, LeaderProviderRef};
34use crate::error;
35use crate::error::{InvalidResponseHeaderSnafu, Result};
36
/// Heartbeat timing parameters negotiated with Metasrv during the handshake.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly
/// (e.g. in tests or when detecting a config change).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HeartbeatConfig {
    /// Period between two consecutive heartbeats.
    pub interval: Duration,
    /// Delay used by callers before retrying a failed heartbeat.
    pub retry_interval: Duration,
}
43
44impl Default for HeartbeatConfig {
45 fn default() -> Self {
46 Self {
47 interval: BASE_HEARTBEAT_INTERVAL,
48 retry_interval: BASE_HEARTBEAT_INTERVAL,
49 }
50 }
51}
52
53impl fmt::Display for HeartbeatConfig {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 write!(
56 f,
57 "interval={:?}, retry={:?}",
58 self.interval, self.retry_interval
59 )
60 }
61}
62
63impl HeartbeatConfig {
64 pub fn from_response(res: &HeartbeatResponse) -> Self {
66 if let Some(cfg) = &res.heartbeat_config {
67 Self {
69 interval: Duration::from_millis(cfg.heartbeat_interval_ms),
70 retry_interval: Duration::from_millis(cfg.retry_interval_ms),
71 }
72 } else {
73 let fallback = Self::default();
74 warn!(
75 "Metasrv didn't provide heartbeat_config, using default: {}",
76 fallback
77 );
78 fallback
79 }
80 }
81}
82
83pub struct HeartbeatSender {
84 id: Id,
85 role: Role,
86 sender: mpsc::Sender<HeartbeatRequest>,
87}
88
89impl HeartbeatSender {
90 #[inline]
91 fn new(id: Id, role: Role, sender: mpsc::Sender<HeartbeatRequest>) -> Self {
92 Self { id, role, sender }
93 }
94
95 #[inline]
96 pub fn id(&self) -> Id {
97 self.id
98 }
99
100 #[inline]
101 pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> {
102 req.set_header(
103 self.id,
104 self.role,
105 TracingContext::from_current_span().to_w3c(),
106 );
107 self.sender.send(req).await.map_err(|e| {
108 error::SendHeartbeatSnafu {
109 err_msg: e.to_string(),
110 }
111 .build()
112 })
113 }
114}
115
116#[derive(Debug)]
117pub struct HeartbeatStream {
118 id: Id,
119 stream: Streaming<HeartbeatResponse>,
120}
121
122impl HeartbeatStream {
123 #[inline]
124 fn new(id: Id, stream: Streaming<HeartbeatResponse>) -> Self {
125 Self { id, stream }
126 }
127
128 #[inline]
129 pub fn id(&self) -> Id {
130 self.id
131 }
132
133 #[inline]
135 pub async fn message(&mut self) -> Result<Option<HeartbeatResponse>> {
136 let res = self.stream.message().await.map_err(error::Error::from);
137 if let Ok(Some(heartbeat)) = &res {
138 util::check_response_header(heartbeat.header.as_ref())
139 .context(InvalidResponseHeaderSnafu)?;
140 }
141 res
142 }
143}
144
145#[derive(Clone, Debug)]
146pub struct Client {
147 inner: Arc<RwLock<Inner>>,
148}
149
150impl Client {
151 pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
152 let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
153 Self { inner }
154 }
155
156 pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
158 let mut inner = self.inner.write().await;
159 inner.start_with(leader_provider)
160 }
161
162 pub async fn ask_leader(&mut self) -> Result<String> {
163 let inner = self.inner.read().await;
164 inner.ask_leader().await
165 }
166
167 pub async fn heartbeat(
168 &mut self,
169 ) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
170 let inner = self.inner.read().await;
171 inner.ask_leader().await?;
172 inner.heartbeat().await
173 }
174}
175
176#[derive(Debug)]
177struct Inner {
178 id: Id,
179 role: Role,
180 channel_manager: ChannelManager,
181 leader_provider: Option<LeaderProviderRef>,
182}
183
184impl Inner {
185 fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
186 Self {
187 id,
188 role,
189 channel_manager,
190 leader_provider: None,
191 }
192 }
193
194 fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
195 ensure!(
196 !self.is_started(),
197 error::IllegalGrpcClientStateSnafu {
198 err_msg: "Heartbeat client already started"
199 }
200 );
201 self.leader_provider = Some(leader_provider);
202 Ok(())
203 }
204
205 async fn ask_leader(&self) -> Result<String> {
206 let Some(leader_provider) = self.leader_provider.as_ref() else {
207 return error::IllegalGrpcClientStateSnafu {
208 err_msg: "not started",
209 }
210 .fail();
211 };
212 leader_provider.ask_leader().await
213 }
214
215 async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
216 ensure!(
217 self.is_started(),
218 error::IllegalGrpcClientStateSnafu {
219 err_msg: "Heartbeat client not start"
220 }
221 );
222
223 let leader_addr = self
224 .leader_provider
225 .as_ref()
226 .unwrap()
227 .leader()
228 .context(error::NoLeaderSnafu)?;
229 let mut leader = self.make_client(&leader_addr)?;
230
231 let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
232
233 let header = RequestHeader::new(
234 self.id,
235 self.role,
236 TracingContext::from_current_span().to_w3c(),
237 );
238 let handshake = HeartbeatRequest {
239 header: Some(header),
240 ..Default::default()
241 };
242 sender.send(handshake).await.map_err(|e| {
243 error::SendHeartbeatSnafu {
244 err_msg: e.to_string(),
245 }
246 .build()
247 })?;
248 let receiver = ReceiverStream::new(receiver);
249
250 let mut stream = leader
251 .heartbeat(receiver)
252 .await
253 .map_err(error::Error::from)?
254 .into_inner();
255
256 let res = stream
257 .message()
258 .await
259 .map_err(error::Error::from)?
260 .context(error::CreateHeartbeatStreamSnafu)?;
261
262 let config = HeartbeatConfig::from_response(&res);
264
265 info!(
266 "Handshake successful with Metasrv at {}, received config: {}",
267 leader_addr, config
268 );
269
270 Ok((
271 HeartbeatSender::new(self.id, self.role, sender),
272 HeartbeatStream::new(self.id, stream),
273 config,
274 ))
275 }
276
277 fn make_client(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
278 let channel = self
279 .channel_manager
280 .get(addr)
281 .context(error::CreateChannelSnafu)?;
282
283 Ok(HeartbeatClient::new(channel)
284 .accept_compressed(CompressionEncoding::Zstd)
285 .accept_compressed(CompressionEncoding::Gzip)
286 .send_compressed(CompressionEncoding::Zstd))
287 }
288
289 #[inline]
290 pub(crate) fn is_started(&self) -> bool {
291 self.leader_provider.is_some()
292 }
293}
294
#[cfg(test)]
mod test {
    use super::*;
    use crate::client::AskLeader;

    #[tokio::test]
    async fn test_already_start() {
        let client = Client::new(0, Role::Datanode, ChannelManager::default());
        let leader_provider = Arc::new(AskLeader::new(
            0,
            Role::Datanode,
            vec!["127.0.0.1:1000".to_string(), "127.0.0.1:1001".to_string()],
            ChannelManager::default(),
            3,
        ));
        client.start_with(leader_provider.clone()).await.unwrap();
        let res = client.start_with(leader_provider).await;
        assert!(res.is_err());
        assert!(matches!(
            res.err(),
            Some(error::Error::IllegalGrpcClientState { .. })
        ));
    }

    #[tokio::test]
    async fn test_heartbeat_stream() {
        const REQUESTS: usize = 10;

        let (sender, mut receiver) = mpsc::channel::<HeartbeatRequest>(100);
        let sender = HeartbeatSender::new(8, Role::Datanode, sender);
        let handle = tokio::spawn(async move {
            for _ in 0..REQUESTS {
                sender.send(HeartbeatRequest::default()).await.unwrap();
            }
        });

        let mut received = 0;
        while let Some(req) = receiver.recv().await {
            let header = req.header.unwrap();
            assert_eq!(8, header.member_id);
            received += 1;
        }

        // A panic in the sending task would otherwise just end the loop early.
        handle.await.unwrap();
        assert_eq!(REQUESTS, received);
    }
}