1use std::io::ErrorKind;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18
19use api::v1::meta::{
20 AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, Peer, RequestHeader,
21 ResponseHeader, Role, heartbeat_server,
22};
23use common_meta::election::LeaderChangeMessage;
24use common_telemetry::{debug, error, info, warn};
25use futures::StreamExt;
26use once_cell::sync::OnceCell;
27use snafu::{OptionExt, ResultExt};
28use tokio::sync::broadcast::error::RecvError;
29use tokio::sync::mpsc;
30use tokio::sync::mpsc::Sender;
31use tokio_stream::wrappers::ReceiverStream;
32use tonic::{Request, Response, Status, Streaming};
33
34use crate::error::{self, Result};
35use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
36use crate::metasrv::{Context, ElectionRef, Metasrv};
37use crate::metrics::METRIC_META_HEARTBEAT_RECV;
38use crate::service::{GrpcResult, GrpcStream};
39
40type HeartbeatResponseResult = std::result::Result<HeartbeatResponse, Status>;
41
42#[async_trait::async_trait]
43trait HeartbeatRequestStream {
44 async fn next(&mut self) -> Option<std::result::Result<HeartbeatRequest, Status>>;
45}
46
47struct TonicHeartbeatRequestStream {
48 inner: Streaming<HeartbeatRequest>,
49}
50
51impl TonicHeartbeatRequestStream {
52 fn new(inner: Streaming<HeartbeatRequest>) -> Self {
53 Self { inner }
54 }
55}
56
57#[async_trait::async_trait]
58impl HeartbeatRequestStream for TonicHeartbeatRequestStream {
59 async fn next(&mut self) -> Option<std::result::Result<HeartbeatRequest, Status>> {
60 self.inner.next().await
61 }
62}
63
/// Outcome reported by a [`LeaderStepDown`] watcher.
enum LeaderStepDownEvent {
    /// A leader step-down was observed.
    StepDown,
    /// The watcher's event source was closed; no further events will arrive.
    Closed,
}
68
69#[async_trait::async_trait]
70trait LeaderStepDown {
71 async fn wait(&mut self) -> LeaderStepDownEvent;
72}
73
74struct ElectionLeaderStepDown {
75 rx: tokio::sync::broadcast::Receiver<LeaderChangeMessage>,
76}
77
78impl ElectionLeaderStepDown {
79 fn new(election: ElectionRef) -> Self {
80 Self {
81 rx: election.subscribe_leader_change(),
82 }
83 }
84}
85
86#[async_trait::async_trait]
87impl LeaderStepDown for ElectionLeaderStepDown {
88 async fn wait(&mut self) -> LeaderStepDownEvent {
89 loop {
90 match self.rx.recv().await {
91 Ok(LeaderChangeMessage::StepDown(_)) => return LeaderStepDownEvent::StepDown,
92 Ok(LeaderChangeMessage::Elected(_)) => {}
93 Err(RecvError::Lagged(skipped)) => {
94 warn!(
95 "Leader step-down watcher lagged, skipped {} leader change events",
96 skipped
97 );
98 }
99 Err(RecvError::Closed) => return LeaderStepDownEvent::Closed,
100 }
101 }
102 }
103}
104
105struct HeartbeatSession<R, L> {
106 requests: R,
107 tx: Sender<HeartbeatResponseResult>,
108 leader_step_down: Option<L>,
109 handler_group: Arc<HeartbeatHandlerGroup>,
110 ctx: Context,
111 sender_id: PusherId,
112}
113
114impl<R, L> HeartbeatSession<R, L>
115where
116 R: HeartbeatRequestStream,
117 L: LeaderStepDown,
118{
119 async fn init(
122 mut requests: R,
123 tx: Sender<HeartbeatResponseResult>,
124 leader_step_down: Option<L>,
125 handler_group: Arc<HeartbeatHandlerGroup>,
126 ctx: Context,
127 ) -> Option<Self> {
128 let msg = requests.next().await?;
129
130 let req = match msg {
131 Ok(req) => req,
132 Err(err) => {
133 error!("Failed to receive the first heartbeat request, error: {err}");
134 let _ = handle_request_stream_error(None, &tx, err).await;
135 return None;
136 }
137 };
138
139 let Some(header) = req.header.as_ref() else {
140 error!("Exit on malformed request: MissingRequestHeader");
141 let _ = tx
142 .send(Err(error::MissingRequestHeaderSnafu {}.build().into()))
143 .await;
144 return None;
145 };
146
147 let sender_id = register_pusher(&handler_group, header, tx.clone()).await;
148 let mut session = Self {
149 requests,
150 tx,
151 leader_step_down,
152 handler_group,
153 ctx,
154 sender_id,
155 };
156
157 if session.handle_request(req, true).await {
158 Some(session)
159 } else {
160 session.cleanup().await;
161 None
162 }
163 }
164
165 async fn run(mut self) {
167 let mut leader_step_down = self.leader_step_down.take();
168
169 loop {
170 tokio::select! {
171 msg = self.requests.next() => {
172 let Some(msg) = msg else {
173 break;
174 };
175
176 if !self.handle_message(msg).await {
177 break;
178 }
179 }
180 event = wait_leader_step_down(leader_step_down.as_mut()), if leader_step_down.is_some() => {
181 match event {
182 LeaderStepDownEvent::StepDown => {
183 self.send_not_leader_error().await;
184 break;
185 }
186 LeaderStepDownEvent::Closed => {
187 warn!("Leader step-down watcher closed");
188 self.send_election_unavailable_error().await;
189 break;
190 }
191 }
192 }
193 }
194 }
195
196 self.cleanup().await;
197 }
198
199 async fn handle_message(&mut self, msg: std::result::Result<HeartbeatRequest, Status>) -> bool {
201 match msg {
202 Ok(req) => self.handle_request(req, false).await,
203 Err(err) => handle_request_stream_error(Some(self.sender_id), &self.tx, err).await,
204 }
205 }
206
207 async fn handle_request(&mut self, req: HeartbeatRequest, is_handshake: bool) -> bool {
209 debug!("Receiving heartbeat request: {:?}", req);
210
211 let sender_id = self.sender_id.to_string();
212 METRIC_META_HEARTBEAT_RECV
213 .with_label_values(&[sender_id.as_str()])
214 .inc();
215
216 let res = self
217 .handler_group
218 .handle(req, self.ctx.clone().with_handshake(is_handshake))
219 .await
220 .inspect_err(
221 |e| warn!(e; "Failed to handle heartbeat request, sender: {}", self.sender_id),
222 )
223 .map_err(|e| e.into());
224
225 let is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());
226
227 debug!("Sending heartbeat response: {:?}", res);
228
229 if self.tx.send(res).await.is_err() {
230 info!(
231 "ReceiverStream was dropped; shutting down, sender: {}",
232 self.sender_id
233 );
234 return false;
235 }
236
237 if is_not_leader {
238 warn!(
239 "Quit because it is no longer the leader, sender: {}",
240 self.sender_id
241 );
242 self.send_not_leader_error().await;
243 return false;
244 }
245
246 true
247 }
248
249 async fn send_not_leader_error(&mut self) {
250 let _ = self
251 .tx
252 .send(Err(Status::aborted(format!(
253 "The requested metasrv node is not leader, node addr: {}",
254 self.ctx.server_addr
255 ))))
256 .await;
257 }
258
259 async fn send_election_unavailable_error(&mut self) {
260 let _ = self
261 .tx
262 .send(Err(Status::unavailable(format!(
263 "The requested metasrv node is shutting down, node addr: {}",
264 self.ctx.server_addr
265 ))))
266 .await;
267 }
268
269 async fn cleanup(&self) {
270 info!("Heartbeat stream closed, sender: {}", self.sender_id);
271 let _ = self.handler_group.deregister_push(self.sender_id).await;
272 }
273}
274
275async fn wait_leader_step_down<L>(leader_step_down: Option<&mut L>) -> LeaderStepDownEvent
276where
277 L: LeaderStepDown,
278{
279 match leader_step_down {
280 Some(leader_step_down) => leader_step_down.wait().await,
281 None => std::future::pending().await,
282 }
283}
284
285async fn handle_request_stream_error(
289 sender_id: Option<PusherId>,
290 tx: &Sender<HeartbeatResponseResult>,
291 err: Status,
292) -> bool {
293 if let Some(io_err) = error::match_for_io_error(&err)
294 && io_err.kind() == ErrorKind::BrokenPipe
295 {
296 error!("Client disconnected: broken pipe, sender: {:?}", sender_id);
297 return false;
298 }
299 error!(err; "Error while receiving heartbeat request, sender: {:?}", sender_id);
300
301 if tx.send(Err(err)).await.is_err() {
302 info!(
303 "Failed to forward heartbeat request stream error; response stream was dropped, sender: {:?}",
304 sender_id
305 );
306 return false;
307 }
308
309 true
310}
311
312#[async_trait::async_trait]
313impl heartbeat_server::Heartbeat for Metasrv {
314 type HeartbeatStream = GrpcStream<HeartbeatResponse>;
315
316 async fn heartbeat(
317 &self,
318 req: Request<Streaming<HeartbeatRequest>>,
319 ) -> GrpcResult<Self::HeartbeatStream> {
320 let (tx, rx) = mpsc::channel(128);
321 let handler_group = self.handler_group().context(error::UnexpectedSnafu {
322 violated: "expected heartbeat handlers",
323 })?;
324
325 let ctx = self.new_ctx();
326 let requests = TonicHeartbeatRequestStream::new(req.into_inner());
327 let _handle = common_runtime::spawn_global(async move {
328 if let Some(session) = HeartbeatSession::init(
329 requests,
330 tx,
331 ctx.election
332 .as_ref()
333 .map(|r| ElectionLeaderStepDown::new(r.clone())),
334 handler_group,
335 ctx,
336 )
337 .await
338 {
339 session.run().await;
340 }
341 });
342
343 let out_stream = ReceiverStream::new(rx);
344
345 Ok(Response::new(Box::pin(out_stream)))
346 }
347
348 async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
349 let req = req.into_inner();
350 let ctx = self.new_ctx();
351 let res = handle_ask_leader(req, ctx).await?;
352
353 Ok(Response::new(res))
354 }
355}
356
357async fn handle_ask_leader(_req: AskLeaderRequest, ctx: Context) -> Result<AskLeaderResponse> {
358 let addr = match ctx.election {
359 Some(election) => {
360 if election.is_leader() {
361 ctx.server_addr
362 } else {
363 election.leader().await.context(error::KvBackendSnafu)?.0
364 }
365 }
366 None => ctx.server_addr,
367 };
368
369 let leader = Some(Peer {
370 id: 0, addr,
372 });
373
374 let header = Some(ResponseHeader::success());
375 Ok(AskLeaderResponse { header, leader })
376}
377
378fn get_node_id(header: &RequestHeader) -> u64 {
379 static ID: OnceCell<Arc<AtomicU64>> = OnceCell::new();
380
381 fn next_id() -> u64 {
382 let id = ID.get_or_init(|| Arc::new(AtomicU64::new(0))).clone();
383 id.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
384 }
385
386 match header.role() {
387 Role::Frontend => next_id(),
388 Role::Datanode | Role::Flownode => header.member_id,
389 }
390}
391
392async fn register_pusher(
393 handler_group: &HeartbeatHandlerGroup,
394 header: &RequestHeader,
395 sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
396) -> PusherId {
397 let role = header.role();
398 let id = get_node_id(header);
399 let pusher_id = PusherId::new(role, id);
400 let pusher = Pusher::new(sender);
401 handler_group.register_pusher(pusher_id, pusher).await;
402 pusher_id
403}
404
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use api::v1::meta::heartbeat_server::Heartbeat;
    use api::v1::meta::*;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_telemetry::tracing_context::W3cTrace;
    use servers::grpc::GrpcOptions;
    use tokio::sync::mpsc;
    use tonic::{Code, IntoRequest};

    use super::*;
    use crate::handler::test_utils::TestEnv;
    use crate::metasrv::MetasrvOptions;
    use crate::metasrv::builder::MetasrvBuilder;

    /// Member id used by the datanode in the session tests.
    const DATANODE_ID: u64 = 42;

    /// In-memory request stream that replays a fixed list of items.
    struct MockHeartbeatRequestStream {
        /// Items still to be yielded, front first.
        messages: VecDeque<std::result::Result<HeartbeatRequest, Status>>,
        /// When `true`, the stream stays pending once drained instead of ending.
        pending_when_empty: bool,
    }

    impl MockHeartbeatRequestStream {
        /// Stream that ends once `messages` has been replayed.
        fn new(messages: Vec<std::result::Result<HeartbeatRequest, Status>>) -> Self {
            Self {
                messages: messages.into(),
                pending_when_empty: false,
            }
        }

        /// Stream that stays pending forever once `messages` has been replayed.
        fn pending_after(messages: Vec<std::result::Result<HeartbeatRequest, Status>>) -> Self {
            Self {
                pending_when_empty: true,
                ..Self::new(messages)
            }
        }
    }

    #[async_trait::async_trait]
    impl HeartbeatRequestStream for MockHeartbeatRequestStream {
        async fn next(&mut self) -> Option<std::result::Result<HeartbeatRequest, Status>> {
            let queued = self.messages.pop_front();
            if queued.is_none() && self.pending_when_empty {
                return std::future::pending().await;
            }

            queued
        }
    }

    /// Watcher that reports a single preset event.
    struct MockLeaderStepDown {
        event: Option<LeaderStepDownEvent>,
    }

    impl MockLeaderStepDown {
        fn new(event: LeaderStepDownEvent) -> Self {
            Self { event: Some(event) }
        }
    }

    #[async_trait::async_trait]
    impl LeaderStepDown for MockLeaderStepDown {
        /// Returns the preset event; panics if awaited a second time.
        async fn wait(&mut self) -> LeaderStepDownEvent {
            self.event.take().unwrap()
        }
    }

    /// Builds a heartbeat request whose header carries only `role` and `member_id`.
    fn heartbeat_request(role: Role, member_id: u64) -> HeartbeatRequest {
        let header = RequestHeader {
            role: role.into(),
            member_id,
            ..Default::default()
        };

        HeartbeatRequest {
            header: Some(header),
            ..Default::default()
        }
    }

    fn sender_id(role: Role, member_id: u64) -> PusherId {
        PusherId::new(role, member_id)
    }

    fn test_context() -> Context {
        TestEnv::new().ctx()
    }

    fn test_handler_group() -> Arc<HeartbeatHandlerGroup> {
        Arc::new(HeartbeatHandlerGroup::default())
    }

    /// Runs `HeartbeatSession::init` with a fresh test context.
    async fn init_session<L>(
        requests: MockHeartbeatRequestStream,
        tx: Sender<HeartbeatResponseResult>,
        leader_step_down: Option<L>,
        handler_group: Arc<HeartbeatHandlerGroup>,
    ) -> Option<HeartbeatSession<MockHeartbeatRequestStream, L>>
    where
        L: LeaderStepDown,
    {
        let ctx = test_context();
        HeartbeatSession::init(requests, tx, leader_step_down, handler_group, ctx).await
    }

    /// Receives the next item from the response channel, panicking if it is closed.
    async fn recv_response(
        rx: &mut mpsc::Receiver<HeartbeatResponseResult>,
    ) -> HeartbeatResponseResult {
        rx.recv().await.unwrap()
    }

    #[tokio::test]
    async fn test_heartbeat_session_init_returns_none_on_empty_stream() {
        let (tx, _rx) = mpsc::channel(8);
        let handler_group = test_handler_group();

        let session = init_session(
            MockHeartbeatRequestStream::new(vec![]),
            tx,
            None::<MockLeaderStepDown>,
            handler_group.clone(),
        )
        .await;

        assert!(session.is_none());
        let registered = handler_group
            .contains_pusher(&sender_id(Role::Datanode, DATANODE_ID))
            .await;
        assert!(!registered);
    }

    #[tokio::test]
    async fn test_heartbeat_session_init_forwards_first_stream_error() {
        let (tx, mut rx) = mpsc::channel(8);
        let handler_group = test_handler_group();
        let requests = MockHeartbeatRequestStream::new(vec![Err(Status::internal("boom"))]);

        let session = init_session(requests, tx, None::<MockLeaderStepDown>, handler_group).await;

        assert!(session.is_none());
        let forwarded = recv_response(&mut rx).await.unwrap_err();
        assert_eq!(Code::Internal, forwarded.code());
        assert_eq!("boom", forwarded.message());
    }

    #[tokio::test]
    async fn test_heartbeat_session_init_sends_error_on_missing_header() {
        let (tx, mut rx) = mpsc::channel(8);
        let handler_group = test_handler_group();
        // A default request carries no header at all.
        let requests = MockHeartbeatRequestStream::new(vec![Ok(HeartbeatRequest::default())]);

        let session = init_session(
            requests,
            tx,
            None::<MockLeaderStepDown>,
            handler_group.clone(),
        )
        .await;

        assert!(session.is_none());
        let registered = handler_group
            .contains_pusher(&sender_id(Role::Datanode, DATANODE_ID))
            .await;
        assert!(!registered);

        let rejected = recv_response(&mut rx).await.unwrap_err();
        assert_eq!(Code::InvalidArgument, rejected.code());
    }

    #[tokio::test]
    async fn test_heartbeat_session_init_registers_sender() {
        let (tx, mut rx) = mpsc::channel(8);
        let handler_group = test_handler_group();
        let pusher_id = sender_id(Role::Datanode, DATANODE_ID);
        let handshake = heartbeat_request(Role::Datanode, DATANODE_ID);
        let requests = MockHeartbeatRequestStream::new(vec![Ok(handshake)]);

        let session = init_session(
            requests,
            tx,
            None::<MockLeaderStepDown>,
            handler_group.clone(),
        )
        .await;

        assert!(session.is_some());
        assert!(handler_group.contains_pusher(&pusher_id).await);

        let response = recv_response(&mut rx).await.unwrap();
        assert!(response.heartbeat_config.is_some());
    }

    #[tokio::test]
    async fn test_heartbeat_session_run_deregisters_sender_on_stream_close() {
        let (tx, mut rx) = mpsc::channel(8);
        let handler_group = test_handler_group();
        let pusher_id = sender_id(Role::Datanode, DATANODE_ID);
        let handshake = heartbeat_request(Role::Datanode, DATANODE_ID);
        let requests = MockHeartbeatRequestStream::new(vec![Ok(handshake)]);
        let session = init_session(
            requests,
            tx,
            None::<MockLeaderStepDown>,
            handler_group.clone(),
        )
        .await
        .unwrap();
        // Drain the handshake response.
        let _ = recv_response(&mut rx).await.unwrap();

        session.run().await;

        assert!(!handler_group.contains_pusher(&pusher_id).await);
    }

    #[tokio::test]
    async fn test_heartbeat_session_run_forwards_stream_error_after_init() {
        let (tx, mut rx) = mpsc::channel(8);
        let handler_group = test_handler_group();
        let pusher_id = sender_id(Role::Datanode, DATANODE_ID);
        let requests = MockHeartbeatRequestStream::new(vec![
            Ok(heartbeat_request(Role::Datanode, DATANODE_ID)),
            Err(Status::unavailable("temporary")),
        ]);
        let session = init_session(
            requests,
            tx,
            None::<MockLeaderStepDown>,
            handler_group.clone(),
        )
        .await
        .unwrap();
        // Drain the handshake response.
        let _ = recv_response(&mut rx).await.unwrap();

        session.run().await;

        let forwarded = recv_response(&mut rx).await.unwrap_err();
        assert_eq!(Code::Unavailable, forwarded.code());
        assert_eq!("temporary", forwarded.message());
        assert!(!handler_group.contains_pusher(&pusher_id).await);
    }

    #[tokio::test]
    async fn test_heartbeat_session_leader_step_down_sends_aborted_and_deregisters() {
        let (tx, mut rx) = mpsc::channel(8);
        let handler_group = test_handler_group();
        let pusher_id = sender_id(Role::Datanode, DATANODE_ID);
        let handshake = heartbeat_request(Role::Datanode, DATANODE_ID);
        // Stay pending after the handshake so only the watcher can end the session.
        let requests = MockHeartbeatRequestStream::pending_after(vec![Ok(handshake)]);
        let session = init_session(
            requests,
            tx,
            Some(MockLeaderStepDown::new(LeaderStepDownEvent::StepDown)),
            handler_group.clone(),
        )
        .await
        .unwrap();
        // Drain the handshake response.
        let _ = recv_response(&mut rx).await.unwrap();

        session.run().await;

        let status = recv_response(&mut rx).await.unwrap_err();
        assert_eq!(Code::Aborted, status.code());
        assert!(!handler_group.contains_pusher(&pusher_id).await);
    }

    #[tokio::test]
    async fn test_heartbeat_session_leader_watcher_closed_sends_unavailable_and_deregisters() {
        let (tx, mut rx) = mpsc::channel(8);
        let handler_group = test_handler_group();
        let pusher_id = sender_id(Role::Datanode, DATANODE_ID);
        let handshake = heartbeat_request(Role::Datanode, DATANODE_ID);
        // Stay pending after the handshake so only the watcher can end the session.
        let requests = MockHeartbeatRequestStream::pending_after(vec![Ok(handshake)]);
        let session = init_session(
            requests,
            tx,
            Some(MockLeaderStepDown::new(LeaderStepDownEvent::Closed)),
            handler_group.clone(),
        )
        .await
        .unwrap();
        // Drain the handshake response.
        let _ = recv_response(&mut rx).await.unwrap();

        session.run().await;

        let status = recv_response(&mut rx).await.unwrap_err();
        assert_eq!(Code::Unavailable, status.code());
        assert!(!handler_group.contains_pusher(&pusher_id).await);
    }

    #[tokio::test]
    async fn test_ask_leader() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let options = MetasrvOptions {
            grpc: GrpcOptions {
                server_addr: "127.0.0.1:3002".to_string(),
                ..Default::default()
            },
            ..Default::default()
        };

        let metasrv = MetasrvBuilder::new()
            .kv_backend(kv_backend)
            .options(options)
            .build()
            .await
            .unwrap();

        let req = AskLeaderRequest {
            header: Some(RequestHeader::new(1, Role::Datanode, W3cTrace::new())),
        };

        let res = metasrv
            .ask_leader(req.into_request())
            .await
            .unwrap()
            .into_inner();
        assert_eq!(metasrv.options().grpc.server_addr, res.leader.unwrap().addr);
    }

    #[test]
    fn test_get_node_id() {
        // Datanodes keep the member id from the header.
        let datanode = RequestHeader {
            role: Role::Datanode.into(),
            member_id: 11,
            ..Default::default()
        };
        assert_eq!(11, get_node_id(&datanode));

        // Frontends draw ids from the shared counter, starting at zero.
        let frontend = RequestHeader {
            role: Role::Frontend.into(),
            ..Default::default()
        };
        for expected in 0..10 {
            assert_eq!(expected, get_node_id(&frontend));
        }

        // A member id set on a frontend header is ignored; the counter keeps going.
        let frontend_with_member_id = RequestHeader {
            role: Role::Frontend.into(),
            member_id: 11,
            ..Default::default()
        };
        for expected in 10..20 {
            assert_eq!(expected, get_node_id(&frontend_with_member_id));
        }
    }
}