1use std::fmt;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::heartbeat_client::HeartbeatClient;
20use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
21use common_grpc::channel_manager::ChannelManager;
22use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
23use common_meta::util;
24use common_telemetry::tracing_context::TracingContext;
25use common_telemetry::{info, warn};
26use snafu::{OptionExt, ResultExt, ensure};
27use tokio::sync::{RwLock, mpsc};
28use tokio_stream::wrappers::ReceiverStream;
29use tonic::Streaming;
30use tonic::codec::CompressionEncoding;
31use tonic::transport::Channel;
32
33use crate::client::ask_leader::AskLeader;
34use crate::client::{Id, LeaderProviderRef};
35use crate::error;
36use crate::error::{InvalidResponseHeaderSnafu, Result};
37
/// Heartbeat timing parameters used by this client.
///
/// Values are either taken from the `heartbeat_config` carried by a
/// `HeartbeatResponse` or fall back to [`Default`].
#[derive(Clone, Copy, Debug)]
pub struct HeartbeatConfig {
    /// Heartbeat interval (filled from `heartbeat_interval_ms` when Metasrv
    /// supplies a configuration).
    pub interval: Duration,
    /// Retry interval (filled from `retry_interval_ms` when Metasrv supplies
    /// a configuration).
    pub retry_interval: Duration,
}
44
45impl Default for HeartbeatConfig {
46 fn default() -> Self {
47 Self {
48 interval: BASE_HEARTBEAT_INTERVAL,
49 retry_interval: BASE_HEARTBEAT_INTERVAL,
50 }
51 }
52}
53
54impl fmt::Display for HeartbeatConfig {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 write!(
57 f,
58 "interval={:?}, retry={:?}",
59 self.interval, self.retry_interval
60 )
61 }
62}
63
64impl HeartbeatConfig {
65 pub fn from_response(res: &HeartbeatResponse) -> Self {
67 if let Some(cfg) = &res.heartbeat_config {
68 Self {
70 interval: Duration::from_millis(cfg.heartbeat_interval_ms),
71 retry_interval: Duration::from_millis(cfg.retry_interval_ms),
72 }
73 } else {
74 let fallback = Self::default();
75 warn!(
76 "Metasrv didn't provide heartbeat_config, using default: {}",
77 fallback
78 );
79 fallback
80 }
81 }
82}
83
84pub struct HeartbeatSender {
85 id: Id,
86 role: Role,
87 sender: mpsc::Sender<HeartbeatRequest>,
88}
89
90impl HeartbeatSender {
91 #[inline]
92 fn new(id: Id, role: Role, sender: mpsc::Sender<HeartbeatRequest>) -> Self {
93 Self { id, role, sender }
94 }
95
96 #[inline]
97 pub fn id(&self) -> Id {
98 self.id
99 }
100
101 #[inline]
102 pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> {
103 req.set_header(
104 self.id,
105 self.role,
106 TracingContext::from_current_span().to_w3c(),
107 );
108 self.sender.send(req).await.map_err(|e| {
109 error::SendHeartbeatSnafu {
110 err_msg: e.to_string(),
111 }
112 .build()
113 })
114 }
115}
116
117#[derive(Debug)]
118pub struct HeartbeatStream {
119 id: Id,
120 stream: Streaming<HeartbeatResponse>,
121}
122
123impl HeartbeatStream {
124 #[inline]
125 fn new(id: Id, stream: Streaming<HeartbeatResponse>) -> Self {
126 Self { id, stream }
127 }
128
129 #[inline]
130 pub fn id(&self) -> Id {
131 self.id
132 }
133
134 #[inline]
136 pub async fn message(&mut self) -> Result<Option<HeartbeatResponse>> {
137 let res = self.stream.message().await.map_err(error::Error::from);
138 if let Ok(Some(heartbeat)) = &res {
139 util::check_response_header(heartbeat.header.as_ref())
140 .context(InvalidResponseHeaderSnafu)?;
141 }
142 res
143 }
144}
145
146#[derive(Clone, Debug)]
147pub struct Client {
148 inner: Arc<RwLock<Inner>>,
149}
150
151impl Client {
152 pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
153 let inner = Arc::new(RwLock::new(Inner::new(
154 id,
155 role,
156 channel_manager,
157 max_retry,
158 )));
159 Self { inner }
160 }
161
162 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
163 where
164 U: AsRef<str>,
165 A: AsRef<[U]>,
166 {
167 let mut inner = self.inner.write().await;
168 inner.start(urls)
169 }
170
171 pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
173 let mut inner = self.inner.write().await;
174 inner.start_with(leader_provider)
175 }
176
177 pub async fn ask_leader(&mut self) -> Result<String> {
178 let inner = self.inner.read().await;
179 inner.ask_leader().await
180 }
181
182 pub async fn heartbeat(
183 &mut self,
184 ) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
185 let inner = self.inner.read().await;
186 inner.ask_leader().await?;
187 inner.heartbeat().await
188 }
189}
190
191#[derive(Debug)]
192struct Inner {
193 id: Id,
194 role: Role,
195 channel_manager: ChannelManager,
196 leader_provider: Option<LeaderProviderRef>,
197 max_retry: usize,
198}
199
200impl Inner {
201 fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
202 Self {
203 id,
204 role,
205 channel_manager,
206 leader_provider: None,
207 max_retry,
208 }
209 }
210
211 fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
212 ensure!(
213 !self.is_started(),
214 error::IllegalGrpcClientStateSnafu {
215 err_msg: "Heartbeat client already started"
216 }
217 );
218 self.leader_provider = Some(leader_provider);
219 Ok(())
220 }
221
222 fn start<U, A>(&mut self, urls: A) -> Result<()>
223 where
224 U: AsRef<str>,
225 A: AsRef<[U]>,
226 {
227 let peers = urls
228 .as_ref()
229 .iter()
230 .map(|url| url.as_ref().to_string())
231 .collect::<Vec<_>>();
232 let ask_leader = AskLeader::new(
233 self.id,
234 self.role,
235 peers,
236 self.channel_manager.clone(),
237 self.max_retry,
238 );
239 self.start_with(Arc::new(ask_leader))
240 }
241
242 async fn ask_leader(&self) -> Result<String> {
243 let Some(leader_provider) = self.leader_provider.as_ref() else {
244 return error::IllegalGrpcClientStateSnafu {
245 err_msg: "not started",
246 }
247 .fail();
248 };
249 leader_provider.ask_leader().await
250 }
251
252 async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
253 ensure!(
254 self.is_started(),
255 error::IllegalGrpcClientStateSnafu {
256 err_msg: "Heartbeat client not start"
257 }
258 );
259
260 let leader_addr = self
261 .leader_provider
262 .as_ref()
263 .unwrap()
264 .leader()
265 .context(error::NoLeaderSnafu)?;
266 let mut leader = self.make_client(&leader_addr)?;
267
268 let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
269
270 let header = RequestHeader::new(
271 self.id,
272 self.role,
273 TracingContext::from_current_span().to_w3c(),
274 );
275 let handshake = HeartbeatRequest {
276 header: Some(header),
277 ..Default::default()
278 };
279 sender.send(handshake).await.map_err(|e| {
280 error::SendHeartbeatSnafu {
281 err_msg: e.to_string(),
282 }
283 .build()
284 })?;
285 let receiver = ReceiverStream::new(receiver);
286
287 let mut stream = leader
288 .heartbeat(receiver)
289 .await
290 .map_err(error::Error::from)?
291 .into_inner();
292
293 let res = stream
294 .message()
295 .await
296 .map_err(error::Error::from)?
297 .context(error::CreateHeartbeatStreamSnafu)?;
298
299 let config = HeartbeatConfig::from_response(&res);
301
302 info!(
303 "Handshake successful with Metasrv at {}, received config: {}",
304 leader_addr, config
305 );
306
307 Ok((
308 HeartbeatSender::new(self.id, self.role, sender),
309 HeartbeatStream::new(self.id, stream),
310 config,
311 ))
312 }
313
314 fn make_client(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
315 let channel = self
316 .channel_manager
317 .get(addr)
318 .context(error::CreateChannelSnafu)?;
319
320 Ok(HeartbeatClient::new(channel)
321 .accept_compressed(CompressionEncoding::Zstd)
322 .accept_compressed(CompressionEncoding::Gzip)
323 .send_compressed(CompressionEncoding::Zstd))
324 }
325
326 #[inline]
327 pub(crate) fn is_started(&self) -> bool {
328 self.leader_provider.is_some()
329 }
330}
331
#[cfg(test)]
mod test {
    use super::*;

