meta_srv/service/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::ErrorKind;
16use std::sync::atomic::AtomicU64;
17use std::sync::Arc;
18
19use api::v1::meta::{
20    heartbeat_server, AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse,
21    Peer, RequestHeader, ResponseHeader, Role,
22};
23use common_telemetry::{debug, error, info, warn};
24use futures::StreamExt;
25use once_cell::sync::OnceCell;
26use snafu::OptionExt;
27use tokio::sync::mpsc;
28use tokio::sync::mpsc::Sender;
29use tokio_stream::wrappers::ReceiverStream;
30use tonic::{Request, Response, Status, Streaming};
31
32use crate::error::{self, Result};
33use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
34use crate::metasrv::{Context, Metasrv};
35use crate::metrics::METRIC_META_HEARTBEAT_RECV;
36use crate::service::{GrpcResult, GrpcStream};
37
38#[async_trait::async_trait]
39impl heartbeat_server::Heartbeat for Metasrv {
40    type HeartbeatStream = GrpcStream<HeartbeatResponse>;
41
42    async fn heartbeat(
43        &self,
44        req: Request<Streaming<HeartbeatRequest>>,
45    ) -> GrpcResult<Self::HeartbeatStream> {
46        let mut in_stream = req.into_inner();
47        let (tx, rx) = mpsc::channel(128);
48        let handler_group = self.handler_group().context(error::UnexpectedSnafu {
49            violated: "expected heartbeat handlers",
50        })?;
51
52        let ctx = self.new_ctx();
53        let _handle = common_runtime::spawn_global(async move {
54            let mut pusher_id = None;
55            while let Some(msg) = in_stream.next().await {
56                let mut is_not_leader = false;
57                match msg {
58                    Ok(req) => {
59                        debug!("Receiving heartbeat request: {:?}", req);
60
61                        let Some(header) = req.header.as_ref() else {
62                            error!("Exit on malformed request: MissingRequestHeader");
63                            let _ = tx
64                                .send(Err(error::MissingRequestHeaderSnafu {}.build().into()))
65                                .await;
66                            break;
67                        };
68
69                        if pusher_id.is_none() {
70                            pusher_id =
71                                Some(register_pusher(&handler_group, header, tx.clone()).await);
72                        }
73                        if let Some(k) = &pusher_id {
74                            METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
75                        } else {
76                            METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
77                        }
78
79                        let res = handler_group
80                            .handle(req, ctx.clone())
81                            .await
82                            .map_err(|e| e.into());
83
84                        is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());
85
86                        debug!("Sending heartbeat response: {:?}", res);
87
88                        if tx.send(res).await.is_err() {
89                            info!("ReceiverStream was dropped; shutting down");
90                            break;
91                        }
92                    }
93                    Err(err) => {
94                        if let Some(io_err) = error::match_for_io_error(&err) {
95                            if io_err.kind() == ErrorKind::BrokenPipe {
96                                // client disconnected in unexpected way
97                                error!("Client disconnected: broken pipe");
98                                break;
99                            }
100                        }
101
102                        if tx.send(Err(err)).await.is_err() {
103                            info!("ReceiverStream was dropped; shutting down");
104                            break;
105                        }
106                    }
107                }
108
109                if is_not_leader {
110                    warn!("Quit because it is no longer the leader");
111                    let _ = tx
112                        .send(Err(Status::aborted(format!(
113                            "The requested metasrv node is not leader, node addr: {}",
114                            ctx.server_addr
115                        ))))
116                        .await;
117                    break;
118                }
119            }
120
121            info!("Heartbeat stream closed: {pusher_id:?}");
122
123            if let Some(pusher_id) = pusher_id {
124                let _ = handler_group.deregister_push(pusher_id).await;
125            }
126        });
127
128        let out_stream = ReceiverStream::new(rx);
129
130        Ok(Response::new(Box::pin(out_stream)))
131    }
132
133    async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
134        let req = req.into_inner();
135        let ctx = self.new_ctx();
136        let res = handle_ask_leader(req, ctx).await?;
137
138        Ok(Response::new(res))
139    }
140}
141
142async fn handle_ask_leader(_req: AskLeaderRequest, ctx: Context) -> Result<AskLeaderResponse> {
143    let addr = match ctx.election {
144        Some(election) => {
145            if election.is_leader() {
146                ctx.server_addr
147            } else {
148                election.leader().await?.0
149            }
150        }
151        None => ctx.server_addr,
152    };
153
154    let leader = Some(Peer {
155        id: 0, // TODO(jiachun): meta node should have a Id
156        addr,
157    });
158
159    let header = Some(ResponseHeader::success());
160    Ok(AskLeaderResponse { header, leader })
161}
162
163fn get_node_id(header: &RequestHeader) -> u64 {
164    static ID: OnceCell<Arc<AtomicU64>> = OnceCell::new();
165
166    fn next_id() -> u64 {
167        let id = ID.get_or_init(|| Arc::new(AtomicU64::new(0))).clone();
168        id.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
169    }
170
171    match header.role() {
172        Role::Frontend => next_id(),
173        Role::Datanode | Role::Flownode => header.member_id,
174    }
175}
176
177async fn register_pusher(
178    handler_group: &HeartbeatHandlerGroup,
179    header: &RequestHeader,
180    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
181) -> PusherId {
182    let role = header.role();
183    let id = get_node_id(header);
184    let pusher_id = PusherId::new(role, id);
185    let pusher = Pusher::new(sender);
186    handler_group.register_pusher(pusher_id, pusher).await;
187    pusher_id
188}
189
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::meta::heartbeat_server::Heartbeat;
    use api::v1::meta::*;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_telemetry::tracing_context::W3cTrace;
    use servers::grpc::GrpcOptions;
    use tonic::IntoRequest;

    use super::get_node_id;
    use crate::metasrv::builder::MetasrvBuilder;
    use crate::metasrv::MetasrvOptions;

    /// A standalone metasrv (no election) must report itself as the leader.
    #[tokio::test]
    async fn test_ask_leader() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let options = MetasrvOptions {
            grpc: GrpcOptions {
                server_addr: "127.0.0.1:3002".to_string(),
                ..Default::default()
            },
            ..Default::default()
        };

        let metasrv = MetasrvBuilder::new()
            .kv_backend(kv_backend)
            .options(options)
            .build()
            .await
            .unwrap();

        let request = AskLeaderRequest {
            header: Some(RequestHeader::new(1, Role::Datanode, W3cTrace::new())),
        }
        .into_request();

        let leader = metasrv
            .ask_leader(request)
            .await
            .unwrap()
            .into_inner()
            .leader
            .unwrap();
        assert_eq!(metasrv.options().grpc.server_addr, leader.addr);
    }

    /// Datanodes keep their member id; frontends draw from a shared counter.
    #[test]
    fn test_get_node_id() {
        let datanode = RequestHeader {
            role: Role::Datanode.into(),
            member_id: 11,
            ..Default::default()
        };
        assert_eq!(11, get_node_id(&datanode));

        let frontend = RequestHeader {
            role: Role::Frontend.into(),
            ..Default::default()
        };
        (0..10).for_each(|expected| assert_eq!(expected, get_node_id(&frontend)));

        // A member id sent by a frontend is ignored; the counter just keeps going.
        let frontend_with_member = RequestHeader {
            role: Role::Frontend.into(),
            member_id: 11,
            ..Default::default()
        };
        (10..20).for_each(|expected| assert_eq!(expected, get_node_id(&frontend_with_member)));
    }
}