meta_srv/service/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::ErrorKind;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18
19use api::v1::meta::{
20    AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, Peer, RequestHeader,
21    ResponseHeader, Role, heartbeat_server,
22};
23use common_telemetry::{debug, error, info, warn};
24use futures::StreamExt;
25use once_cell::sync::OnceCell;
26use snafu::OptionExt;
27use tokio::sync::mpsc;
28use tokio::sync::mpsc::Sender;
29use tokio_stream::wrappers::ReceiverStream;
30use tonic::{Request, Response, Status, Streaming};
31
32use crate::error::{self, Result};
33use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
34use crate::metasrv::{Context, Metasrv};
35use crate::metrics::METRIC_META_HEARTBEAT_RECV;
36use crate::service::{GrpcResult, GrpcStream};
37
38#[async_trait::async_trait]
39impl heartbeat_server::Heartbeat for Metasrv {
40    type HeartbeatStream = GrpcStream<HeartbeatResponse>;
41
42    async fn heartbeat(
43        &self,
44        req: Request<Streaming<HeartbeatRequest>>,
45    ) -> GrpcResult<Self::HeartbeatStream> {
46        let mut in_stream = req.into_inner();
47        let (tx, rx) = mpsc::channel(128);
48        let handler_group = self.handler_group().context(error::UnexpectedSnafu {
49            violated: "expected heartbeat handlers",
50        })?;
51
52        let ctx = self.new_ctx();
53        let _handle = common_runtime::spawn_global(async move {
54            let mut pusher_id = None;
55            while let Some(msg) = in_stream.next().await {
56                let mut is_not_leader = false;
57                match msg {
58                    Ok(req) => {
59                        debug!("Receiving heartbeat request: {:?}", req);
60
61                        let Some(header) = req.header.as_ref() else {
62                            error!("Exit on malformed request: MissingRequestHeader");
63                            let _ = tx
64                                .send(Err(error::MissingRequestHeaderSnafu {}.build().into()))
65                                .await;
66                            break;
67                        };
68
69                        if pusher_id.is_none() {
70                            pusher_id =
71                                Some(register_pusher(&handler_group, header, tx.clone()).await);
72                        }
73                        if let Some(k) = &pusher_id {
74                            METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
75                        } else {
76                            METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
77                        }
78
79                        let res = handler_group
80                            .handle(req, ctx.clone())
81                            .await
82                            .inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", ))
83                            .map_err(|e| e.into());
84
85                        is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());
86
87                        debug!("Sending heartbeat response: {:?}", res);
88
89                        if tx.send(res).await.is_err() {
90                            info!("ReceiverStream was dropped; shutting down");
91                            break;
92                        }
93                    }
94                    Err(err) => {
95                        if let Some(io_err) = error::match_for_io_error(&err)
96                            && io_err.kind() == ErrorKind::BrokenPipe
97                        {
98                            // client disconnected in unexpected way
99                            error!("Client disconnected: broken pipe");
100                            break;
101                        }
102
103                        if tx.send(Err(err)).await.is_err() {
104                            info!("ReceiverStream was dropped; shutting down");
105                            break;
106                        }
107                    }
108                }
109
110                if is_not_leader {
111                    warn!("Quit because it is no longer the leader");
112                    let _ = tx
113                        .send(Err(Status::aborted(format!(
114                            "The requested metasrv node is not leader, node addr: {}",
115                            ctx.server_addr
116                        ))))
117                        .await;
118                    break;
119                }
120            }
121
122            info!("Heartbeat stream closed: {pusher_id:?}");
123
124            if let Some(pusher_id) = pusher_id {
125                let _ = handler_group.deregister_push(pusher_id).await;
126            }
127        });
128
129        let out_stream = ReceiverStream::new(rx);
130
131        Ok(Response::new(Box::pin(out_stream)))
132    }
133
134    async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
135        let req = req.into_inner();
136        let ctx = self.new_ctx();
137        let res = handle_ask_leader(req, ctx).await?;
138
139        Ok(Response::new(res))
140    }
141}
142
143async fn handle_ask_leader(_req: AskLeaderRequest, ctx: Context) -> Result<AskLeaderResponse> {
144    let addr = match ctx.election {
145        Some(election) => {
146            if election.is_leader() {
147                ctx.server_addr
148            } else {
149                election.leader().await?.0
150            }
151        }
152        None => ctx.server_addr,
153    };
154
155    let leader = Some(Peer {
156        id: 0, // TODO(jiachun): meta node should have a Id
157        addr,
158    });
159
160    let header = Some(ResponseHeader::success());
161    Ok(AskLeaderResponse { header, leader })
162}
163
164fn get_node_id(header: &RequestHeader) -> u64 {
165    static ID: OnceCell<Arc<AtomicU64>> = OnceCell::new();
166
167    fn next_id() -> u64 {
168        let id = ID.get_or_init(|| Arc::new(AtomicU64::new(0))).clone();
169        id.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
170    }
171
172    match header.role() {
173        Role::Frontend => next_id(),
174        Role::Datanode | Role::Flownode => header.member_id,
175    }
176}
177
178async fn register_pusher(
179    handler_group: &HeartbeatHandlerGroup,
180    header: &RequestHeader,
181    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
182) -> PusherId {
183    let role = header.role();
184    let id = get_node_id(header);
185    let pusher_id = PusherId::new(role, id);
186    let pusher = Pusher::new(sender);
187    handler_group.register_pusher(pusher_id, pusher).await;
188    pusher_id
189}
190
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::meta::heartbeat_server::Heartbeat;
    use api::v1::meta::*;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_telemetry::tracing_context::W3cTrace;
    use servers::grpc::GrpcOptions;
    use tonic::IntoRequest;

    use super::get_node_id;
    use crate::metasrv::MetasrvOptions;
    use crate::metasrv::builder::MetasrvBuilder;

    /// A metasrv built without an election reports its own `server_addr` as the leader.
    #[tokio::test]
    async fn test_ask_leader() {
        let kv_backend = Arc::new(MemoryKvBackend::new());

        let metasrv = MetasrvBuilder::new()
            .kv_backend(kv_backend)
            .options(MetasrvOptions {
                grpc: GrpcOptions {
                    server_addr: "127.0.0.1:3002".to_string(),
                    ..Default::default()
                },
                ..Default::default()
            })
            .build()
            .await
            .unwrap();

        let req = AskLeaderRequest {
            header: Some(RequestHeader::new(1, Role::Datanode, W3cTrace::new())),
        };

        let res = metasrv.ask_leader(req.into_request()).await.unwrap();
        let res = res.into_inner();
        assert_eq!(metasrv.options().grpc.server_addr, res.leader.unwrap().addr);
    }

    /// Datanode/flownode ids echo `member_id`; frontend ids come from a shared counter.
    #[test]
    fn test_get_node_id() {
        let header = RequestHeader {
            role: Role::Datanode.into(),
            member_id: 11,
            ..Default::default()
        };
        assert_eq!(11, get_node_id(&header));

        let header = RequestHeader {
            role: Role::Flownode.into(),
            member_id: 7,
            ..Default::default()
        };
        assert_eq!(7, get_node_id(&header));

        // The frontend counter is a process-global static, so other tests in this binary may
        // advance it concurrently. Only assert that ids handed to this thread strictly
        // increase (guaranteed by `fetch_add`), not their absolute values.
        let header = RequestHeader {
            role: Role::Frontend.into(),
            ..Default::default()
        };
        let mut prev = get_node_id(&header);
        for _ in 0..10 {
            let next = get_node_id(&header);
            assert!(next > prev, "frontend ids must strictly increase: {prev} -> {next}");
            prev = next;
        }

        // `member_id` is ignored for frontends: ids keep increasing instead of echoing 11.
        let header = RequestHeader {
            role: Role::Frontend.into(),
            member_id: 11,
            ..Default::default()
        };
        for _ in 0..10 {
            let next = get_node_id(&header);
            assert!(next > prev, "frontend ids must strictly increase: {prev} -> {next}");
            prev = next;
        }
    }
}
258}