1use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23 HeartbeatRequest, HeartbeatResponse, MailboxMessage, PROTOCOL_VERSION, RegionLease,
24 ResponseHeader, Role,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28 CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29 CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::InstructionReply;
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{Notify, RwLock, oneshot, watch};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
57use crate::handler::flow_state_handler::FlowStateHandler;
58use crate::handler::persist_stats_handler::PersistStatsHandler;
59use crate::metasrv::Context;
60use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
61use crate::pubsub::PublisherRef;
62use crate::service::mailbox::{
63 BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
64};
65
66pub mod check_leader_handler;
67pub mod collect_cluster_info_handler;
68pub mod collect_leader_region_handler;
69pub mod collect_stats_handler;
70pub mod collect_topic_stats_handler;
71pub mod extract_stat_handler;
72pub mod failure_handler;
73pub mod filter_inactive_region_stats;
74pub mod flow_state_handler;
75pub mod keep_lease_handler;
76pub mod mailbox_handler;
77pub mod on_leader_start_handler;
78pub mod persist_stats_handler;
79pub mod publish_heartbeat_handler;
80pub mod region_lease_handler;
81pub mod remap_flow_peer_handler;
82pub mod response_header_handler;
83
84#[cfg(test)]
85pub mod test_utils;
86
87#[async_trait::async_trait]
88pub trait HeartbeatHandler: Send + Sync {
89 fn is_acceptable(&self, role: Role) -> bool;
90
91 fn name(&self) -> &'static str {
92 let type_name = std::any::type_name::<Self>();
93 type_name.split("::").last().unwrap_or(type_name)
95 }
96
97 async fn handle(
98 &self,
99 req: &HeartbeatRequest,
100 ctx: &mut Context,
101 acc: &mut HeartbeatAccumulator,
102 ) -> Result<HandleControl>;
103}
104
105#[derive(PartialEq, Debug)]
109pub enum HandleControl {
110 Continue,
111 Done,
112}
113
114#[derive(Debug, Default)]
115pub struct HeartbeatAccumulator {
116 pub header: Option<ResponseHeader>,
117 mailbox_message: Option<MailboxMessage>,
118 pub stat: Option<Stat>,
119 pub inactive_region_ids: HashSet<RegionId>,
120 pub region_lease: Option<RegionLease>,
121}
122
123impl HeartbeatAccumulator {
124 pub(crate) fn take_mailbox_message(&mut self) -> Option<MailboxMessage> {
125 self.mailbox_message.take()
126 }
127
128 pub fn set_mailbox_message(&mut self, message: MailboxMessage) {
129 let _ = self.mailbox_message.insert(message);
130 }
131}
132
133#[derive(Copy, Clone)]
134pub struct PusherId {
135 pub role: Role,
136 pub id: u64,
137}
138
139impl Debug for PusherId {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 write!(f, "{:?}-{}", self.role, self.id)
142 }
143}
144
145impl Display for PusherId {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 write!(f, "{:?}-{}", self.role, self.id)
148 }
149}
150
151impl PusherId {
152 pub fn new(role: Role, id: u64) -> Self {
153 Self { role, id }
154 }
155
156 pub fn string_key(&self) -> String {
157 format!("{}-{}", self.role as i32, self.id)
158 }
159}
160
161pub type DeregisterSignalReceiver = watch::Receiver<bool>;
163
164pub struct Pusher {
166 sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
167 deregister_signal_sender: watch::Sender<bool>,
171 deregister_signal_receiver: DeregisterSignalReceiver,
172
173 res_header: ResponseHeader,
174}
175
176impl Drop for Pusher {
177 fn drop(&mut self) {
178 let _ = self.deregister_signal_sender.send(true);
181 }
182}
183
184impl Pusher {
185 pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
186 let res_header = ResponseHeader {
187 protocol_version: PROTOCOL_VERSION,
188 ..Default::default()
189 };
190 let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
191 Self {
192 sender,
193 deregister_signal_sender,
194 deregister_signal_receiver,
195 res_header,
196 }
197 }
198
199 #[inline]
200 pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
201 self.sender.send(Ok(res)).await.map_err(|e| {
202 error::PushMessageSnafu {
203 err_msg: e.to_string(),
204 }
205 .build()
206 })
207 }
208
209 #[inline]
210 pub fn header(&self) -> ResponseHeader {
211 self.res_header.clone()
212 }
213}
214
215#[derive(Clone, Default)]
217pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
218
219impl Pushers {
220 async fn push(
221 &self,
222 pusher_id: PusherId,
223 mailbox_message: MailboxMessage,
224 ) -> Result<DeregisterSignalReceiver> {
225 let pusher_id = pusher_id.string_key();
226 let pushers = self.0.read().await;
227 let pusher = pushers
228 .get(&pusher_id)
229 .context(error::PusherNotFoundSnafu { pusher_id })?;
230
231 pusher
232 .push(HeartbeatResponse {
233 header: Some(pusher.header()),
234 mailbox_message: Some(mailbox_message),
235 ..Default::default()
236 })
237 .await?;
238
239 Ok(pusher.deregister_signal_receiver.clone())
240 }
241
242 async fn broadcast(
243 &self,
244 range: Range<String>,
245 mailbox_message: &MailboxMessage,
246 ) -> Result<()> {
247 let pushers = self.0.read().await;
248 let pushers = pushers
249 .range(range)
250 .map(|(_, value)| value)
251 .collect::<Vec<_>>();
252 let mut results = Vec::with_capacity(pushers.len());
253
254 for pusher in pushers {
255 let mut mailbox_message = mailbox_message.clone();
256 mailbox_message.id = 0; results.push(pusher.push(HeartbeatResponse {
259 header: Some(pusher.header()),
260 mailbox_message: Some(mailbox_message),
261 ..Default::default()
262 }))
263 }
264
265 let _ = join_all(results)
267 .await
268 .into_iter()
269 .collect::<Result<Vec<_>>>()?;
270
271 Ok(())
272 }
273
274 pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
275 self.0.write().await.insert(pusher_id, pusher)
276 }
277
278 async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
279 self.0.write().await.remove(pusher_id)
280 }
281
282 pub(crate) async fn clear(&self) -> Vec<String> {
283 let mut pushers = self.0.write().await;
284 let keys = pushers.keys().cloned().collect::<Vec<_>>();
285 if !keys.is_empty() {
286 pushers.clear();
287 }
288 keys
289 }
290}
291
292#[derive(Clone)]
293pub struct NameCachedHandler {
294 name: &'static str,
295 handler: Arc<dyn HeartbeatHandler>,
296}
297
298impl NameCachedHandler {
299 fn new(handler: impl HeartbeatHandler + 'static) -> Self {
300 let name = handler.name();
301 let handler = Arc::new(handler);
302 Self { name, handler }
303 }
304}
305
306pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
307
308#[derive(Default, Clone)]
310pub struct HeartbeatHandlerGroup {
311 handlers: Vec<NameCachedHandler>,
312 pushers: Pushers,
313}
314
315impl HeartbeatHandlerGroup {
316 pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
318 METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
319 info!("Pusher register: {}", pusher_id);
320 let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
321 }
322
323 pub async fn deregister_push(&self, pusher_id: PusherId) {
325 info!("Pusher unregister: {}", pusher_id);
326 if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
327 METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
328 }
329 }
330
331 pub fn pushers(&self) -> Pushers {
333 self.pushers.clone()
334 }
335
336 pub async fn handle(
338 &self,
339 req: HeartbeatRequest,
340 mut ctx: Context,
341 ) -> Result<HeartbeatResponse> {
342 let mut acc = HeartbeatAccumulator::default();
343 let role = req
344 .header
345 .as_ref()
346 .and_then(|h| Role::try_from(h.role).ok())
347 .context(error::InvalidArgumentsSnafu {
348 err_msg: format!("invalid role: {:?}", req.header),
349 })?;
350
351 let is_handshake = ctx.is_handshake;
352
353 for NameCachedHandler { name, handler } in self.handlers.iter() {
354 if !handler.is_acceptable(role) {
355 continue;
356 }
357
358 let _timer = METRIC_META_HANDLER_EXECUTE
359 .with_label_values(&[*name])
360 .start_timer();
361
362 if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
363 break;
364 }
365 }
366 let header = std::mem::take(&mut acc.header);
367 let mailbox_message = acc.take_mailbox_message();
368
369 let heartbeat_config = if is_handshake {
371 let config = ctx.heartbeat_options_for(role).into();
372
373 info!(
374 "Handshake with {:?} node, sending config: {:?}",
375 role, config
376 );
377
378 Some(config)
379 } else {
380 None
381 };
382
383 let res = HeartbeatResponse {
384 header,
385 region_lease: acc.region_lease,
386 mailbox_message,
387 heartbeat_config,
388 };
389 Ok(res)
390 }
391}
392
393pub struct HeartbeatMailbox {
394 pushers: Pushers,
395 sequence: Sequence,
396 senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
397 timeouts: DashMap<MessageId, Instant>,
398 timeout_notify: Notify,
399}
400
401impl HeartbeatMailbox {
402 pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
403 let Payload::Json(payload) =
404 msg.payload
405 .as_ref()
406 .with_context(|| UnexpectedInstructionReplySnafu {
407 mailbox_message: msg.to_string(),
408 reason: format!("empty payload, msg: {msg:?}"),
409 })?;
410 serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
411 }
412
413 #[cfg(test)]
415 pub(crate) fn json_instruction(
416 msg: &MailboxMessage,
417 ) -> Result<common_meta::instruction::Instruction> {
418 let Payload::Json(payload) =
419 msg.payload
420 .as_ref()
421 .with_context(|| UnexpectedInstructionReplySnafu {
422 mailbox_message: msg.to_string(),
423 reason: format!("empty payload, msg: {msg:?}"),
424 })?;
425 serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
426 }
427
428 pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
429 let mailbox = Arc::new(Self::new(pushers, sequence));
430
431 let timeout_checker = mailbox.clone();
432 let _handle = common_runtime::spawn_global(async move {
433 timeout_checker.check_timeout_bg(10).await;
434 });
435
436 mailbox
437 }
438
439 fn new(pushers: Pushers, sequence: Sequence) -> Self {
440 Self {
441 pushers,
442 sequence,
443 senders: DashMap::default(),
444 timeouts: DashMap::default(),
445 timeout_notify: Notify::new(),
446 }
447 }
448
449 async fn check_timeout_bg(&self, interval_millis: u64) {
450 let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
451
452 loop {
453 let _ = interval.tick().await;
454
455 if self.timeouts.is_empty() {
456 self.timeout_notify.notified().await;
457 }
458
459 let now = Instant::now();
460 let timeout_ids = self
461 .timeouts
462 .iter()
463 .filter_map(|entry| {
464 let (id, deadline) = entry.pair();
465 if deadline < &now { Some(*id) } else { None }
466 })
467 .collect::<Vec<_>>();
468
469 for id in timeout_ids {
470 let _ = self
471 .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
472 .await;
473 }
474 }
475 }
476
477 #[inline]
478 async fn next_message_id(&self) -> Result<u64> {
479 loop {
482 let next = self
483 .sequence
484 .next()
485 .await
486 .context(error::NextSequenceSnafu)?;
487 if next > 0 {
488 return Ok(next);
489 }
490 }
491 }
492}
493
494#[async_trait::async_trait]
495impl Mailbox for HeartbeatMailbox {
496 async fn send(
497 &self,
498 ch: &Channel,
499 mut msg: MailboxMessage,
500 timeout: Duration,
501 ) -> Result<MailboxReceiver> {
502 let message_id = self.next_message_id().await?;
503 msg.id = message_id;
504
505 let pusher_id = ch.pusher_id();
506 debug!("Sending mailbox message {msg:?} to {pusher_id}");
507
508 let (tx, rx) = oneshot::channel();
509 let _ = self.senders.insert(message_id, tx);
510 let deadline = Instant::now() + timeout;
511 self.timeouts.insert(message_id, deadline);
512 self.timeout_notify.notify_one();
513
514 let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
515
516 Ok(MailboxReceiver::new(
517 message_id,
518 rx,
519 deregister_signal_receiver,
520 *ch,
521 ))
522 }
523
524 async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
525 let message_id = 0; msg.id = message_id;
527
528 let pusher_id = ch.pusher_id();
529 debug!("Sending mailbox message {msg:?} to {pusher_id}");
530
531 self.pushers.push(pusher_id, msg).await?;
532
533 Ok(())
534 }
535
536 async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
537 self.pushers.broadcast(ch.pusher_range(), msg).await
538 }
539
540 async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
541 debug!("Received mailbox message {maybe_msg:?}");
542
543 let _ = self.timeouts.remove(&id);
544
545 if let Some((_, tx)) = self.senders.remove(&id) {
546 tx.send(maybe_msg)
547 .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
548 } else if let Ok(finally_msg) = maybe_msg {
549 warn!("The response arrived too late: {finally_msg:?}");
550 }
551
552 Ok(())
553 }
554
555 async fn reset(&self) {
556 let keys = self.pushers.clear().await;
557 if !keys.is_empty() {
558 info!("Reset mailbox, deregister pushers: {:?}", keys);
559 METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
560 }
561 }
562}
563
564pub struct HeartbeatHandlerGroupBuilder {
566 region_failure_handler: Option<RegionFailureHandler>,
568
569 region_lease_handler: Option<RegionLeaseHandler>,
571
572 flush_stats_factor: Option<usize>,
576 flow_state_handler: Option<FlowStateHandler>,
578
579 persist_stats_handler: Option<PersistStatsHandler>,
581
582 plugins: Option<Plugins>,
584
585 pushers: Pushers,
587
588 handlers: Vec<NameCachedHandler>,
590}
591
592impl HeartbeatHandlerGroupBuilder {
593 pub fn new(pushers: Pushers) -> Self {
594 Self {
595 region_failure_handler: None,
596 region_lease_handler: None,
597 flush_stats_factor: None,
598 flow_state_handler: None,
599 persist_stats_handler: None,
600 plugins: None,
601 pushers,
602 handlers: vec![],
603 }
604 }
605
606 pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
607 self.flow_state_handler = handler;
608 self
609 }
610
611 pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
612 self.region_lease_handler = handler;
613 self
614 }
615
616 pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
618 self.region_failure_handler = handler;
619 self
620 }
621
622 pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
624 self.flush_stats_factor = flush_stats_factor;
625 self
626 }
627
628 pub fn with_persist_stats_handler(mut self, handler: Option<PersistStatsHandler>) -> Self {
629 self.persist_stats_handler = handler;
630 self
631 }
632
633 pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
635 self.plugins = plugins;
636 self
637 }
638
639 pub fn add_default_handlers(mut self) -> Self {
641 let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
643 plugins
644 .get::<PublisherRef>()
645 .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
646 } else {
647 None
648 };
649
650 self.add_handler_last(ResponseHeaderHandler);
651 self.add_handler_last(DatanodeKeepLeaseHandler);
655 self.add_handler_last(FlownodeKeepLeaseHandler);
656 self.add_handler_last(CheckLeaderHandler);
657 self.add_handler_last(OnLeaderStartHandler);
658 self.add_handler_last(ExtractStatHandler);
659 self.add_handler_last(CollectDatanodeClusterInfoHandler);
660 self.add_handler_last(CollectFrontendClusterInfoHandler);
661 self.add_handler_last(CollectFlownodeClusterInfoHandler);
662 self.add_handler_last(MailboxHandler);
663 if let Some(region_lease_handler) = self.region_lease_handler.take() {
664 self.add_handler_last(region_lease_handler);
665 }
666 self.add_handler_last(FilterInactiveRegionStatsHandler);
667 if let Some(region_failure_handler) = self.region_failure_handler.take() {
668 self.add_handler_last(region_failure_handler);
669 }
670 if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
671 self.add_handler_last(publish_heartbeat_handler);
672 }
673 self.add_handler_last(CollectLeaderRegionHandler);
674 self.add_handler_last(CollectTopicStatsHandler);
675 if let Some(persist_stats_handler) = self.persist_stats_handler.take() {
678 self.add_handler_last(persist_stats_handler);
679 }
680 self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
681 self.add_handler_last(RemapFlowPeerHandler::default());
682
683 if let Some(flow_state_handler) = self.flow_state_handler.take() {
684 self.add_handler_last(flow_state_handler);
685 }
686
687 self
688 }
689
690 pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
694 if let Some(customizer) = self
695 .plugins
696 .as_ref()
697 .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
698 {
699 debug!("Customizing the heartbeat handler group builder");
700 customizer.customize(&mut self)?;
701 }
702
703 Ok(HeartbeatHandlerGroup {
704 handlers: self.handlers,
705 pushers: self.pushers,
706 })
707 }
708
709 fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
710 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
711 self.handlers.insert(pos + 1, handler);
712 return Ok(());
713 }
714
715 error::HandlerNotFoundSnafu { name: target }.fail()
716 }
717
718 pub fn add_handler_after(
720 &mut self,
721 target: &'static str,
722 handler: impl HeartbeatHandler + 'static,
723 ) -> Result<()> {
724 self.add_handler_after_inner(target, NameCachedHandler::new(handler))
725 }
726
727 fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
728 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
729 self.handlers.insert(pos, handler);
730 return Ok(());
731 }
732
733 error::HandlerNotFoundSnafu { name: target }.fail()
734 }
735
736 pub fn add_handler_before(
738 &mut self,
739 target: &'static str,
740 handler: impl HeartbeatHandler + 'static,
741 ) -> Result<()> {
742 self.add_handler_before_inner(target, NameCachedHandler::new(handler))
743 }
744
745 fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
746 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
747 self.handlers[pos] = handler;
748 return Ok(());
749 }
750
751 error::HandlerNotFoundSnafu { name: target }.fail()
752 }
753
754 pub fn replace_handler(
756 &mut self,
757 target: &'static str,
758 handler: impl HeartbeatHandler + 'static,
759 ) -> Result<()> {
760 self.replace_handler_inner(target, NameCachedHandler::new(handler))
761 }
762
763 fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
764 self.handlers.push(handler);
765 }
766
767 fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
768 self.add_handler_last_inner(NameCachedHandler::new(handler));
769 }
770}
771
772pub type HeartbeatHandlerGroupBuilderCustomizerRef =
773 Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
774
775pub enum CustomizeHeartbeatGroupAction {
776 AddHandlerAfter {
777 target: String,
778 handler: NameCachedHandler,
779 },
780 AddHandlerBefore {
781 target: String,
782 handler: NameCachedHandler,
783 },
784 ReplaceHandler {
785 target: String,
786 handler: NameCachedHandler,
787 },
788 AddHandlerLast {
789 handler: NameCachedHandler,
790 },
791}
792
793impl CustomizeHeartbeatGroupAction {
794 pub fn new_add_handler_after(
795 target: &'static str,
796 handler: impl HeartbeatHandler + 'static,
797 ) -> Self {
798 Self::AddHandlerAfter {
799 target: target.to_string(),
800 handler: NameCachedHandler::new(handler),
801 }
802 }
803
804 pub fn new_add_handler_before(
805 target: &'static str,
806 handler: impl HeartbeatHandler + 'static,
807 ) -> Self {
808 Self::AddHandlerBefore {
809 target: target.to_string(),
810 handler: NameCachedHandler::new(handler),
811 }
812 }
813
814 pub fn new_replace_handler(
815 target: &'static str,
816 handler: impl HeartbeatHandler + 'static,
817 ) -> Self {
818 Self::ReplaceHandler {
819 target: target.to_string(),
820 handler: NameCachedHandler::new(handler),
821 }
822 }
823
824 pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
825 Self::AddHandlerLast {
826 handler: NameCachedHandler::new(handler),
827 }
828 }
829}
830
831pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
833 fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;
834
835 fn add_action(&self, action: CustomizeHeartbeatGroupAction);
836}
837
838#[derive(Default)]
839pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
840 actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
841}
842
843impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
844 fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
845 info!("Customizing the heartbeat handler group builder");
846 let mut actions = self.actions.lock().unwrap();
847 for action in actions.drain(..) {
848 match action {
849 CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
850 builder.add_handler_after_inner(&target, handler)?;
851 }
852 CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
853 builder.add_handler_before_inner(&target, handler)?;
854 }
855 CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
856 builder.replace_handler_inner(&target, handler)?;
857 }
858 CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
859 builder.add_handler_last_inner(handler);
860 }
861 }
862 }
863 Ok(())
864 }
865
866 fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
867 self.actions.lock().unwrap().push(action);
868 }
869}
870
871#[cfg(test)]
872mod tests {
873
874 use std::assert_matches::assert_matches;
875 use std::sync::Arc;
876 use std::time::Duration;
877
878 use api::v1::meta::{MailboxMessage, Role};
879 use common_meta::kv_backend::memory::MemoryKvBackend;
880 use common_meta::sequence::SequenceBuilder;
881 use tokio::sync::mpsc;
882
883 use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
884 use crate::error;
885 use crate::handler::collect_stats_handler::CollectStatsHandler;
886 use crate::handler::response_header_handler::ResponseHeaderHandler;
887 use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
888 use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
889
890 #[tokio::test]
891 async fn test_mailbox() {
892 let (mailbox, receiver) = push_msg_via_mailbox().await;
893 let id = receiver.message_id();
894
895 let resp_msg = MailboxMessage {
896 id,
897 subject: "resp-test".to_string(),
898 timestamp_millis: 456,
899 ..Default::default()
900 };
901
902 mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();
903
904 let recv_msg = receiver.await.unwrap();
905 assert_eq!(recv_msg.id, id);
906 assert_eq!(recv_msg.timestamp_millis, 456);
907 assert_eq!(recv_msg.subject, "resp-test".to_string());
908 }
909
910 #[tokio::test]
911 async fn test_mailbox_timeout() {
912 let (_, receiver) = push_msg_via_mailbox().await;
913 let res = receiver.await;
914 assert!(res.is_err());
915 }
916
917 async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
918 let datanode_id = 12;
919 let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
920 let pusher_id = PusherId::new(Role::Datanode, datanode_id);
921 let pusher: Pusher = Pusher::new(pusher_tx);
922 let handler_group = HeartbeatHandlerGroup::default();
923 handler_group.register_pusher(pusher_id, pusher).await;
924
925 let kv_backend = Arc::new(MemoryKvBackend::new());
926 let seq = SequenceBuilder::new("test_seq", kv_backend).build();
927 let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);
928
929 let msg = MailboxMessage {
930 id: 0,
931 subject: "req-test".to_string(),
932 timestamp_millis: 123,
933 ..Default::default()
934 };
935 let ch = Channel::Datanode(datanode_id);
936
937 let receiver = mailbox
938 .send(&ch, msg, Duration::from_secs(1))
939 .await
940 .unwrap();
941
942 let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
943 let message = recv_obj.mailbox_message.unwrap();
944 assert_eq!(message.timestamp_millis, 123);
945 assert_eq!(message.subject, "req-test".to_string());
946
947 (mailbox, receiver)
948 }
949
950 #[test]
951 fn test_handler_group_builder() {
952 let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
953 .add_default_handlers()
954 .build()
955 .unwrap();
956
957 let handlers = group.handlers;
958 let names = [
959 "ResponseHeaderHandler",
960 "DatanodeKeepLeaseHandler",
961 "FlownodeKeepLeaseHandler",
962 "CheckLeaderHandler",
963 "OnLeaderStartHandler",
964 "ExtractStatHandler",
965 "CollectDatanodeClusterInfoHandler",
966 "CollectFrontendClusterInfoHandler",
967 "CollectFlownodeClusterInfoHandler",
968 "MailboxHandler",
969 "FilterInactiveRegionStatsHandler",
970 "CollectLeaderRegionHandler",
971 "CollectTopicStatsHandler",
972 "CollectStatsHandler",
973 "RemapFlowPeerHandler",
974 ];
975 assert_eq!(names.len(), handlers.len());
976 for (handler, name) in handlers.iter().zip(names.into_iter()) {
977 assert_eq!(handler.name, name);
978 }
979 }
980
981 #[test]
982 fn test_handler_group_builder_add_before() {
983 let mut builder =
984 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
985 builder
986 .add_handler_before(
987 "FilterInactiveRegionStatsHandler",
988 CollectStatsHandler::default(),
989 )
990 .unwrap();
991
992 let group = builder.build().unwrap();
993 let handlers = group.handlers;
994 let names = [
995 "ResponseHeaderHandler",
996 "DatanodeKeepLeaseHandler",
997 "FlownodeKeepLeaseHandler",
998 "CheckLeaderHandler",
999 "OnLeaderStartHandler",
1000 "ExtractStatHandler",
1001 "CollectDatanodeClusterInfoHandler",
1002 "CollectFrontendClusterInfoHandler",
1003 "CollectFlownodeClusterInfoHandler",
1004 "MailboxHandler",
1005 "CollectStatsHandler",
1006 "FilterInactiveRegionStatsHandler",
1007 "CollectLeaderRegionHandler",
1008 "CollectTopicStatsHandler",
1009 "CollectStatsHandler",
1010 "RemapFlowPeerHandler",
1011 ];
1012 assert_eq!(names.len(), handlers.len());
1013 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1014 assert_eq!(handler.name, name);
1015 }
1016 }
1017
1018 #[test]
1019 fn test_handler_group_builder_add_before_first() {
1020 let mut builder =
1021 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1022 builder
1023 .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
1024 .unwrap();
1025
1026 let group = builder.build().unwrap();
1027 let handlers = group.handlers;
1028 let names = [
1029 "CollectStatsHandler",
1030 "ResponseHeaderHandler",
1031 "DatanodeKeepLeaseHandler",
1032 "FlownodeKeepLeaseHandler",
1033 "CheckLeaderHandler",
1034 "OnLeaderStartHandler",
1035 "ExtractStatHandler",
1036 "CollectDatanodeClusterInfoHandler",
1037 "CollectFrontendClusterInfoHandler",
1038 "CollectFlownodeClusterInfoHandler",
1039 "MailboxHandler",
1040 "FilterInactiveRegionStatsHandler",
1041 "CollectLeaderRegionHandler",
1042 "CollectTopicStatsHandler",
1043 "CollectStatsHandler",
1044 "RemapFlowPeerHandler",
1045 ];
1046 assert_eq!(names.len(), handlers.len());
1047 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1048 assert_eq!(handler.name, name);
1049 }
1050 }
1051
1052 #[test]
1053 fn test_handler_group_builder_add_after() {
1054 let mut builder =
1055 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1056 builder
1057 .add_handler_after("MailboxHandler", CollectStatsHandler::default())
1058 .unwrap();
1059
1060 let group = builder.build().unwrap();
1061 let handlers = group.handlers;
1062 let names = [
1063 "ResponseHeaderHandler",
1064 "DatanodeKeepLeaseHandler",
1065 "FlownodeKeepLeaseHandler",
1066 "CheckLeaderHandler",
1067 "OnLeaderStartHandler",
1068 "ExtractStatHandler",
1069 "CollectDatanodeClusterInfoHandler",
1070 "CollectFrontendClusterInfoHandler",
1071 "CollectFlownodeClusterInfoHandler",
1072 "MailboxHandler",
1073 "CollectStatsHandler",
1074 "FilterInactiveRegionStatsHandler",
1075 "CollectLeaderRegionHandler",
1076 "CollectTopicStatsHandler",
1077 "CollectStatsHandler",
1078 "RemapFlowPeerHandler",
1079 ];
1080 assert_eq!(names.len(), handlers.len());
1081 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1082 assert_eq!(handler.name, name);
1083 }
1084 }
1085
1086 #[test]
1087 fn test_handler_group_builder_add_after_last() {
1088 let mut builder =
1089 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1090 builder
1091 .add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
1092 .unwrap();
1093
1094 let group = builder.build().unwrap();
1095 let handlers = group.handlers;
1096 let names = [
1097 "ResponseHeaderHandler",
1098 "DatanodeKeepLeaseHandler",
1099 "FlownodeKeepLeaseHandler",
1100 "CheckLeaderHandler",
1101 "OnLeaderStartHandler",
1102 "ExtractStatHandler",
1103 "CollectDatanodeClusterInfoHandler",
1104 "CollectFrontendClusterInfoHandler",
1105 "CollectFlownodeClusterInfoHandler",
1106 "MailboxHandler",
1107 "FilterInactiveRegionStatsHandler",
1108 "CollectLeaderRegionHandler",
1109 "CollectTopicStatsHandler",
1110 "CollectStatsHandler",
1111 "ResponseHeaderHandler",
1112 "RemapFlowPeerHandler",
1113 ];
1114 assert_eq!(names.len(), handlers.len());
1115 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1116 assert_eq!(handler.name, name);
1117 }
1118 }
1119
1120 #[test]
1121 fn test_handler_group_builder_replace() {
1122 let mut builder =
1123 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1124 builder
1125 .replace_handler("MailboxHandler", CollectStatsHandler::default())
1126 .unwrap();
1127
1128 let group = builder.build().unwrap();
1129 let handlers = group.handlers;
1130 let names = [
1131 "ResponseHeaderHandler",
1132 "DatanodeKeepLeaseHandler",
1133 "FlownodeKeepLeaseHandler",
1134 "CheckLeaderHandler",
1135 "OnLeaderStartHandler",
1136 "ExtractStatHandler",
1137 "CollectDatanodeClusterInfoHandler",
1138 "CollectFrontendClusterInfoHandler",
1139 "CollectFlownodeClusterInfoHandler",
1140 "CollectStatsHandler",
1141 "FilterInactiveRegionStatsHandler",
1142 "CollectLeaderRegionHandler",
1143 "CollectTopicStatsHandler",
1144 "CollectStatsHandler",
1145 "RemapFlowPeerHandler",
1146 ];
1147
1148 assert_eq!(names.len(), handlers.len());
1149 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1150 assert_eq!(handler.name, name);
1151 }
1152 }
1153
1154 #[test]
1155 fn test_handler_group_builder_replace_last() {
1156 let mut builder =
1157 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1158 builder
1159 .replace_handler("CollectStatsHandler", ResponseHeaderHandler)
1160 .unwrap();
1161
1162 let group = builder.build().unwrap();
1163 let handlers = group.handlers;
1164 let names = [
1165 "ResponseHeaderHandler",
1166 "DatanodeKeepLeaseHandler",
1167 "FlownodeKeepLeaseHandler",
1168 "CheckLeaderHandler",
1169 "OnLeaderStartHandler",
1170 "ExtractStatHandler",
1171 "CollectDatanodeClusterInfoHandler",
1172 "CollectFrontendClusterInfoHandler",
1173 "CollectFlownodeClusterInfoHandler",
1174 "MailboxHandler",
1175 "FilterInactiveRegionStatsHandler",
1176 "CollectLeaderRegionHandler",
1177 "CollectTopicStatsHandler",
1178 "ResponseHeaderHandler",
1179 "RemapFlowPeerHandler",
1180 ];
1181
1182 assert_eq!(names.len(), handlers.len());
1183 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1184 assert_eq!(handler.name, name);
1185 }
1186 }
1187
1188 #[test]
1189 fn test_handler_group_builder_replace_first() {
1190 let mut builder =
1191 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1192 builder
1193 .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
1194 .unwrap();
1195
1196 let group = builder.build().unwrap();
1197 let handlers = group.handlers;
1198 let names = [
1199 "CollectStatsHandler",
1200 "DatanodeKeepLeaseHandler",
1201 "FlownodeKeepLeaseHandler",
1202 "CheckLeaderHandler",
1203 "OnLeaderStartHandler",
1204 "ExtractStatHandler",
1205 "CollectDatanodeClusterInfoHandler",
1206 "CollectFrontendClusterInfoHandler",
1207 "CollectFlownodeClusterInfoHandler",
1208 "MailboxHandler",
1209 "FilterInactiveRegionStatsHandler",
1210 "CollectLeaderRegionHandler",
1211 "CollectTopicStatsHandler",
1212 "CollectStatsHandler",
1213 "RemapFlowPeerHandler",
1214 ];
1215 assert_eq!(names.len(), handlers.len());
1216 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1217 assert_eq!(handler.name, name);
1218 }
1219 }
1220
1221 #[test]
1222 fn test_handler_group_builder_handler_not_found() {
1223 let mut builder =
1224 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1225 let err = builder
1226 .add_handler_before("NotExists", CollectStatsHandler::default())
1227 .unwrap_err();
1228 assert_matches!(err, error::Error::HandlerNotFound { .. });
1229
1230 let err = builder
1231 .add_handler_after("NotExists", CollectStatsHandler::default())
1232 .unwrap_err();
1233 assert_matches!(err, error::Error::HandlerNotFound { .. });
1234
1235 let err = builder
1236 .replace_handler("NotExists", CollectStatsHandler::default())
1237 .unwrap_err();
1238 assert_matches!(err, error::Error::HandlerNotFound { .. });
1239 }
1240
1241 #[tokio::test]
1242 async fn test_pusher_drop() {
1243 let (tx, _rx) = mpsc::channel(1);
1244 let pusher = Pusher::new(tx);
1245 let mut deregister_signal_tx = pusher.deregister_signal_receiver.clone();
1246
1247 drop(pusher);
1248 deregister_signal_tx.changed().await.unwrap();
1249 }
1250}