meta_srv/service/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::ErrorKind;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18
19use api::v1::meta::{
20    AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, Peer, RequestHeader,
21    ResponseHeader, Role, heartbeat_server,
22};
23use common_telemetry::{debug, error, info, warn};
24use futures::StreamExt;
25use once_cell::sync::OnceCell;
26use snafu::OptionExt;
27use tokio::sync::mpsc;
28use tokio::sync::mpsc::Sender;
29use tokio_stream::wrappers::ReceiverStream;
30use tonic::{Request, Response, Status, Streaming};
31
32use crate::error::{self, Result};
33use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
34use crate::metasrv::{Context, Metasrv};
35use crate::metrics::METRIC_META_HEARTBEAT_RECV;
36use crate::service::{GrpcResult, GrpcStream};
37
38#[async_trait::async_trait]
39impl heartbeat_server::Heartbeat for Metasrv {
40    type HeartbeatStream = GrpcStream<HeartbeatResponse>;
41
42    async fn heartbeat(
43        &self,
44        req: Request<Streaming<HeartbeatRequest>>,
45    ) -> GrpcResult<Self::HeartbeatStream> {
46        let mut in_stream = req.into_inner();
47        let (tx, rx) = mpsc::channel(128);
48        let handler_group = self.handler_group().context(error::UnexpectedSnafu {
49            violated: "expected heartbeat handlers",
50        })?;
51
52        let ctx = self.new_ctx();
53        let _handle = common_runtime::spawn_global(async move {
54            let mut pusher_id = None;
55            while let Some(msg) = in_stream.next().await {
56                let mut is_not_leader = false;
57                match msg {
58                    Ok(req) => {
59                        debug!("Receiving heartbeat request: {:?}", req);
60
61                        let Some(header) = req.header.as_ref() else {
62                            error!("Exit on malformed request: MissingRequestHeader");
63                            let _ = tx
64                                .send(Err(error::MissingRequestHeaderSnafu {}.build().into()))
65                                .await;
66                            break;
67                        };
68
69                        let is_handshake = pusher_id.is_none();
70                        if is_handshake {
71                            pusher_id =
72                                Some(register_pusher(&handler_group, header, tx.clone()).await);
73                        }
74                        if let Some(k) = &pusher_id {
75                            METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
76                        } else {
77                            METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
78                        }
79
80                        let res = handler_group
81                            .handle(req, ctx.clone().with_handshake(is_handshake))
82                            .await
83                            .inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", ))
84                            .map_err(|e| e.into());
85
86                        is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());
87
88                        debug!("Sending heartbeat response: {:?}", res);
89
90                        if tx.send(res).await.is_err() {
91                            info!("ReceiverStream was dropped; shutting down");
92                            break;
93                        }
94                    }
95                    Err(err) => {
96                        if let Some(io_err) = error::match_for_io_error(&err)
97                            && io_err.kind() == ErrorKind::BrokenPipe
98                        {
99                            // client disconnected in unexpected way
100                            error!("Client disconnected: broken pipe");
101                            break;
102                        }
103                        error!(err; "Sending heartbeat response error");
104
105                        if tx.send(Err(err)).await.is_err() {
106                            info!("ReceiverStream was dropped; shutting down");
107                            break;
108                        }
109                    }
110                }
111
112                if is_not_leader {
113                    warn!("Quit because it is no longer the leader");
114                    let _ = tx
115                        .send(Err(Status::aborted(format!(
116                            "The requested metasrv node is not leader, node addr: {}",
117                            ctx.server_addr
118                        ))))
119                        .await;
120                    break;
121                }
122            }
123
124            info!("Heartbeat stream closed: {pusher_id:?}");
125
126            if let Some(pusher_id) = pusher_id {
127                let _ = handler_group.deregister_push(pusher_id).await;
128            }
129        });
130
131        let out_stream = ReceiverStream::new(rx);
132
133        Ok(Response::new(Box::pin(out_stream)))
134    }
135
136    async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
137        let req = req.into_inner();
138        let ctx = self.new_ctx();
139        let res = handle_ask_leader(req, ctx).await?;
140
141        Ok(Response::new(res))
142    }
143}
144
145async fn handle_ask_leader(_req: AskLeaderRequest, ctx: Context) -> Result<AskLeaderResponse> {
146    let addr = match ctx.election {
147        Some(election) => {
148            if election.is_leader() {
149                ctx.server_addr
150            } else {
151                election.leader().await?.0
152            }
153        }
154        None => ctx.server_addr,
155    };
156
157    let leader = Some(Peer {
158        id: 0, // TODO(jiachun): meta node should have a Id
159        addr,
160    });
161
162    let header = Some(ResponseHeader::success());
163    Ok(AskLeaderResponse { header, leader })
164}
165
166fn get_node_id(header: &RequestHeader) -> u64 {
167    static ID: OnceCell<Arc<AtomicU64>> = OnceCell::new();
168
169    fn next_id() -> u64 {
170        let id = ID.get_or_init(|| Arc::new(AtomicU64::new(0))).clone();
171        id.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
172    }
173
174    match header.role() {
175        Role::Frontend => next_id(),
176        Role::Datanode | Role::Flownode => header.member_id,
177    }
178}
179
180async fn register_pusher(
181    handler_group: &HeartbeatHandlerGroup,
182    header: &RequestHeader,
183    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
184) -> PusherId {
185    let role = header.role();
186    let id = get_node_id(header);
187    let pusher_id = PusherId::new(role, id);
188    let pusher = Pusher::new(sender);
189    handler_group.register_pusher(pusher_id, pusher).await;
190    pusher_id
191}
192
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::meta::heartbeat_server::Heartbeat;
    use api::v1::meta::*;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_telemetry::tracing_context::W3cTrace;
    use servers::grpc::GrpcOptions;
    use tonic::IntoRequest;

    use super::get_node_id;
    use crate::metasrv::MetasrvOptions;
    use crate::metasrv::builder::MetasrvBuilder;

    /// The metasrv built here is expected to report its own configured gRPC
    /// address as the leader.
    #[tokio::test]
    async fn test_ask_leader() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let options = MetasrvOptions {
            grpc: GrpcOptions {
                server_addr: "127.0.0.1:3002".to_string(),
                ..Default::default()
            },
            ..Default::default()
        };

        let metasrv = MetasrvBuilder::new()
            .kv_backend(kv_backend)
            .options(options)
            .build()
            .await
            .unwrap();

        let request = AskLeaderRequest {
            header: Some(RequestHeader::new(1, Role::Datanode, W3cTrace::new())),
        };

        let leader = metasrv
            .ask_leader(request.into_request())
            .await
            .unwrap()
            .into_inner()
            .leader
            .unwrap();
        assert_eq!(metasrv.options().grpc.server_addr, leader.addr);
    }

    #[test]
    fn test_get_node_id() {
        // Datanodes keep the member id they report.
        let datanode = RequestHeader {
            role: Role::Datanode.into(),
            member_id: 11,
            ..Default::default()
        };
        assert_eq!(11, get_node_id(&datanode));

        // Frontends draw consecutive ids from the shared counter...
        let frontend = RequestHeader {
            role: Role::Frontend.into(),
            ..Default::default()
        };
        (0..10).for_each(|expected| assert_eq!(expected, get_node_id(&frontend)));

        // ...and a reported member id does not change that.
        let frontend_with_member_id = RequestHeader {
            role: Role::Frontend.into(),
            member_id: 11,
            ..Default::default()
        };
        for expected in 10..20 {
            assert_eq!(expected, get_node_id(&frontend_with_member_id));
        }
    }
}