meta_srv/service/
heartbeat.rs1use std::io::ErrorKind;
16use std::sync::atomic::AtomicU64;
17use std::sync::Arc;
18
19use api::v1::meta::{
20 heartbeat_server, AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse,
21 Peer, RequestHeader, ResponseHeader, Role,
22};
23use common_telemetry::{debug, error, info, warn};
24use futures::StreamExt;
25use once_cell::sync::OnceCell;
26use snafu::OptionExt;
27use tokio::sync::mpsc;
28use tokio::sync::mpsc::Sender;
29use tokio_stream::wrappers::ReceiverStream;
30use tonic::{Request, Response, Streaming};
31
32use crate::error;
33use crate::error::Result;
34use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
35use crate::metasrv::{Context, Metasrv};
36use crate::metrics::METRIC_META_HEARTBEAT_RECV;
37use crate::service::{GrpcResult, GrpcStream};
38
39#[async_trait::async_trait]
40impl heartbeat_server::Heartbeat for Metasrv {
41 type HeartbeatStream = GrpcStream<HeartbeatResponse>;
42
43 async fn heartbeat(
44 &self,
45 req: Request<Streaming<HeartbeatRequest>>,
46 ) -> GrpcResult<Self::HeartbeatStream> {
47 let mut in_stream = req.into_inner();
48 let (tx, rx) = mpsc::channel(128);
49 let handler_group = self.handler_group().context(error::UnexpectedSnafu {
50 violated: "expected heartbeat handlers",
51 })?;
52
53 let ctx = self.new_ctx();
54 let _handle = common_runtime::spawn_global(async move {
55 let mut pusher_id = None;
56 while let Some(msg) = in_stream.next().await {
57 let mut is_not_leader = false;
58 match msg {
59 Ok(req) => {
60 debug!("Receiving heartbeat request: {:?}", req);
61
62 let Some(header) = req.header.as_ref() else {
63 error!("Exit on malformed request: MissingRequestHeader");
64 let _ = tx
65 .send(Err(error::MissingRequestHeaderSnafu {}.build().into()))
66 .await;
67 break;
68 };
69
70 if pusher_id.is_none() {
71 pusher_id =
72 Some(register_pusher(&handler_group, header, tx.clone()).await);
73 }
74 if let Some(k) = &pusher_id {
75 METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
76 } else {
77 METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
78 }
79
80 let res = handler_group
81 .handle(req, ctx.clone())
82 .await
83 .map_err(|e| e.into());
84
85 is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());
86
87 debug!("Sending heartbeat response: {:?}", res);
88
89 if tx.send(res).await.is_err() {
90 info!("ReceiverStream was dropped; shutting down");
91 break;
92 }
93 }
94 Err(err) => {
95 if let Some(io_err) = error::match_for_io_error(&err) {
96 if io_err.kind() == ErrorKind::BrokenPipe {
97 error!("Client disconnected: broken pipe");
99 break;
100 }
101 }
102
103 if tx.send(Err(err)).await.is_err() {
104 info!("ReceiverStream was dropped; shutting down");
105 break;
106 }
107 }
108 }
109
110 if is_not_leader {
111 warn!("Quit because it is no longer the leader");
112 break;
113 }
114 }
115
116 info!("Heartbeat stream closed: {pusher_id:?}");
117
118 if let Some(pusher_id) = pusher_id {
119 let _ = handler_group.deregister_push(pusher_id).await;
120 }
121 });
122
123 let out_stream = ReceiverStream::new(rx);
124
125 Ok(Response::new(Box::pin(out_stream)))
126 }
127
128 async fn ask_leader(&self, req: Request<AskLeaderRequest>) -> GrpcResult<AskLeaderResponse> {
129 let req = req.into_inner();
130 let ctx = self.new_ctx();
131 let res = handle_ask_leader(req, ctx).await?;
132
133 Ok(Response::new(res))
134 }
135}
136
137async fn handle_ask_leader(_req: AskLeaderRequest, ctx: Context) -> Result<AskLeaderResponse> {
138 let addr = match ctx.election {
139 Some(election) => {
140 if election.is_leader() {
141 ctx.server_addr
142 } else {
143 election.leader().await?.0
144 }
145 }
146 None => ctx.server_addr,
147 };
148
149 let leader = Some(Peer {
150 id: 0, addr,
152 });
153
154 let header = Some(ResponseHeader::success());
155 Ok(AskLeaderResponse { header, leader })
156}
157
158fn get_node_id(header: &RequestHeader) -> u64 {
159 static ID: OnceCell<Arc<AtomicU64>> = OnceCell::new();
160
161 fn next_id() -> u64 {
162 let id = ID.get_or_init(|| Arc::new(AtomicU64::new(0))).clone();
163 id.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
164 }
165
166 match header.role() {
167 Role::Frontend => next_id(),
168 Role::Datanode | Role::Flownode => header.member_id,
169 }
170}
171
172async fn register_pusher(
173 handler_group: &HeartbeatHandlerGroup,
174 header: &RequestHeader,
175 sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
176) -> PusherId {
177 let role = header.role();
178 let id = get_node_id(header);
179 let pusher_id = PusherId::new(role, id);
180 let pusher = Pusher::new(sender);
181 handler_group.register_pusher(pusher_id, pusher).await;
182 pusher_id
183}
184
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::meta::heartbeat_server::Heartbeat;
    use api::v1::meta::*;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_telemetry::tracing_context::W3cTrace;
    use servers::grpc::GrpcOptions;
    use tonic::IntoRequest;

    use super::get_node_id;
    use crate::metasrv::builder::MetasrvBuilder;
    use crate::metasrv::MetasrvOptions;

    /// With the builder setup used here, `ask_leader` should report the
    /// configured gRPC server address as the leader's address.
    #[tokio::test]
    async fn test_ask_leader() {
        let backend = Arc::new(MemoryKvBackend::new());
        let opts = MetasrvOptions {
            grpc: GrpcOptions {
                server_addr: "127.0.0.1:3002".to_string(),
                ..Default::default()
            },
            ..Default::default()
        };
        let metasrv = MetasrvBuilder::new()
            .kv_backend(backend)
            .options(opts)
            .build()
            .await
            .unwrap();

        let request = AskLeaderRequest {
            header: Some(RequestHeader::new(1, Role::Datanode, W3cTrace::new())),
        }
        .into_request();
        let response = metasrv.ask_leader(request).await.unwrap().into_inner();
        let leader = response.leader.unwrap();
        assert_eq!(metasrv.options().grpc.server_addr, leader.addr);
    }

    /// Datanodes keep their member id; frontends draw sequential ids from a
    /// shared counter regardless of the member id they send.
    #[test]
    fn test_get_node_id() {
        let datanode = RequestHeader {
            role: Role::Datanode.into(),
            member_id: 11,
            ..Default::default()
        };
        assert_eq!(11, get_node_id(&datanode));

        let frontend = RequestHeader {
            role: Role::Frontend.into(),
            ..Default::default()
        };
        (0..10).for_each(|expected| assert_eq!(expected, get_node_id(&frontend)));

        let frontend_with_member = RequestHeader {
            role: Role::Frontend.into(),
            member_id: 11,
            ..Default::default()
        };
        (10..20).for_each(|expected| assert_eq!(expected, get_node_id(&frontend_with_member)));
    }
}