Skip to main content

meta_srv/service/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::ErrorKind;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18
19use api::v1::meta::{
20    AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, Peer, RequestHeader,
21    ResponseHeader, Role, heartbeat_server,
22};
23use common_meta::election::LeaderChangeMessage;
24use common_telemetry::{debug, error, info, warn};
25use futures::StreamExt;
26use once_cell::sync::OnceCell;
27use snafu::{OptionExt, ResultExt};
28use tokio::sync::broadcast::error::RecvError;
29use tokio::sync::mpsc;
30use tokio::sync::mpsc::Sender;
31use tokio_stream::wrappers::ReceiverStream;
32use tonic::{Request, Response, Status, Streaming};
33
34use crate::error::{self, Result};
35use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
36use crate::metasrv::{Context, ElectionRef, Metasrv};
37use crate::metrics::METRIC_META_HEARTBEAT_RECV;
38use crate::service::{GrpcResult, GrpcStream};
39
/// Item carried by the heartbeat response channel: either a [`HeartbeatResponse`]
/// or the gRPC [`Status`] to report to the client.
type HeartbeatResponseResult = std::result::Result<HeartbeatResponse, Status>;
41
/// Source of incoming heartbeat requests for a [`HeartbeatSession`].
///
/// The session is generic over this trait rather than tied to a concrete stream type;
/// [`TonicHeartbeatRequestStream`] is the implementation backed by a tonic [`Streaming`].
#[async_trait::async_trait]
trait HeartbeatRequestStream {
    /// Returns the next message of the stream, or `None` once the stream has ended.
    async fn next(&mut self) -> Option<std::result::Result<HeartbeatRequest, Status>>;
}
46
47struct TonicHeartbeatRequestStream {
48    inner: Streaming<HeartbeatRequest>,
49}
50
51impl TonicHeartbeatRequestStream {
52    fn new(inner: Streaming<HeartbeatRequest>) -> Self {
53        Self { inner }
54    }
55}
56
57#[async_trait::async_trait]
58impl HeartbeatRequestStream for TonicHeartbeatRequestStream {
59    async fn next(&mut self) -> Option<std::result::Result<HeartbeatRequest, Status>> {
60        self.inner.next().await
61    }
62}
63
/// Outcome reported by a [`LeaderStepDown`] watcher.
enum LeaderStepDownEvent {
    /// A leader step-down was observed.
    StepDown,
    /// The watcher's event source was closed.
    Closed,
}
68
/// Watcher that lets a [`HeartbeatSession`] react to leadership changes.
#[async_trait::async_trait]
trait LeaderStepDown {
    /// Resolves with the next [`LeaderStepDownEvent`] the session has to act on.
    async fn wait(&mut self) -> LeaderStepDownEvent;
}
73
74struct ElectionLeaderStepDown {
75    rx: tokio::sync::broadcast::Receiver<LeaderChangeMessage>,
76}
77
78impl ElectionLeaderStepDown {
79    fn new(election: ElectionRef) -> Self {
80        Self {
81            rx: election.subscribe_leader_change(),
82        }
83    }
84}
85
86#[async_trait::async_trait]
87impl LeaderStepDown for ElectionLeaderStepDown {
88    async fn wait(&mut self) -> LeaderStepDownEvent {
89        loop {
90            match self.rx.recv().await {
91                Ok(LeaderChangeMessage::StepDown(_)) => return LeaderStepDownEvent::StepDown,
92                Ok(LeaderChangeMessage::Elected(_)) => {}
93                Err(RecvError::Lagged(skipped)) => {
94                    warn!(
95                        "Leader step-down watcher lagged, skipped {} leader change events",
96                        skipped
97                    );
98                }
99                Err(RecvError::Closed) => return LeaderStepDownEvent::Closed,
100            }
101        }
102    }
103}
104
/// State of a single heartbeat stream.
///
/// `R` supplies the incoming requests and `L` the leader step-down notifications.
struct HeartbeatSession<R, L> {
    /// Incoming heartbeat requests.
    requests: R,
    /// Channel carrying responses and errors back to the client.
    tx: Sender<HeartbeatResponseResult>,
    /// Optional leader step-down watcher; taken out of the session when it starts running.
    leader_step_down: Option<L>,
    /// Handlers each request is dispatched to; also used to register and deregister
    /// this stream's pusher.
    handler_group: Arc<HeartbeatHandlerGroup>,
    /// Context that is cloned for every handled request.
    ctx: Context,
    /// Pusher id under which this stream's sender is registered.
    sender_id: PusherId,
}
113
114impl<R, L> HeartbeatSession<R, L>
115where
116    R: HeartbeatRequestStream,
117    L: LeaderStepDown,
118{
119    /// Initializes the heartbeat session by receiving the first request,
120    /// and returns `None` if the stream is closed or an error occurs.
121    async fn init(
122        mut requests: R,
123        tx: Sender<HeartbeatResponseResult>,
124        leader_step_down: Option<L>,
125        handler_group: Arc<HeartbeatHandlerGroup>,
126        ctx: Context,
127    ) -> Option<Self> {
128        let msg = requests.next().await?;
129
130        let req = match msg {
131            Ok(req) => req,
132            Err(err) => {
133                error!("Failed to receive the first heartbeat request, error: {err}");
134                let _ = handle_request_stream_error(None, &tx, err).await;
135                return None;
136            }
137        };
138
139        let Some(header) = req.header.as_ref() else {
140            error!("Exit on malformed request: MissingRequestHeader");
141            let _ = tx
142                .send(Err(error::MissingRequestHeaderSnafu {}.build().into()))
143                .await;
144            return None;
145        };
146
147        let sender_id = register_pusher(&handler_group, header, tx.clone()).await;
148        let mut session = Self {
149            requests,
150            tx,
151            leader_step_down,
152            handler_group,
153            ctx,
154            sender_id,
155        };
156
157        if session.handle_request(req, true).await {
158            Some(session)
159        } else {
160            session.cleanup().await;
161            None
162        }
163    }
164
165    /// Runs the heartbeat session until the stream is closed or an error occurs.
166    async fn run(mut self) {
167        let mut leader_step_down = self.leader_step_down.take();
168
169        loop {
170            tokio::select! {
171                msg = self.requests.next() => {
172                    let Some(msg) = msg else {
173                        break;
174                    };
175
176                    if !self.handle_message(msg).await {
177                        break;
178                    }
179                }
180                event = wait_leader_step_down(leader_step_down.as_mut()), if leader_step_down.is_some() => {
181                    match event {
182                        LeaderStepDownEvent::StepDown => {
183                            self.send_not_leader_error().await;
184                            break;
185                        }
186                        LeaderStepDownEvent::Closed => {
187                            warn!("Leader step-down watcher closed");
188                            self.send_election_unavailable_error().await;
189                            break;
190                        }
191                    }
192                }
193            }
194        }
195
196        self.cleanup().await;
197    }
198
199    /// Handles the incoming message, and returns whether to continue the session.
200    async fn handle_message(&mut self, msg: std::result::Result<HeartbeatRequest, Status>) -> bool {
201        match msg {
202            Ok(req) => self.handle_request(req, false).await,
203            Err(err) => handle_request_stream_error(Some(self.sender_id), &self.tx, err).await,
204        }
205    }
206
207    /// Handles the incoming heartbeat request, and returns whether to continue the session.
208    async fn handle_request(&mut self, req: HeartbeatRequest, is_handshake: bool) -> bool {
209        debug!("Receiving heartbeat request: {:?}", req);
210
211        let sender_id = self.sender_id.to_string();
212        METRIC_META_HEARTBEAT_RECV
213            .with_label_values(&[sender_id.as_str()])
214            .inc();
215
216        let res = self
217            .handler_group
218            .handle(req, self.ctx.clone().with_handshake(is_handshake))
219            .await
220            .inspect_err(
221                |e| warn!(e; "Failed to handle heartbeat request, sender: {}", self.sender_id),
222            )
223            .map_err(|e| e.into());
224
225        let is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());
226
227        debug!("Sending heartbeat response: {:?}", res);
228
229        if self.tx.send(res).await.is_err() {
230            info!(
231                "ReceiverStream was dropped; shutting down, sender: {}",
232                self.sender_id
233            );
234            return false;
235        }
236
237        if is_not_leader {
238            warn!(
239                "Quit because it is no longer the leader, sender: {}",
240                self.sender_id
241            );
242            self.send_not_leader_error().await;
243            return false;
244        }
245
246        true
247    }
248
249    async fn send_not_leader_error(&mut self) {
250        let _ = self
251            .tx
252            .send(Err(Status::aborted(format!(
253                "The requested metasrv node is not leader, node addr: {}",
254                self.ctx.server_addr
255            ))))
256            .await;
257    }
258
259    async fn send_election_unavailable_error(&mut self) {
260        let _ = self
261            .tx
262            .send(Err(Status::unavailable(format!(
263                "The requested metasrv node is shutting down, node addr: {}",
264                self.ctx.server_addr
265            ))))
266            .await;
267    }
268
269    async fn cleanup(&self) {
270        info!("Heartbeat stream closed, sender: {}", self.sender_id);
271        let _ = self.handler_group.deregister_push(self.sender_id).await;
272    }
273}
274
275async fn wait_leader_step_down<L>(leader_step_down: Option<&mut L>) -> LeaderStepDownEvent
276where
277    L: LeaderStepDown,
278{
279    match leader_step_down {
280        Some(leader_step_down) => leader_step_down.wait().await,
281        None => std::future::pending().await,
282    }
283}
284
285/// Handles request stream error by logging and forwarding the error to the client if possible.
286///
287/// Returns `false` if the stream should be terminated.
288async fn handle_request_stream_error(
289    sender_id: Option<PusherId>,
290    tx: &Sender<HeartbeatResponseResult>,
291    err: Status,
292) -> bool {
293    if let Some(io_err) = error::match_for_io_error(&err)
294        && io_err.kind() == ErrorKind::BrokenPipe
295    {
296        error!("Client disconnected: broken pipe, sender: {:?}", sender_id);
297        return false;
298    }
299    error!(err; "Error while receiving heartbeat request, sender: {:?}", sender_id);
300
301    if tx.send(Err(err)).await.is_err() {
302        info!(
303            "Failed to forward heartbeat request stream error; response stream was dropped, sender: {:?}",
304            sender_id
305        );
306        return false;
307    }
308
309    true
310}
311
312#[async_trait::async_trait]
313impl heartbeat_server::Heartbeat for Metasrv {
314    type HeartbeatStream = GrpcStream<HeartbeatResponse>;
315
316    async fn heartbeat(
317        &self,
318        req: Request<Streaming<HeartbeatRequest>>,
319    ) -> GrpcResult<Self::HeartbeatStream> {
320        let (tx, rx) = mpsc::channel(128);
321        let handler_group = self.handler_group().context(error::UnexpectedSnafu {
322            violated: "expected heartbeat handlers",
323        })?;
324
325        let ctx = self.new_ctx();
326        let requests = TonicHeartbeatRequestStream::new(req.into_inner());
327        let _handle = common_runtime::spawn_global(async move {
328            if let Some(session) = HeartbeatSession::init(
329                requests,
330                tx,
331                ctx.election
332                    .as_ref()
333                    .map(|r| ElectionLeaderStepDown::new(r.clone())),
334                handler_group,
335                ctx,
336            )
337            .await
338            {
339                session.run().await;
340            }
341        });
342
343        let out_stream = ReceiverStream::new(rx);
344
345        Ok(Response::new(Box::pin(out_stream)))
346    }
347
348    async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
349        let req = req.into_inner();
350        let ctx = self.new_ctx();
351        let res = handle_ask_leader(req, ctx).await?;
352
353        Ok(Response::new(res))
354    }
355}
356
357async fn handle_ask_leader(_req: AskLeaderRequest, ctx: Context) -> Result<AskLeaderResponse> {
358    let addr = match ctx.election {
359        Some(election) => {
360            if election.is_leader() {
361                ctx.server_addr
362            } else {
363                election.leader().await.context(error::KvBackendSnafu)?.0
364            }
365        }
366        None => ctx.server_addr,
367    };
368
369    let leader = Some(Peer {
370        id: 0, // TODO(jiachun): meta node should have a Id
371        addr,
372    });
373
374    let header = Some(ResponseHeader::success());
375    Ok(AskLeaderResponse { header, leader })
376}
377
378fn get_node_id(header: &RequestHeader) -> u64 {
379    static ID: OnceCell<Arc<AtomicU64>> = OnceCell::new();
380
381    fn next_id() -> u64 {
382        let id = ID.get_or_init(|| Arc::new(AtomicU64::new(0))).clone();
383        id.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
384    }
385
386    match header.role() {
387        Role::Frontend => next_id(),
388        Role::Datanode | Role::Flownode => header.member_id,
389    }
390}
391
392async fn register_pusher(
393    handler_group: &HeartbeatHandlerGroup,
394    header: &RequestHeader,
395    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
396) -> PusherId {
397    let role = header.role();
398    let id = get_node_id(header);
399    let pusher_id = PusherId::new(role, id);
400    let pusher = Pusher::new(sender);
401    handler_group.register_pusher(pusher_id, pusher).await;
402    pusher_id
403}
404
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use api::v1::meta::heartbeat_server::Heartbeat;
    use api::v1::meta::*;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_telemetry::tracing_context::W3cTrace;
    use servers::grpc::GrpcOptions;
    use tokio::sync::mpsc;
    use tonic::{Code, IntoRequest};

    use super::*;
    use crate::handler::test_utils::TestEnv;
    use crate::metasrv::MetasrvOptions;
    use crate::metasrv::builder::MetasrvBuilder;

    /// One scripted item of a [`MockHeartbeatRequestStream`].
    type MockMessage = std::result::Result<HeartbeatRequest, Status>;

    /// Request stream that replays scripted messages in order and then either ends or
    /// stays pending forever.
    struct MockHeartbeatRequestStream {
        messages: VecDeque<MockMessage>,
        pending_when_empty: bool,
    }

    impl MockHeartbeatRequestStream {
        /// Stream that ends once the scripted messages are consumed.
        fn new(messages: Vec<MockMessage>) -> Self {
            Self::scripted(messages, false)
        }

        /// Stream that never resolves again once the scripted messages are consumed.
        fn pending_after(messages: Vec<MockMessage>) -> Self {
            Self::scripted(messages, true)
        }

        fn scripted(messages: Vec<MockMessage>, pending_when_empty: bool) -> Self {
            Self {
                messages: VecDeque::from(messages),
                pending_when_empty,
            }
        }
    }

    #[async_trait::async_trait]
    impl HeartbeatRequestStream for MockHeartbeatRequestStream {
        async fn next(&mut self) -> Option<MockMessage> {
            let scripted = self.messages.pop_front();
            if scripted.is_none() && self.pending_when_empty {
                return std::future::pending().await;
            }

            scripted
        }
    }

    /// Watcher that reports a single preset event.
    struct MockLeaderStepDown {
        event: Option<LeaderStepDownEvent>,
    }

    impl MockLeaderStepDown {
        fn new(event: LeaderStepDownEvent) -> Self {
            Self { event: Some(event) }
        }
    }

    #[async_trait::async_trait]
    impl LeaderStepDown for MockLeaderStepDown {
        /// Yields the preset event; panics if awaited a second time.
        async fn wait(&mut self) -> LeaderStepDownEvent {
            self.event.take().unwrap()
        }
    }

    /// Builds a heartbeat request whose header carries only `role` and `member_id`.
    fn heartbeat_request(role: Role, member_id: u64) -> HeartbeatRequest {
        let header = RequestHeader {
            role: role.into(),
            member_id,
            ..Default::default()
        };

        HeartbeatRequest {
            header: Some(header),
            ..Default::default()
        }
    }

    fn sender_id(role: Role, member_id: u64) -> PusherId {
        PusherId::new(role, member_id)
    }

    fn test_context() -> Context {
        TestEnv::new().ctx()
    }

    fn test_handler_group() -> Arc<HeartbeatHandlerGroup> {
        Arc::new(HeartbeatHandlerGroup::default())
    }

    /// Runs `HeartbeatSession::init` with a fresh test context.
    async fn init_session<L>(
        requests: MockHeartbeatRequestStream,
        tx: Sender<HeartbeatResponseResult>,
        leader_step_down: Option<L>,
        handler_group: Arc<HeartbeatHandlerGroup>,
    ) -> Option<HeartbeatSession<MockHeartbeatRequestStream, L>>
    where
        L: LeaderStepDown,
    {
        let ctx = test_context();
        HeartbeatSession::init(requests, tx, leader_step_down, handler_group, ctx).await
    }

    /// Receives the next response; panics if the channel is closed.
    async fn recv_response(
        rx: &mut mpsc::Receiver<HeartbeatResponseResult>,
    ) -> HeartbeatResponseResult {
        rx.recv().await.unwrap()
    }

    /// Initializes a session that must succeed, drains the handshake response and hands
    /// back the session together with the receiving end of its response channel.
    async fn established_session<L>(
        requests: MockHeartbeatRequestStream,
        leader_step_down: Option<L>,
        handler_group: Arc<HeartbeatHandlerGroup>,
    ) -> (
        HeartbeatSession<MockHeartbeatRequestStream, L>,
        mpsc::Receiver<HeartbeatResponseResult>,
    )
    where
        L: LeaderStepDown,
    {
        let (tx, mut rx) = mpsc::channel(8);
        let session = init_session(requests, tx, leader_step_down, handler_group)
            .await
            .unwrap();
        let _ = recv_response(&mut rx).await.unwrap();

        (session, rx)
    }

    #[tokio::test]
    async fn test_heartbeat_session_init_returns_none_on_empty_stream() {
        let handler_group = test_handler_group();
        let (tx, _rx) = mpsc::channel(8);

        let session = init_session(
            MockHeartbeatRequestStream::new(vec![]),
            tx,
            None::<MockLeaderStepDown>,
            handler_group.clone(),
        )
        .await;

        assert!(session.is_none());
        let registered = handler_group
            .contains_pusher(&sender_id(Role::Datanode, 42))
            .await;
        assert!(!registered);
    }

    #[tokio::test]
    async fn test_heartbeat_session_init_forwards_first_stream_error() {
        let handler_group = test_handler_group();
        let (tx, mut rx) = mpsc::channel(8);
        let first_message = Err(Status::internal("boom"));

        let session = init_session(
            MockHeartbeatRequestStream::new(vec![first_message]),
            tx,
            None::<MockLeaderStepDown>,
            handler_group,
        )
        .await;

        assert!(session.is_none());
        let status = recv_response(&mut rx).await.unwrap_err();
        assert_eq!(Code::Internal, status.code());
        assert_eq!("boom", status.message());
    }

    #[tokio::test]
    async fn test_heartbeat_session_init_sends_error_on_missing_header() {
        let handler_group = test_handler_group();
        let (tx, mut rx) = mpsc::channel(8);
        let headerless = HeartbeatRequest::default();

        let session = init_session(
            MockHeartbeatRequestStream::new(vec![Ok(headerless)]),
            tx,
            None::<MockLeaderStepDown>,
            handler_group.clone(),
        )
        .await;

        assert!(session.is_none());
        let registered = handler_group
            .contains_pusher(&sender_id(Role::Datanode, 42))
            .await;
        assert!(!registered);

        let status = recv_response(&mut rx).await.unwrap_err();
        assert_eq!(Code::InvalidArgument, status.code());
    }

    #[tokio::test]
    async fn test_heartbeat_session_init_registers_sender() {
        let handler_group = test_handler_group();
        let (tx, mut rx) = mpsc::channel(8);
        let handshake = heartbeat_request(Role::Datanode, 42);

        let session = init_session(
            MockHeartbeatRequestStream::new(vec![Ok(handshake)]),
            tx,
            None::<MockLeaderStepDown>,
            handler_group.clone(),
        )
        .await;

        assert!(session.is_some());
        assert!(
            handler_group
                .contains_pusher(&sender_id(Role::Datanode, 42))
                .await
        );

        let response = recv_response(&mut rx).await.unwrap();
        assert!(response.heartbeat_config.is_some());
    }

    #[tokio::test]
    async fn test_heartbeat_session_run_deregisters_sender_on_stream_close() {
        let handler_group = test_handler_group();
        let requests =
            MockHeartbeatRequestStream::new(vec![Ok(heartbeat_request(Role::Datanode, 42))]);
        // `_rx` keeps the response channel open for the whole test.
        let (session, _rx) =
            established_session(requests, None::<MockLeaderStepDown>, handler_group.clone()).await;

        session.run().await;

        assert!(
            !handler_group
                .contains_pusher(&sender_id(Role::Datanode, 42))
                .await
        );
    }

    #[tokio::test]
    async fn test_heartbeat_session_run_forwards_stream_error_after_init() {
        let handler_group = test_handler_group();
        let requests = MockHeartbeatRequestStream::new(vec![
            Ok(heartbeat_request(Role::Datanode, 42)),
            Err(Status::unavailable("temporary")),
        ]);
        let (session, mut rx) =
            established_session(requests, None::<MockLeaderStepDown>, handler_group.clone()).await;

        session.run().await;

        let status = recv_response(&mut rx).await.unwrap_err();
        assert_eq!(Code::Unavailable, status.code());
        assert_eq!("temporary", status.message());
        assert!(
            !handler_group
                .contains_pusher(&sender_id(Role::Datanode, 42))
                .await
        );
    }

    #[tokio::test]
    async fn test_heartbeat_session_leader_step_down_sends_aborted_and_deregisters() {
        let handler_group = test_handler_group();
        let requests = MockHeartbeatRequestStream::pending_after(vec![Ok(heartbeat_request(
            Role::Datanode,
            42,
        ))]);
        let watcher = MockLeaderStepDown::new(LeaderStepDownEvent::StepDown);
        let (session, mut rx) =
            established_session(requests, Some(watcher), handler_group.clone()).await;

        session.run().await;

        let status = recv_response(&mut rx).await.unwrap_err();
        assert_eq!(Code::Aborted, status.code());
        assert!(
            !handler_group
                .contains_pusher(&sender_id(Role::Datanode, 42))
                .await
        );
    }

    #[tokio::test]
    async fn test_heartbeat_session_leader_watcher_closed_sends_unavailable_and_deregisters() {
        let handler_group = test_handler_group();
        let requests = MockHeartbeatRequestStream::pending_after(vec![Ok(heartbeat_request(
            Role::Datanode,
            42,
        ))]);
        let watcher = MockLeaderStepDown::new(LeaderStepDownEvent::Closed);
        let (session, mut rx) =
            established_session(requests, Some(watcher), handler_group.clone()).await;

        session.run().await;

        let status = recv_response(&mut rx).await.unwrap_err();
        assert_eq!(Code::Unavailable, status.code());
        assert!(
            !handler_group
                .contains_pusher(&sender_id(Role::Datanode, 42))
                .await
        );
    }

    #[tokio::test]
    async fn test_ask_leader() {
        let options = MetasrvOptions {
            grpc: GrpcOptions {
                server_addr: "127.0.0.1:3002".to_string(),
                ..Default::default()
            },
            ..Default::default()
        };
        let metasrv = MetasrvBuilder::new()
            .kv_backend(Arc::new(MemoryKvBackend::new()))
            .options(options)
            .build()
            .await
            .unwrap();

        let req = AskLeaderRequest {
            header: Some(RequestHeader::new(1, Role::Datanode, W3cTrace::new())),
        };

        let res = metasrv
            .ask_leader(req.into_request())
            .await
            .unwrap()
            .into_inner();
        assert_eq!(metasrv.options().grpc.server_addr, res.leader.unwrap().addr);
    }

    #[test]
    fn test_get_node_id() {
        // Datanodes are identified by their member id.
        let datanode = RequestHeader {
            role: Role::Datanode.into(),
            member_id: 11,
            ..Default::default()
        };
        assert_eq!(11, get_node_id(&datanode));

        // Frontends receive consecutive ids from the shared counter.
        let frontend = RequestHeader {
            role: Role::Frontend.into(),
            ..Default::default()
        };
        for expected in 0..10 {
            assert_eq!(expected, get_node_id(&frontend));
        }

        // A member id set on a frontend header does not affect the assigned id.
        let frontend_with_member_id = RequestHeader {
            role: Role::Frontend.into(),
            member_id: 11,
            ..Default::default()
        };
        for expected in 10..20 {
            assert_eq!(expected, get_node_id(&frontend_with_member_id));
        }
    }
}