1use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23 HeartbeatRequest, HeartbeatResponse, MailboxMessage, PROTOCOL_VERSION, RegionLease,
24 ResponseHeader, Role,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28 CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29 CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::InstructionReply;
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{Notify, RwLock, oneshot, watch};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
57use crate::handler::flow_state_handler::FlowStateHandler;
58use crate::handler::persist_stats_handler::PersistStatsHandler;
59use crate::metasrv::Context;
60use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
61use crate::pubsub::PublisherRef;
62use crate::service::mailbox::{
63 BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
64};
65
66pub mod check_leader_handler;
67pub mod collect_cluster_info_handler;
68pub mod collect_leader_region_handler;
69pub mod collect_stats_handler;
70pub mod collect_topic_stats_handler;
71pub mod extract_stat_handler;
72pub mod failure_handler;
73pub mod filter_inactive_region_stats;
74pub mod flow_state_handler;
75pub mod keep_lease_handler;
76pub mod mailbox_handler;
77pub mod on_leader_start_handler;
78pub mod persist_stats_handler;
79pub mod publish_heartbeat_handler;
80pub mod region_lease_handler;
81pub mod remap_flow_peer_handler;
82pub mod response_header_handler;
83
84#[cfg(test)]
85pub mod test_utils;
86
87#[async_trait::async_trait]
88pub trait HeartbeatHandler: Send + Sync {
89 fn is_acceptable(&self, role: Role) -> bool;
90
91 fn name(&self) -> &'static str {
92 let type_name = std::any::type_name::<Self>();
93 type_name.split("::").last().unwrap_or(type_name)
95 }
96
97 async fn handle(
98 &self,
99 req: &HeartbeatRequest,
100 ctx: &mut Context,
101 acc: &mut HeartbeatAccumulator,
102 ) -> Result<HandleControl>;
103}
104
105#[derive(PartialEq, Debug)]
109pub enum HandleControl {
110 Continue,
111 Done,
112}
113
114#[derive(Debug, Default)]
115pub struct HeartbeatAccumulator {
116 pub header: Option<ResponseHeader>,
117 mailbox_message: Option<MailboxMessage>,
118 pub stat: Option<Stat>,
119 pub inactive_region_ids: HashSet<RegionId>,
120 pub region_lease: Option<RegionLease>,
121}
122
123impl HeartbeatAccumulator {
124 pub(crate) fn take_mailbox_message(&mut self) -> Option<MailboxMessage> {
125 self.mailbox_message.take()
126 }
127
128 pub fn set_mailbox_message(&mut self, message: MailboxMessage) {
129 let _ = self.mailbox_message.insert(message);
130 }
131}
132
133#[derive(Copy, Clone)]
134pub struct PusherId {
135 pub role: Role,
136 pub id: u64,
137}
138
139impl Debug for PusherId {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 write!(f, "{:?}-{}", self.role, self.id)
142 }
143}
144
145impl Display for PusherId {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 write!(f, "{:?}-{}", self.role, self.id)
148 }
149}
150
151impl PusherId {
152 pub fn new(role: Role, id: u64) -> Self {
153 Self { role, id }
154 }
155
156 pub fn string_key(&self) -> String {
157 format!("{}-{}", self.role as i32, self.id)
158 }
159}
160
161pub type DeregisterSignalReceiver = watch::Receiver<bool>;
163
164pub struct Pusher {
166 sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
167 deregister_signal_sender: watch::Sender<bool>,
171 deregister_signal_receiver: DeregisterSignalReceiver,
172
173 res_header: ResponseHeader,
174}
175
176impl Drop for Pusher {
177 fn drop(&mut self) {
178 let _ = self.deregister_signal_sender.send(true);
181 }
182}
183
184impl Pusher {
185 pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
186 let res_header = ResponseHeader {
187 protocol_version: PROTOCOL_VERSION,
188 ..Default::default()
189 };
190 let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
191 Self {
192 sender,
193 deregister_signal_sender,
194 deregister_signal_receiver,
195 res_header,
196 }
197 }
198
199 #[inline]
200 pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
201 self.sender.send(Ok(res)).await.map_err(|e| {
202 error::PushMessageSnafu {
203 err_msg: e.to_string(),
204 }
205 .build()
206 })
207 }
208
209 #[inline]
210 pub fn header(&self) -> ResponseHeader {
211 self.res_header.clone()
212 }
213}
214
215#[derive(Clone, Default)]
217pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
218
219impl Pushers {
220 async fn push(
221 &self,
222 pusher_id: PusherId,
223 mailbox_message: MailboxMessage,
224 ) -> Result<DeregisterSignalReceiver> {
225 let pusher_id = pusher_id.string_key();
226 let pushers = self.0.read().await;
227 let pusher = pushers
228 .get(&pusher_id)
229 .context(error::PusherNotFoundSnafu { pusher_id })?;
230
231 pusher
232 .push(HeartbeatResponse {
233 header: Some(pusher.header()),
234 mailbox_message: Some(mailbox_message),
235 ..Default::default()
236 })
237 .await?;
238
239 Ok(pusher.deregister_signal_receiver.clone())
240 }
241
242 async fn broadcast(
243 &self,
244 range: Range<String>,
245 mailbox_message: &MailboxMessage,
246 ) -> Result<()> {
247 let pushers = self.0.read().await;
248 let pushers = pushers
249 .range(range)
250 .map(|(_, value)| value)
251 .collect::<Vec<_>>();
252 let mut results = Vec::with_capacity(pushers.len());
253
254 for pusher in pushers {
255 let mut mailbox_message = mailbox_message.clone();
256 mailbox_message.id = 0; results.push(pusher.push(HeartbeatResponse {
259 header: Some(pusher.header()),
260 mailbox_message: Some(mailbox_message),
261 ..Default::default()
262 }))
263 }
264
265 let _ = join_all(results)
267 .await
268 .into_iter()
269 .collect::<Result<Vec<_>>>()?;
270
271 Ok(())
272 }
273
274 pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
275 self.0.write().await.insert(pusher_id, pusher)
276 }
277
278 async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
279 self.0.write().await.remove(pusher_id)
280 }
281
282 pub(crate) async fn clear(&self) -> Vec<String> {
283 let mut pushers = self.0.write().await;
284 let keys = pushers.keys().cloned().collect::<Vec<_>>();
285 if !keys.is_empty() {
286 pushers.clear();
287 }
288 keys
289 }
290}
291
292#[derive(Clone)]
293pub struct NameCachedHandler {
294 name: &'static str,
295 handler: Arc<dyn HeartbeatHandler>,
296}
297
298impl NameCachedHandler {
299 fn new(handler: impl HeartbeatHandler + 'static) -> Self {
300 let name = handler.name();
301 let handler = Arc::new(handler);
302 Self { name, handler }
303 }
304}
305
306pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
307
308#[derive(Default, Clone)]
310pub struct HeartbeatHandlerGroup {
311 handlers: Vec<NameCachedHandler>,
312 pushers: Pushers,
313}
314
315impl HeartbeatHandlerGroup {
316 pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
318 METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
319 info!("Pusher register: {}", pusher_id);
320 let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
321 }
322
323 pub async fn deregister_push(&self, pusher_id: PusherId) {
325 info!("Pusher unregister: {}", pusher_id);
326 if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
327 METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
328 }
329 }
330
331 pub fn pushers(&self) -> Pushers {
333 self.pushers.clone()
334 }
335
336 pub async fn handle(
338 &self,
339 req: HeartbeatRequest,
340 mut ctx: Context,
341 ) -> Result<HeartbeatResponse> {
342 let mut acc = HeartbeatAccumulator::default();
343 let role = req
344 .header
345 .as_ref()
346 .and_then(|h| Role::try_from(h.role).ok())
347 .context(error::InvalidArgumentsSnafu {
348 err_msg: format!("invalid role: {:?}", req.header),
349 })?;
350
351 let is_handshake = ctx.is_handshake;
352
353 for NameCachedHandler { name, handler } in self.handlers.iter() {
354 if !handler.is_acceptable(role) {
355 continue;
356 }
357
358 let _timer = METRIC_META_HANDLER_EXECUTE
359 .with_label_values(&[*name])
360 .start_timer();
361
362 if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
363 break;
364 }
365 }
366 let header = std::mem::take(&mut acc.header);
367 let mailbox_message = acc.take_mailbox_message();
368
369 let heartbeat_config = if is_handshake {
371 let config = ctx.heartbeat_options_for(role).into();
372
373 info!(
374 "Handshake with {:?} node, sending config: {:?}",
375 role, config
376 );
377
378 Some(config)
379 } else {
380 None
381 };
382
383 let res = HeartbeatResponse {
384 header,
385 region_lease: acc.region_lease,
386 mailbox_message,
387 heartbeat_config,
388 };
389 Ok(res)
390 }
391}
392
393pub struct HeartbeatMailbox {
394 pushers: Pushers,
395 sequence: Sequence,
396 senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
397 timeouts: DashMap<MessageId, Instant>,
398 timeout_notify: Notify,
399}
400
401impl HeartbeatMailbox {
402 pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
403 let Payload::Json(payload) =
404 msg.payload
405 .as_ref()
406 .with_context(|| UnexpectedInstructionReplySnafu {
407 mailbox_message: msg.to_string(),
408 reason: format!("empty payload, msg: {msg:?}"),
409 })?;
410 serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
411 }
412
413 #[cfg(test)]
415 pub(crate) fn json_instruction(
416 msg: &MailboxMessage,
417 ) -> Result<common_meta::instruction::Instruction> {
418 let Payload::Json(payload) =
419 msg.payload
420 .as_ref()
421 .with_context(|| UnexpectedInstructionReplySnafu {
422 mailbox_message: msg.to_string(),
423 reason: format!("empty payload, msg: {msg:?}"),
424 })?;
425 serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
426 }
427
428 pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
429 let mailbox = Arc::new(Self::new(pushers, sequence));
430
431 let timeout_checker = mailbox.clone();
432 let _handle = common_runtime::spawn_global(async move {
433 timeout_checker.check_timeout_bg(10).await;
434 });
435
436 mailbox
437 }
438
439 fn new(pushers: Pushers, sequence: Sequence) -> Self {
440 Self {
441 pushers,
442 sequence,
443 senders: DashMap::default(),
444 timeouts: DashMap::default(),
445 timeout_notify: Notify::new(),
446 }
447 }
448
449 async fn check_timeout_bg(&self, interval_millis: u64) {
450 let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
451
452 loop {
453 let _ = interval.tick().await;
454
455 if self.timeouts.is_empty() {
456 self.timeout_notify.notified().await;
457 }
458
459 let now = Instant::now();
460 let timeout_ids = self
461 .timeouts
462 .iter()
463 .filter_map(|entry| {
464 let (id, deadline) = entry.pair();
465 if deadline < &now { Some(*id) } else { None }
466 })
467 .collect::<Vec<_>>();
468
469 for id in timeout_ids {
470 let _ = self
471 .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
472 .await;
473 }
474 }
475 }
476
477 #[inline]
478 async fn next_message_id(&self) -> Result<u64> {
479 loop {
482 let next = self
483 .sequence
484 .next()
485 .await
486 .context(error::NextSequenceSnafu)?;
487 if next > 0 {
488 return Ok(next);
489 }
490 }
491 }
492}
493
494#[async_trait::async_trait]
495impl Mailbox for HeartbeatMailbox {
496 async fn send(
497 &self,
498 ch: &Channel,
499 mut msg: MailboxMessage,
500 timeout: Duration,
501 ) -> Result<MailboxReceiver> {
502 let message_id = self.next_message_id().await?;
503 msg.id = message_id;
504
505 let pusher_id = ch.pusher_id();
506 debug!("Sending mailbox message {msg:?} to {pusher_id}");
507
508 let (tx, rx) = oneshot::channel();
509 let _ = self.senders.insert(message_id, tx);
510 let deadline = Instant::now() + timeout;
511 self.timeouts.insert(message_id, deadline);
512 self.timeout_notify.notify_one();
513 let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
514
515 Ok(MailboxReceiver::new(
516 message_id,
517 rx,
518 deregister_signal_receiver,
519 *ch,
520 ))
521 }
522
523 async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
524 let message_id = 0; msg.id = message_id;
526
527 let pusher_id = ch.pusher_id();
528 debug!("Sending mailbox message {msg:?} to {pusher_id}");
529
530 self.pushers.push(pusher_id, msg).await?;
531
532 Ok(())
533 }
534
535 async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
536 self.pushers.broadcast(ch.pusher_range(), msg).await
537 }
538
539 async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
540 debug!("Received mailbox message {maybe_msg:?}");
541
542 let _ = self.timeouts.remove(&id);
543
544 if let Some((_, tx)) = self.senders.remove(&id) {
545 tx.send(maybe_msg)
546 .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
547 } else if let Ok(finally_msg) = maybe_msg {
548 warn!("The response arrived too late: {finally_msg:?}");
549 }
550
551 Ok(())
552 }
553
554 async fn reset(&self) {
555 let keys = self.pushers.clear().await;
556 if !keys.is_empty() {
557 info!("Reset mailbox, deregister pushers: {:?}", keys);
558 METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
559 }
560 }
561}
562
563pub struct HeartbeatHandlerGroupBuilder {
565 region_failure_handler: Option<RegionFailureHandler>,
567
568 region_lease_handler: Option<RegionLeaseHandler>,
570
571 flush_stats_factor: Option<usize>,
575 flow_state_handler: Option<FlowStateHandler>,
577
578 persist_stats_handler: Option<PersistStatsHandler>,
580
581 plugins: Option<Plugins>,
583
584 pushers: Pushers,
586
587 handlers: Vec<NameCachedHandler>,
589}
590
591impl HeartbeatHandlerGroupBuilder {
592 pub fn new(pushers: Pushers) -> Self {
593 Self {
594 region_failure_handler: None,
595 region_lease_handler: None,
596 flush_stats_factor: None,
597 flow_state_handler: None,
598 persist_stats_handler: None,
599 plugins: None,
600 pushers,
601 handlers: vec![],
602 }
603 }
604
605 pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
606 self.flow_state_handler = handler;
607 self
608 }
609
610 pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
611 self.region_lease_handler = handler;
612 self
613 }
614
615 pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
617 self.region_failure_handler = handler;
618 self
619 }
620
621 pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
623 self.flush_stats_factor = flush_stats_factor;
624 self
625 }
626
627 pub fn with_persist_stats_handler(mut self, handler: Option<PersistStatsHandler>) -> Self {
628 self.persist_stats_handler = handler;
629 self
630 }
631
632 pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
634 self.plugins = plugins;
635 self
636 }
637
638 pub fn add_default_handlers(mut self) -> Self {
640 let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
642 plugins
643 .get::<PublisherRef>()
644 .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
645 } else {
646 None
647 };
648
649 self.add_handler_last(ResponseHeaderHandler);
650 self.add_handler_last(DatanodeKeepLeaseHandler);
654 self.add_handler_last(FlownodeKeepLeaseHandler);
655 self.add_handler_last(CheckLeaderHandler);
656 self.add_handler_last(OnLeaderStartHandler);
657 self.add_handler_last(ExtractStatHandler);
658 self.add_handler_last(CollectDatanodeClusterInfoHandler);
659 self.add_handler_last(CollectFrontendClusterInfoHandler);
660 self.add_handler_last(CollectFlownodeClusterInfoHandler);
661 self.add_handler_last(MailboxHandler);
662 if let Some(region_lease_handler) = self.region_lease_handler.take() {
663 self.add_handler_last(region_lease_handler);
664 }
665 self.add_handler_last(FilterInactiveRegionStatsHandler);
666 if let Some(region_failure_handler) = self.region_failure_handler.take() {
667 self.add_handler_last(region_failure_handler);
668 }
669 if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
670 self.add_handler_last(publish_heartbeat_handler);
671 }
672 self.add_handler_last(CollectLeaderRegionHandler);
673 self.add_handler_last(CollectTopicStatsHandler);
674 if let Some(persist_stats_handler) = self.persist_stats_handler.take() {
677 self.add_handler_last(persist_stats_handler);
678 }
679 self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
680 self.add_handler_last(RemapFlowPeerHandler::default());
681
682 if let Some(flow_state_handler) = self.flow_state_handler.take() {
683 self.add_handler_last(flow_state_handler);
684 }
685
686 self
687 }
688
689 pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
693 if let Some(customizer) = self
694 .plugins
695 .as_ref()
696 .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
697 {
698 debug!("Customizing the heartbeat handler group builder");
699 customizer.customize(&mut self)?;
700 }
701
702 Ok(HeartbeatHandlerGroup {
703 handlers: self.handlers,
704 pushers: self.pushers,
705 })
706 }
707
708 fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
709 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
710 self.handlers.insert(pos + 1, handler);
711 return Ok(());
712 }
713
714 error::HandlerNotFoundSnafu { name: target }.fail()
715 }
716
717 pub fn add_handler_after(
719 &mut self,
720 target: &'static str,
721 handler: impl HeartbeatHandler + 'static,
722 ) -> Result<()> {
723 self.add_handler_after_inner(target, NameCachedHandler::new(handler))
724 }
725
726 fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
727 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
728 self.handlers.insert(pos, handler);
729 return Ok(());
730 }
731
732 error::HandlerNotFoundSnafu { name: target }.fail()
733 }
734
735 pub fn add_handler_before(
737 &mut self,
738 target: &'static str,
739 handler: impl HeartbeatHandler + 'static,
740 ) -> Result<()> {
741 self.add_handler_before_inner(target, NameCachedHandler::new(handler))
742 }
743
744 fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
745 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
746 self.handlers[pos] = handler;
747 return Ok(());
748 }
749
750 error::HandlerNotFoundSnafu { name: target }.fail()
751 }
752
753 pub fn replace_handler(
755 &mut self,
756 target: &'static str,
757 handler: impl HeartbeatHandler + 'static,
758 ) -> Result<()> {
759 self.replace_handler_inner(target, NameCachedHandler::new(handler))
760 }
761
762 fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
763 self.handlers.push(handler);
764 }
765
766 fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
767 self.add_handler_last_inner(NameCachedHandler::new(handler));
768 }
769}
770
771pub type HeartbeatHandlerGroupBuilderCustomizerRef =
772 Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
773
774pub enum CustomizeHeartbeatGroupAction {
775 AddHandlerAfter {
776 target: String,
777 handler: NameCachedHandler,
778 },
779 AddHandlerBefore {
780 target: String,
781 handler: NameCachedHandler,
782 },
783 ReplaceHandler {
784 target: String,
785 handler: NameCachedHandler,
786 },
787 AddHandlerLast {
788 handler: NameCachedHandler,
789 },
790}
791
792impl CustomizeHeartbeatGroupAction {
793 pub fn new_add_handler_after(
794 target: &'static str,
795 handler: impl HeartbeatHandler + 'static,
796 ) -> Self {
797 Self::AddHandlerAfter {
798 target: target.to_string(),
799 handler: NameCachedHandler::new(handler),
800 }
801 }
802
803 pub fn new_add_handler_before(
804 target: &'static str,
805 handler: impl HeartbeatHandler + 'static,
806 ) -> Self {
807 Self::AddHandlerBefore {
808 target: target.to_string(),
809 handler: NameCachedHandler::new(handler),
810 }
811 }
812
813 pub fn new_replace_handler(
814 target: &'static str,
815 handler: impl HeartbeatHandler + 'static,
816 ) -> Self {
817 Self::ReplaceHandler {
818 target: target.to_string(),
819 handler: NameCachedHandler::new(handler),
820 }
821 }
822
823 pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
824 Self::AddHandlerLast {
825 handler: NameCachedHandler::new(handler),
826 }
827 }
828}
829
830pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
832 fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;
833
834 fn add_action(&self, action: CustomizeHeartbeatGroupAction);
835}
836
837#[derive(Default)]
838pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
839 actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
840}
841
842impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
843 fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
844 info!("Customizing the heartbeat handler group builder");
845 let mut actions = self.actions.lock().unwrap();
846 for action in actions.drain(..) {
847 match action {
848 CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
849 builder.add_handler_after_inner(&target, handler)?;
850 }
851 CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
852 builder.add_handler_before_inner(&target, handler)?;
853 }
854 CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
855 builder.replace_handler_inner(&target, handler)?;
856 }
857 CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
858 builder.add_handler_last_inner(handler);
859 }
860 }
861 }
862 Ok(())
863 }
864
865 fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
866 self.actions.lock().unwrap().push(action);
867 }
868}
869
870#[cfg(test)]
871mod tests {
872
873 use std::assert_matches::assert_matches;
874 use std::sync::Arc;
875 use std::time::Duration;
876
877 use api::v1::meta::{MailboxMessage, Role};
878 use common_meta::kv_backend::memory::MemoryKvBackend;
879 use common_meta::sequence::SequenceBuilder;
880 use tokio::sync::mpsc;
881
882 use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
883 use crate::error;
884 use crate::handler::collect_stats_handler::CollectStatsHandler;
885 use crate::handler::response_header_handler::ResponseHeaderHandler;
886 use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
887 use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
888
889 #[tokio::test]
890 async fn test_mailbox() {
891 let (mailbox, receiver) = push_msg_via_mailbox().await;
892 let id = receiver.message_id();
893
894 let resp_msg = MailboxMessage {
895 id,
896 subject: "resp-test".to_string(),
897 timestamp_millis: 456,
898 ..Default::default()
899 };
900
901 mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();
902
903 let recv_msg = receiver.await.unwrap();
904 assert_eq!(recv_msg.id, id);
905 assert_eq!(recv_msg.timestamp_millis, 456);
906 assert_eq!(recv_msg.subject, "resp-test".to_string());
907 }
908
909 #[tokio::test]
910 async fn test_mailbox_timeout() {
911 let (_, receiver) = push_msg_via_mailbox().await;
912 let res = receiver.await;
913 assert!(res.is_err());
914 }
915
916 async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
917 let datanode_id = 12;
918 let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
919 let pusher_id = PusherId::new(Role::Datanode, datanode_id);
920 let pusher: Pusher = Pusher::new(pusher_tx);
921 let handler_group = HeartbeatHandlerGroup::default();
922 handler_group.register_pusher(pusher_id, pusher).await;
923
924 let kv_backend = Arc::new(MemoryKvBackend::new());
925 let seq = SequenceBuilder::new("test_seq", kv_backend).build();
926 let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);
927
928 let msg = MailboxMessage {
929 id: 0,
930 subject: "req-test".to_string(),
931 timestamp_millis: 123,
932 ..Default::default()
933 };
934 let ch = Channel::Datanode(datanode_id);
935
936 let receiver = mailbox
937 .send(&ch, msg, Duration::from_secs(1))
938 .await
939 .unwrap();
940
941 let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
942 let message = recv_obj.mailbox_message.unwrap();
943 assert_eq!(message.timestamp_millis, 123);
944 assert_eq!(message.subject, "req-test".to_string());
945
946 (mailbox, receiver)
947 }
948
949 #[test]
950 fn test_handler_group_builder() {
951 let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
952 .add_default_handlers()
953 .build()
954 .unwrap();
955
956 let handlers = group.handlers;
957 let names = [
958 "ResponseHeaderHandler",
959 "DatanodeKeepLeaseHandler",
960 "FlownodeKeepLeaseHandler",
961 "CheckLeaderHandler",
962 "OnLeaderStartHandler",
963 "ExtractStatHandler",
964 "CollectDatanodeClusterInfoHandler",
965 "CollectFrontendClusterInfoHandler",
966 "CollectFlownodeClusterInfoHandler",
967 "MailboxHandler",
968 "FilterInactiveRegionStatsHandler",
969 "CollectLeaderRegionHandler",
970 "CollectTopicStatsHandler",
971 "CollectStatsHandler",
972 "RemapFlowPeerHandler",
973 ];
974 assert_eq!(names.len(), handlers.len());
975 for (handler, name) in handlers.iter().zip(names.into_iter()) {
976 assert_eq!(handler.name, name);
977 }
978 }
979
980 #[test]
981 fn test_handler_group_builder_add_before() {
982 let mut builder =
983 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
984 builder
985 .add_handler_before(
986 "FilterInactiveRegionStatsHandler",
987 CollectStatsHandler::default(),
988 )
989 .unwrap();
990
991 let group = builder.build().unwrap();
992 let handlers = group.handlers;
993 let names = [
994 "ResponseHeaderHandler",
995 "DatanodeKeepLeaseHandler",
996 "FlownodeKeepLeaseHandler",
997 "CheckLeaderHandler",
998 "OnLeaderStartHandler",
999 "ExtractStatHandler",
1000 "CollectDatanodeClusterInfoHandler",
1001 "CollectFrontendClusterInfoHandler",
1002 "CollectFlownodeClusterInfoHandler",
1003 "MailboxHandler",
1004 "CollectStatsHandler",
1005 "FilterInactiveRegionStatsHandler",
1006 "CollectLeaderRegionHandler",
1007 "CollectTopicStatsHandler",
1008 "CollectStatsHandler",
1009 "RemapFlowPeerHandler",
1010 ];
1011 assert_eq!(names.len(), handlers.len());
1012 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1013 assert_eq!(handler.name, name);
1014 }
1015 }
1016
1017 #[test]
1018 fn test_handler_group_builder_add_before_first() {
1019 let mut builder =
1020 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1021 builder
1022 .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
1023 .unwrap();
1024
1025 let group = builder.build().unwrap();
1026 let handlers = group.handlers;
1027 let names = [
1028 "CollectStatsHandler",
1029 "ResponseHeaderHandler",
1030 "DatanodeKeepLeaseHandler",
1031 "FlownodeKeepLeaseHandler",
1032 "CheckLeaderHandler",
1033 "OnLeaderStartHandler",
1034 "ExtractStatHandler",
1035 "CollectDatanodeClusterInfoHandler",
1036 "CollectFrontendClusterInfoHandler",
1037 "CollectFlownodeClusterInfoHandler",
1038 "MailboxHandler",
1039 "FilterInactiveRegionStatsHandler",
1040 "CollectLeaderRegionHandler",
1041 "CollectTopicStatsHandler",
1042 "CollectStatsHandler",
1043 "RemapFlowPeerHandler",
1044 ];
1045 assert_eq!(names.len(), handlers.len());
1046 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1047 assert_eq!(handler.name, name);
1048 }
1049 }
1050
1051 #[test]
1052 fn test_handler_group_builder_add_after() {
1053 let mut builder =
1054 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1055 builder
1056 .add_handler_after("MailboxHandler", CollectStatsHandler::default())
1057 .unwrap();
1058
1059 let group = builder.build().unwrap();
1060 let handlers = group.handlers;
1061 let names = [
1062 "ResponseHeaderHandler",
1063 "DatanodeKeepLeaseHandler",
1064 "FlownodeKeepLeaseHandler",
1065 "CheckLeaderHandler",
1066 "OnLeaderStartHandler",
1067 "ExtractStatHandler",
1068 "CollectDatanodeClusterInfoHandler",
1069 "CollectFrontendClusterInfoHandler",
1070 "CollectFlownodeClusterInfoHandler",
1071 "MailboxHandler",
1072 "CollectStatsHandler",
1073 "FilterInactiveRegionStatsHandler",
1074 "CollectLeaderRegionHandler",
1075 "CollectTopicStatsHandler",
1076 "CollectStatsHandler",
1077 "RemapFlowPeerHandler",
1078 ];
1079 assert_eq!(names.len(), handlers.len());
1080 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1081 assert_eq!(handler.name, name);
1082 }
1083 }
1084
1085 #[test]
1086 fn test_handler_group_builder_add_after_last() {
1087 let mut builder =
1088 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1089 builder
1090 .add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
1091 .unwrap();
1092
1093 let group = builder.build().unwrap();
1094 let handlers = group.handlers;
1095 let names = [
1096 "ResponseHeaderHandler",
1097 "DatanodeKeepLeaseHandler",
1098 "FlownodeKeepLeaseHandler",
1099 "CheckLeaderHandler",
1100 "OnLeaderStartHandler",
1101 "ExtractStatHandler",
1102 "CollectDatanodeClusterInfoHandler",
1103 "CollectFrontendClusterInfoHandler",
1104 "CollectFlownodeClusterInfoHandler",
1105 "MailboxHandler",
1106 "FilterInactiveRegionStatsHandler",
1107 "CollectLeaderRegionHandler",
1108 "CollectTopicStatsHandler",
1109 "CollectStatsHandler",
1110 "ResponseHeaderHandler",
1111 "RemapFlowPeerHandler",
1112 ];
1113 assert_eq!(names.len(), handlers.len());
1114 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1115 assert_eq!(handler.name, name);
1116 }
1117 }
1118
1119 #[test]
1120 fn test_handler_group_builder_replace() {
1121 let mut builder =
1122 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1123 builder
1124 .replace_handler("MailboxHandler", CollectStatsHandler::default())
1125 .unwrap();
1126
1127 let group = builder.build().unwrap();
1128 let handlers = group.handlers;
1129 let names = [
1130 "ResponseHeaderHandler",
1131 "DatanodeKeepLeaseHandler",
1132 "FlownodeKeepLeaseHandler",
1133 "CheckLeaderHandler",
1134 "OnLeaderStartHandler",
1135 "ExtractStatHandler",
1136 "CollectDatanodeClusterInfoHandler",
1137 "CollectFrontendClusterInfoHandler",
1138 "CollectFlownodeClusterInfoHandler",
1139 "CollectStatsHandler",
1140 "FilterInactiveRegionStatsHandler",
1141 "CollectLeaderRegionHandler",
1142 "CollectTopicStatsHandler",
1143 "CollectStatsHandler",
1144 "RemapFlowPeerHandler",
1145 ];
1146
1147 assert_eq!(names.len(), handlers.len());
1148 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1149 assert_eq!(handler.name, name);
1150 }
1151 }
1152
1153 #[test]
1154 fn test_handler_group_builder_replace_last() {
1155 let mut builder =
1156 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1157 builder
1158 .replace_handler("CollectStatsHandler", ResponseHeaderHandler)
1159 .unwrap();
1160
1161 let group = builder.build().unwrap();
1162 let handlers = group.handlers;
1163 let names = [
1164 "ResponseHeaderHandler",
1165 "DatanodeKeepLeaseHandler",
1166 "FlownodeKeepLeaseHandler",
1167 "CheckLeaderHandler",
1168 "OnLeaderStartHandler",
1169 "ExtractStatHandler",
1170 "CollectDatanodeClusterInfoHandler",
1171 "CollectFrontendClusterInfoHandler",
1172 "CollectFlownodeClusterInfoHandler",
1173 "MailboxHandler",
1174 "FilterInactiveRegionStatsHandler",
1175 "CollectLeaderRegionHandler",
1176 "CollectTopicStatsHandler",
1177 "ResponseHeaderHandler",
1178 "RemapFlowPeerHandler",
1179 ];
1180
1181 assert_eq!(names.len(), handlers.len());
1182 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1183 assert_eq!(handler.name, name);
1184 }
1185 }
1186
1187 #[test]
1188 fn test_handler_group_builder_replace_first() {
1189 let mut builder =
1190 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1191 builder
1192 .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
1193 .unwrap();
1194
1195 let group = builder.build().unwrap();
1196 let handlers = group.handlers;
1197 let names = [
1198 "CollectStatsHandler",
1199 "DatanodeKeepLeaseHandler",
1200 "FlownodeKeepLeaseHandler",
1201 "CheckLeaderHandler",
1202 "OnLeaderStartHandler",
1203 "ExtractStatHandler",
1204 "CollectDatanodeClusterInfoHandler",
1205 "CollectFrontendClusterInfoHandler",
1206 "CollectFlownodeClusterInfoHandler",
1207 "MailboxHandler",
1208 "FilterInactiveRegionStatsHandler",
1209 "CollectLeaderRegionHandler",
1210 "CollectTopicStatsHandler",
1211 "CollectStatsHandler",
1212 "RemapFlowPeerHandler",
1213 ];
1214 assert_eq!(names.len(), handlers.len());
1215 for (handler, name) in handlers.iter().zip(names.into_iter()) {
1216 assert_eq!(handler.name, name);
1217 }
1218 }
1219
1220 #[test]
1221 fn test_handler_group_builder_handler_not_found() {
1222 let mut builder =
1223 HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
1224 let err = builder
1225 .add_handler_before("NotExists", CollectStatsHandler::default())
1226 .unwrap_err();
1227 assert_matches!(err, error::Error::HandlerNotFound { .. });
1228
1229 let err = builder
1230 .add_handler_after("NotExists", CollectStatsHandler::default())
1231 .unwrap_err();
1232 assert_matches!(err, error::Error::HandlerNotFound { .. });
1233
1234 let err = builder
1235 .replace_handler("NotExists", CollectStatsHandler::default())
1236 .unwrap_err();
1237 assert_matches!(err, error::Error::HandlerNotFound { .. });
1238 }
1239
1240 #[tokio::test]
1241 async fn test_pusher_drop() {
1242 let (tx, _rx) = mpsc::channel(1);
1243 let pusher = Pusher::new(tx);
1244 let mut deregister_signal_tx = pusher.deregister_signal_receiver.clone();
1245
1246 drop(pusher);
1247 deregister_signal_tx.changed().await.unwrap();
1248 }
1249}