meta_srv/service/
heartbeat.rs1use std::io::ErrorKind;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18
19use api::v1::meta::{
20 AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, Peer, RequestHeader,
21 ResponseHeader, Role, heartbeat_server,
22};
23use common_telemetry::{debug, error, info, warn};
24use futures::StreamExt;
25use once_cell::sync::OnceCell;
26use snafu::OptionExt;
27use tokio::sync::mpsc;
28use tokio::sync::mpsc::Sender;
29use tokio_stream::wrappers::ReceiverStream;
30use tonic::{Request, Response, Status, Streaming};
31
32use crate::error::{self, Result};
33use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
34use crate::metasrv::{Context, Metasrv};
35use crate::metrics::METRIC_META_HEARTBEAT_RECV;
36use crate::service::{GrpcResult, GrpcStream};
37
38#[async_trait::async_trait]
39impl heartbeat_server::Heartbeat for Metasrv {
40 type HeartbeatStream = GrpcStream<HeartbeatResponse>;
41
42 async fn heartbeat(
43 &self,
44 req: Request<Streaming<HeartbeatRequest>>,
45 ) -> GrpcResult<Self::HeartbeatStream> {
46 let mut in_stream = req.into_inner();
47 let (tx, rx) = mpsc::channel(128);
48 let handler_group = self.handler_group().context(error::UnexpectedSnafu {
49 violated: "expected heartbeat handlers",
50 })?;
51
52 let ctx = self.new_ctx();
53 let _handle = common_runtime::spawn_global(async move {
54 let mut pusher_id = None;
55 while let Some(msg) = in_stream.next().await {
56 let mut is_not_leader = false;
57 match msg {
58 Ok(req) => {
59 debug!("Receiving heartbeat request: {:?}", req);
60
61 let Some(header) = req.header.as_ref() else {
62 error!("Exit on malformed request: MissingRequestHeader");
63 let _ = tx
64 .send(Err(error::MissingRequestHeaderSnafu {}.build().into()))
65 .await;
66 break;
67 };
68
69 let is_handshake = pusher_id.is_none();
70 if is_handshake {
71 pusher_id =
72 Some(register_pusher(&handler_group, header, tx.clone()).await);
73 }
74 if let Some(k) = &pusher_id {
75 METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
76 } else {
77 METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
78 }
79
80 let res = handler_group
81 .handle(req, ctx.clone().with_handshake(is_handshake))
82 .await
83 .inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", ))
84 .map_err(|e| e.into());
85
86 is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());
87
88 debug!("Sending heartbeat response: {:?}", res);
89
90 if tx.send(res).await.is_err() {
91 info!("ReceiverStream was dropped; shutting down");
92 break;
93 }
94 }
95 Err(err) => {
96 if let Some(io_err) = error::match_for_io_error(&err)
97 && io_err.kind() == ErrorKind::BrokenPipe
98 {
99 error!("Client disconnected: broken pipe");
101 break;
102 }
103 error!(err; "Sending heartbeat response error");
104
105 if tx.send(Err(err)).await.is_err() {
106 info!("ReceiverStream was dropped; shutting down");
107 break;
108 }
109 }
110 }
111
112 if is_not_leader {
113 warn!("Quit because it is no longer the leader");
114 let _ = tx
115 .send(Err(Status::aborted(format!(
116 "The requested metasrv node is not leader, node addr: {}",
117 ctx.server_addr
118 ))))
119 .await;
120 break;
121 }
122 }
123
124 info!("Heartbeat stream closed: {pusher_id:?}");
125
126 if let Some(pusher_id) = pusher_id {
127 let _ = handler_group.deregister_push(pusher_id).await;
128 }
129 });
130
131 let out_stream = ReceiverStream::new(rx);
132
133 Ok(Response::new(Box::pin(out_stream)))
134 }
135
136 async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
137 let req = req.into_inner();
138 let ctx = self.new_ctx();
139 let res = handle_ask_leader(req, ctx).await?;
140
141 Ok(Response::new(res))
142 }
143}
144
145async fn handle_ask_leader(_req: AskLeaderRequest, ctx: Context) -> Result<AskLeaderResponse> {
146 let addr = match ctx.election {
147 Some(election) => {
148 if election.is_leader() {
149 ctx.server_addr
150 } else {
151 election.leader().await?.0
152 }
153 }
154 None => ctx.server_addr,
155 };
156
157 let leader = Some(Peer {
158 id: 0, addr,
160 });
161
162 let header = Some(ResponseHeader::success());
163 Ok(AskLeaderResponse { header, leader })
164}
165
166fn get_node_id(header: &RequestHeader) -> u64 {
167 static ID: OnceCell<Arc<AtomicU64>> = OnceCell::new();
168
169 fn next_id() -> u64 {
170 let id = ID.get_or_init(|| Arc::new(AtomicU64::new(0))).clone();
171 id.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
172 }
173
174 match header.role() {
175 Role::Frontend => next_id(),
176 Role::Datanode | Role::Flownode => header.member_id,
177 }
178}
179
180async fn register_pusher(
181 handler_group: &HeartbeatHandlerGroup,
182 header: &RequestHeader,
183 sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
184) -> PusherId {
185 let role = header.role();
186 let id = get_node_id(header);
187 let pusher_id = PusherId::new(role, id);
188 let pusher = Pusher::new(sender);
189 handler_group.register_pusher(pusher_id, pusher).await;
190 pusher_id
191}
192
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::meta::heartbeat_server::Heartbeat;
    use api::v1::meta::*;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_telemetry::tracing_context::W3cTrace;
    use servers::grpc::GrpcOptions;
    use tonic::IntoRequest;

    use super::get_node_id;
    use crate::metasrv::MetasrvOptions;
    use crate::metasrv::builder::MetasrvBuilder;

    #[tokio::test]
    async fn test_ask_leader() {
        let kv_backend = Arc::new(MemoryKvBackend::new());

        let metasrv = MetasrvBuilder::new()
            .kv_backend(kv_backend)
            .options(MetasrvOptions {
                grpc: GrpcOptions {
                    server_addr: "127.0.0.1:3002".to_string(),
                    ..Default::default()
                },
                ..Default::default()
            })
            .build()
            .await
            .unwrap();

        let req = AskLeaderRequest {
            header: Some(RequestHeader::new(1, Role::Datanode, W3cTrace::new())),
        };

        let res = metasrv.ask_leader(req.into_request()).await.unwrap();
        let res = res.into_inner();
        assert_eq!(metasrv.options().grpc.server_addr, res.leader.unwrap().addr);
    }

    #[test]
    fn test_get_node_id() {
        // Datanodes and flownodes are identified by the member id in their header.
        let header = RequestHeader {
            role: Role::Datanode.into(),
            member_id: 11,
            ..Default::default()
        };
        assert_eq!(11, get_node_id(&header));

        let header = RequestHeader {
            role: Role::Flownode.into(),
            member_id: 7,
            ..Default::default()
        };
        assert_eq!(7, get_node_id(&header));

        // Frontends draw ids from a process-wide counter. The expectations are anchored on
        // the first value observed instead of assuming the counter is still at 0. Any other
        // code in this test binary that calls `get_node_id` for a frontend (e.g. through
        // `register_pusher`) advances the same counter.
        let header = RequestHeader {
            role: Role::Frontend.into(),
            ..Default::default()
        };
        let base = get_node_id(&header);
        for i in 1..10 {
            assert_eq!(base + i, get_node_id(&header));
        }

        // A frontend's member id is ignored; the counter simply keeps advancing.
        let header = RequestHeader {
            role: Role::Frontend.into(),
            member_id: 11,
            ..Default::default()
        };
        for i in 10..20 {
            assert_eq!(base + i, get_node_id(&header));
        }
    }
}