    /// Starting an already-started client must fail with `IllegalGrpcClientState`.
    #[tokio::test]
    async fn test_already_start() {
        let mut client = Client::new(0, Role::Datanode, ChannelManager::default(), 3);
        client
            .start(&["127.0.0.1:1000", "127.0.0.1:1001"])
            .await
            .unwrap();

        let res = client.start(&["127.0.0.1:1002"]).await;
        assert!(matches!(
            res,
            Err(error::Error::IllegalGrpcClientState { .. })
        ));
    }

    /// Every request pushed through `HeartbeatSender` must arrive with the
    /// sender's id in its header, and all of them must arrive.
    #[tokio::test]
    async fn test_heartbeat_stream() {
        const EXPECTED: usize = 10;

        let (sender, mut receiver) = mpsc::channel::<HeartbeatRequest>(100);
        let sender = HeartbeatSender::new(8, Role::Datanode, sender);
        let handle = tokio::spawn(async move {
            for _ in 0..EXPECTED {
                sender.send(HeartbeatRequest::default()).await.unwrap();
            }
        });

        // The loop ends once the spawned task finishes and drops the sender.
        let mut received = 0;
        while let Some(req) = receiver.recv().await {
            let header = req.header.unwrap();
            assert_eq!(8, header.member_id);
            received += 1;
        }

        // Surface a panic in the sending task and make sure nothing was lost;
        // without these checks the test passes even when no request is sent.
        handle.await.unwrap();
        assert_eq!(EXPECTED, received);
    }
}