meta_srv/service/heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::ErrorKind;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18
19use api::v1::meta::{
20    AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, Peer, RequestHeader,
21    ResponseHeader, Role, heartbeat_server,
22};
23use common_telemetry::{debug, error, info, warn};
24use futures::StreamExt;
25use once_cell::sync::OnceCell;
26use snafu::OptionExt;
27use tokio::sync::mpsc;
28use tokio::sync::mpsc::Sender;
29use tokio_stream::wrappers::ReceiverStream;
30use tonic::{Request, Response, Status, Streaming};
31
32use crate::error::{self, Result};
33use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
34use crate::metasrv::{Context, Metasrv};
35use crate::metrics::METRIC_META_HEARTBEAT_RECV;
36use crate::service::{GrpcResult, GrpcStream};
37
38#[async_trait::async_trait]
39impl heartbeat_server::Heartbeat for Metasrv {
40    type HeartbeatStream = GrpcStream<HeartbeatResponse>;
41
42    async fn heartbeat(
43        &self,
44        req: Request<Streaming<HeartbeatRequest>>,
45    ) -> GrpcResult<Self::HeartbeatStream> {
46        let mut in_stream = req.into_inner();
47        let (tx, rx) = mpsc::channel(128);
48        let handler_group = self.handler_group().context(error::UnexpectedSnafu {
49            violated: "expected heartbeat handlers",
50        })?;
51
52        let ctx = self.new_ctx();
53        let _handle = common_runtime::spawn_global(async move {
54            let mut pusher_id = None;
55            while let Some(msg) = in_stream.next().await {
56                let mut is_not_leader = false;
57                match msg {
58                    Ok(req) => {
59                        debug!("Receiving heartbeat request: {:?}", req);
60
61                        let Some(header) = req.header.as_ref() else {
62                            error!("Exit on malformed request: MissingRequestHeader");
63                            let _ = tx
64                                .send(Err(error::MissingRequestHeaderSnafu {}.build().into()))
65                                .await;
66                            break;
67                        };
68
69                        if pusher_id.is_none() {
70                            pusher_id =
71                                Some(register_pusher(&handler_group, header, tx.clone()).await);
72                        }
73                        if let Some(k) = &pusher_id {
74                            METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
75                        } else {
76                            METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
77                        }
78
79                        let res = handler_group
80                            .handle(req, ctx.clone())
81                            .await
82                            .inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", ))
83                            .map_err(|e| e.into());
84
85                        is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());
86
87                        debug!("Sending heartbeat response: {:?}", res);
88
89                        if tx.send(res).await.is_err() {
90                            info!("ReceiverStream was dropped; shutting down");
91                            break;
92                        }
93                    }
94                    Err(err) => {
95                        if let Some(io_err) = error::match_for_io_error(&err)
96                            && io_err.kind() == ErrorKind::BrokenPipe
97                        {
98                            // client disconnected in unexpected way
99                            error!("Client disconnected: broken pipe");
100                            break;
101                        }
102                        error!(err; "Sending heartbeat response error");
103
104                        if tx.send(Err(err)).await.is_err() {
105                            info!("ReceiverStream was dropped; shutting down");
106                            break;
107                        }
108                    }
109                }
110
111                if is_not_leader {
112                    warn!("Quit because it is no longer the leader");
113                    let _ = tx
114                        .send(Err(Status::aborted(format!(
115                            "The requested metasrv node is not leader, node addr: {}",
116                            ctx.server_addr
117                        ))))
118                        .await;
119                    break;
120                }
121            }
122
123            info!("Heartbeat stream closed: {pusher_id:?}");
124
125            if let Some(pusher_id) = pusher_id {
126                let _ = handler_group.deregister_push(pusher_id).await;
127            }
128        });
129
130        let out_stream = ReceiverStream::new(rx);
131
132        Ok(Response::new(Box::pin(out_stream)))
133    }
134
135    async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
136        let req = req.into_inner();
137        let ctx = self.new_ctx();
138        let res = handle_ask_leader(req, ctx).await?;
139
140        Ok(Response::new(res))
141    }
142}
143
144async fn handle_ask_leader(_req: AskLeaderRequest, ctx: Context) -> Result<AskLeaderResponse> {
145    let addr = match ctx.election {
146        Some(election) => {
147            if election.is_leader() {
148                ctx.server_addr
149            } else {
150                election.leader().await?.0
151            }
152        }
153        None => ctx.server_addr,
154    };
155
156    let leader = Some(Peer {
157        id: 0, // TODO(jiachun): meta node should have a Id
158        addr,
159    });
160
161    let header = Some(ResponseHeader::success());
162    Ok(AskLeaderResponse { header, leader })
163}
164
165fn get_node_id(header: &RequestHeader) -> u64 {
166    static ID: OnceCell<Arc<AtomicU64>> = OnceCell::new();
167
168    fn next_id() -> u64 {
169        let id = ID.get_or_init(|| Arc::new(AtomicU64::new(0))).clone();
170        id.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
171    }
172
173    match header.role() {
174        Role::Frontend => next_id(),
175        Role::Datanode | Role::Flownode => header.member_id,
176    }
177}
178
179async fn register_pusher(
180    handler_group: &HeartbeatHandlerGroup,
181    header: &RequestHeader,
182    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
183) -> PusherId {
184    let role = header.role();
185    let id = get_node_id(header);
186    let pusher_id = PusherId::new(role, id);
187    let pusher = Pusher::new(sender);
188    handler_group.register_pusher(pusher_id, pusher).await;
189    pusher_id
190}
191
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::meta::heartbeat_server::Heartbeat;
    use api::v1::meta::*;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_telemetry::tracing_context::W3cTrace;
    use servers::grpc::GrpcOptions;
    use tonic::IntoRequest;

    use super::get_node_id;
    use crate::metasrv::MetasrvOptions;
    use crate::metasrv::builder::MetasrvBuilder;

    /// With no election configured, the node reports its own server address as leader.
    #[tokio::test]
    async fn test_ask_leader() {
        let kv_backend = Arc::new(MemoryKvBackend::new());

        let metasrv = MetasrvBuilder::new()
            .kv_backend(kv_backend)
            .options(MetasrvOptions {
                grpc: GrpcOptions {
                    server_addr: "127.0.0.1:3002".to_string(),
                    ..Default::default()
                },
                ..Default::default()
            })
            .build()
            .await
            .unwrap();

        let req = AskLeaderRequest {
            header: Some(RequestHeader::new(1, Role::Datanode, W3cTrace::new())),
        };

        let res = metasrv.ask_leader(req.into_request()).await.unwrap();
        let res = res.into_inner();
        assert_eq!(metasrv.options().grpc.server_addr, res.leader.unwrap().addr);
    }

    #[test]
    fn test_get_node_id() {
        // Datanodes and flownodes are identified by the member id they report.
        let header = RequestHeader {
            role: Role::Datanode.into(),
            member_id: 11,
            ..Default::default()
        };
        assert_eq!(11, get_node_id(&header));

        let header = RequestHeader {
            role: Role::Flownode.into(),
            member_id: 12,
            ..Default::default()
        };
        assert_eq!(12, get_node_id(&header));

        // Frontend ids come from a process-wide static counter, which earlier calls elsewhere
        // in the test binary may already have advanced. So assert consecutive allocation
        // relative to the first id observed here, not absolute values.
        let header = RequestHeader {
            role: Role::Frontend.into(),
            ..Default::default()
        };
        let base = get_node_id(&header);
        for i in 1..10 {
            assert_eq!(base + i, get_node_id(&header));
        }

        // A reported member id is ignored for frontends: the sequence simply continues.
        let header = RequestHeader {
            role: Role::Frontend.into(),
            member_id: 11,
            ..Default::default()
        };
        for i in 10..20 {
            assert_eq!(base + i, get_node_id(&header));
        }
    }
}