1use std::collections::{BTreeMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::ops::Range;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use api::v1::meta::mailbox_message::Payload;
22use api::v1::meta::{
23 HeartbeatRequest, HeartbeatResponse, MailboxMessage, PROTOCOL_VERSION, RegionLease,
24 ResponseHeader, Role,
25};
26use check_leader_handler::CheckLeaderHandler;
27use collect_cluster_info_handler::{
28 CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
29 CollectFrontendClusterInfoHandler,
30};
31use collect_leader_region_handler::CollectLeaderRegionHandler;
32use collect_stats_handler::CollectStatsHandler;
33use common_base::Plugins;
34use common_meta::datanode::Stat;
35use common_meta::instruction::{Instruction, InstructionReply};
36use common_meta::sequence::Sequence;
37use common_telemetry::{debug, info, warn};
38use dashmap::DashMap;
39use extract_stat_handler::ExtractStatHandler;
40use failure_handler::RegionFailureHandler;
41use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
42use futures::future::join_all;
43use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
44use mailbox_handler::MailboxHandler;
45use on_leader_start_handler::OnLeaderStartHandler;
46use publish_heartbeat_handler::PublishHeartbeatHandler;
47use region_lease_handler::RegionLeaseHandler;
48use remap_flow_peer_handler::RemapFlowPeerHandler;
49use response_header_handler::ResponseHeaderHandler;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use tokio::sync::mpsc::Sender;
53use tokio::sync::{Notify, RwLock, oneshot, watch};
54
55use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
56use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
57use crate::handler::flow_state_handler::FlowStateHandler;
58use crate::handler::persist_stats_handler::PersistStatsHandler;
59use crate::metasrv::Context;
60use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
61use crate::pubsub::PublisherRef;
62use crate::service::mailbox::{
63 BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
64};
65
66pub mod check_leader_handler;
67pub mod collect_cluster_info_handler;
68pub mod collect_leader_region_handler;
69pub mod collect_stats_handler;
70pub mod collect_topic_stats_handler;
71pub mod extract_stat_handler;
72pub mod failure_handler;
73pub mod filter_inactive_region_stats;
74pub mod flow_state_handler;
75pub mod keep_lease_handler;
76pub mod mailbox_handler;
77pub mod on_leader_start_handler;
78pub mod persist_stats_handler;
79pub mod publish_heartbeat_handler;
80pub mod region_lease_handler;
81pub mod remap_flow_peer_handler;
82pub mod response_header_handler;
83
84#[cfg(test)]
85pub mod test_utils;
86
87#[async_trait::async_trait]
88pub trait HeartbeatHandler: Send + Sync {
89 fn is_acceptable(&self, role: Role) -> bool;
90
91 fn name(&self) -> &'static str {
92 let type_name = std::any::type_name::<Self>();
93 type_name.split("::").last().unwrap_or(type_name)
95 }
96
97 async fn handle(
98 &self,
99 req: &HeartbeatRequest,
100 ctx: &mut Context,
101 acc: &mut HeartbeatAccumulator,
102 ) -> Result<HandleControl>;
103}
104
/// Outcome of a single heartbeat handler, telling the caller whether the
/// remaining handlers should still be executed.
///
/// The enum carries no data, so it is cheap to copy and supports total
/// equality.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HandleControl {
    /// Keep going: run the next handler.
    Continue,
    /// Stop here: no further handlers are run for this request.
    Done,
}
113
114#[derive(Debug, Default)]
115pub struct HeartbeatAccumulator {
116 pub header: Option<ResponseHeader>,
117 pub instructions: Vec<Instruction>,
118 pub stat: Option<Stat>,
119 pub inactive_region_ids: HashSet<RegionId>,
120 pub region_lease: Option<RegionLease>,
121}
122
123impl HeartbeatAccumulator {
124 pub fn into_mailbox_message(self) -> Option<MailboxMessage> {
125 None
127 }
128}
129
130#[derive(Copy, Clone)]
131pub struct PusherId {
132 pub role: Role,
133 pub id: u64,
134}
135
136impl Debug for PusherId {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 write!(f, "{:?}-{}", self.role, self.id)
139 }
140}
141
142impl Display for PusherId {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 write!(f, "{:?}-{}", self.role, self.id)
145 }
146}
147
148impl PusherId {
149 pub fn new(role: Role, id: u64) -> Self {
150 Self { role, id }
151 }
152
153 pub fn string_key(&self) -> String {
154 format!("{}-{}", self.role as i32, self.id)
155 }
156}
157
158pub type DeregisterSignalReceiver = watch::Receiver<bool>;
160
161pub struct Pusher {
163 sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
164 deregister_signal_sender: watch::Sender<bool>,
168 deregister_signal_receiver: DeregisterSignalReceiver,
169
170 res_header: ResponseHeader,
171}
172
173impl Drop for Pusher {
174 fn drop(&mut self) {
175 let _ = self.deregister_signal_sender.send(true);
178 }
179}
180
181impl Pusher {
182 pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
183 let res_header = ResponseHeader {
184 protocol_version: PROTOCOL_VERSION,
185 ..Default::default()
186 };
187 let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
188 Self {
189 sender,
190 deregister_signal_sender,
191 deregister_signal_receiver,
192 res_header,
193 }
194 }
195
196 #[inline]
197 pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
198 self.sender.send(Ok(res)).await.map_err(|e| {
199 error::PushMessageSnafu {
200 err_msg: e.to_string(),
201 }
202 .build()
203 })
204 }
205
206 #[inline]
207 pub fn header(&self) -> ResponseHeader {
208 self.res_header.clone()
209 }
210}
211
212#[derive(Clone, Default)]
214pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
215
216impl Pushers {
217 async fn push(
218 &self,
219 pusher_id: PusherId,
220 mailbox_message: MailboxMessage,
221 ) -> Result<DeregisterSignalReceiver> {
222 let pusher_id = pusher_id.string_key();
223 let pushers = self.0.read().await;
224 let pusher = pushers
225 .get(&pusher_id)
226 .context(error::PusherNotFoundSnafu { pusher_id })?;
227
228 pusher
229 .push(HeartbeatResponse {
230 header: Some(pusher.header()),
231 mailbox_message: Some(mailbox_message),
232 ..Default::default()
233 })
234 .await?;
235
236 Ok(pusher.deregister_signal_receiver.clone())
237 }
238
239 async fn broadcast(
240 &self,
241 range: Range<String>,
242 mailbox_message: &MailboxMessage,
243 ) -> Result<()> {
244 let pushers = self.0.read().await;
245 let pushers = pushers
246 .range(range)
247 .map(|(_, value)| value)
248 .collect::<Vec<_>>();
249 let mut results = Vec::with_capacity(pushers.len());
250
251 for pusher in pushers {
252 let mut mailbox_message = mailbox_message.clone();
253 mailbox_message.id = 0; results.push(pusher.push(HeartbeatResponse {
256 header: Some(pusher.header()),
257 mailbox_message: Some(mailbox_message),
258 ..Default::default()
259 }))
260 }
261
262 let _ = join_all(results)
264 .await
265 .into_iter()
266 .collect::<Result<Vec<_>>>()?;
267
268 Ok(())
269 }
270
271 pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
272 self.0.write().await.insert(pusher_id, pusher)
273 }
274
275 async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
276 self.0.write().await.remove(pusher_id)
277 }
278}
279
280#[derive(Clone)]
281pub struct NameCachedHandler {
282 name: &'static str,
283 handler: Arc<dyn HeartbeatHandler>,
284}
285
286impl NameCachedHandler {
287 fn new(handler: impl HeartbeatHandler + 'static) -> Self {
288 let name = handler.name();
289 let handler = Arc::new(handler);
290 Self { name, handler }
291 }
292}
293
294pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
295
296#[derive(Default, Clone)]
298pub struct HeartbeatHandlerGroup {
299 handlers: Vec<NameCachedHandler>,
300 pushers: Pushers,
301}
302
303impl HeartbeatHandlerGroup {
304 pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
306 METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
307 info!("Pusher register: {}", pusher_id);
308 let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
309 }
310
311 pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
315 METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
316 info!("Pusher unregister: {}", pusher_id);
317 self.pushers.remove(&pusher_id.string_key()).await
318 }
319
320 pub fn pushers(&self) -> Pushers {
322 self.pushers.clone()
323 }
324
325 pub async fn handle(
327 &self,
328 req: HeartbeatRequest,
329 mut ctx: Context,
330 ) -> Result<HeartbeatResponse> {
331 let mut acc = HeartbeatAccumulator::default();
332 let role = req
333 .header
334 .as_ref()
335 .and_then(|h| Role::try_from(h.role).ok())
336 .context(error::InvalidArgumentsSnafu {
337 err_msg: format!("invalid role: {:?}", req.header),
338 })?;
339
340 for NameCachedHandler { name, handler } in self.handlers.iter() {
341 if !handler.is_acceptable(role) {
342 continue;
343 }
344
345 let _timer = METRIC_META_HANDLER_EXECUTE
346 .with_label_values(&[*name])
347 .start_timer();
348
349 if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
350 break;
351 }
352 }
353 let header = std::mem::take(&mut acc.header);
354 let res = HeartbeatResponse {
355 header,
356 region_lease: acc.region_lease,
357 ..Default::default()
358 };
359 Ok(res)
360 }
361}
362
363pub struct HeartbeatMailbox {
364 pushers: Pushers,
365 sequence: Sequence,
366 senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
367 timeouts: DashMap<MessageId, Instant>,
368 timeout_notify: Notify,
369}
370
371impl HeartbeatMailbox {
372 pub fn json_reply(msg: &MailboxMessage) -> Result<InstructionReply> {
373 let Payload::Json(payload) =
374 msg.payload
375 .as_ref()
376 .with_context(|| UnexpectedInstructionReplySnafu {
377 mailbox_message: msg.to_string(),
378 reason: format!("empty payload, msg: {msg:?}"),
379 })?;
380 serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
381 }
382
383 #[cfg(test)]
385 pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
386 let Payload::Json(payload) =
387 msg.payload
388 .as_ref()
389 .with_context(|| UnexpectedInstructionReplySnafu {
390 mailbox_message: msg.to_string(),
391 reason: format!("empty payload, msg: {msg:?}"),
392 })?;
393 serde_json::from_str(payload).context(DeserializeFromJsonSnafu { input: payload })
394 }
395
396 pub fn create(pushers: Pushers, sequence: Sequence) -> MailboxRef {
397 let mailbox = Arc::new(Self::new(pushers, sequence));
398
399 let timeout_checker = mailbox.clone();
400 let _handle = common_runtime::spawn_global(async move {
401 timeout_checker.check_timeout_bg(10).await;
402 });
403
404 mailbox
405 }
406
407 fn new(pushers: Pushers, sequence: Sequence) -> Self {
408 Self {
409 pushers,
410 sequence,
411 senders: DashMap::default(),
412 timeouts: DashMap::default(),
413 timeout_notify: Notify::new(),
414 }
415 }
416
417 async fn check_timeout_bg(&self, interval_millis: u64) {
418 let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
419
420 loop {
421 let _ = interval.tick().await;
422
423 if self.timeouts.is_empty() {
424 self.timeout_notify.notified().await;
425 }
426
427 let now = Instant::now();
428 let timeout_ids = self
429 .timeouts
430 .iter()
431 .filter_map(|entry| {
432 let (id, deadline) = entry.pair();
433 if deadline < &now { Some(*id) } else { None }
434 })
435 .collect::<Vec<_>>();
436
437 for id in timeout_ids {
438 let _ = self
439 .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
440 .await;
441 }
442 }
443 }
444
445 #[inline]
446 async fn next_message_id(&self) -> Result<u64> {
447 loop {
450 let next = self
451 .sequence
452 .next()
453 .await
454 .context(error::NextSequenceSnafu)?;
455 if next > 0 {
456 return Ok(next);
457 }
458 }
459 }
460}
461
462#[async_trait::async_trait]
463impl Mailbox for HeartbeatMailbox {
464 async fn send(
465 &self,
466 ch: &Channel,
467 mut msg: MailboxMessage,
468 timeout: Duration,
469 ) -> Result<MailboxReceiver> {
470 let message_id = self.next_message_id().await?;
471 msg.id = message_id;
472
473 let pusher_id = ch.pusher_id();
474 debug!("Sending mailbox message {msg:?} to {pusher_id}");
475
476 let (tx, rx) = oneshot::channel();
477 let _ = self.senders.insert(message_id, tx);
478 let deadline = Instant::now() + timeout;
479 self.timeouts.insert(message_id, deadline);
480 self.timeout_notify.notify_one();
481
482 let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
483
484 Ok(MailboxReceiver::new(
485 message_id,
486 rx,
487 deregister_signal_receiver,
488 *ch,
489 ))
490 }
491
492 async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
493 let message_id = 0; msg.id = message_id;
495
496 let pusher_id = ch.pusher_id();
497 debug!("Sending mailbox message {msg:?} to {pusher_id}");
498
499 self.pushers.push(pusher_id, msg).await?;
500
501 Ok(())
502 }
503
504 async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
505 self.pushers.broadcast(ch.pusher_range(), msg).await
506 }
507
508 async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
509 debug!("Received mailbox message {maybe_msg:?}");
510
511 let _ = self.timeouts.remove(&id);
512
513 if let Some((_, tx)) = self.senders.remove(&id) {
514 tx.send(maybe_msg)
515 .map_err(|_| error::MailboxClosedSnafu { id }.build())?;
516 } else if let Ok(finally_msg) = maybe_msg {
517 warn!("The response arrived too late: {finally_msg:?}");
518 }
519
520 Ok(())
521 }
522}
523
524pub struct HeartbeatHandlerGroupBuilder {
526 region_failure_handler: Option<RegionFailureHandler>,
528
529 region_lease_handler: Option<RegionLeaseHandler>,
531
532 flush_stats_factor: Option<usize>,
536 flow_state_handler: Option<FlowStateHandler>,
538
539 persist_stats_handler: Option<PersistStatsHandler>,
541
542 plugins: Option<Plugins>,
544
545 pushers: Pushers,
547
548 handlers: Vec<NameCachedHandler>,
550}
551
552impl HeartbeatHandlerGroupBuilder {
553 pub fn new(pushers: Pushers) -> Self {
554 Self {
555 region_failure_handler: None,
556 region_lease_handler: None,
557 flush_stats_factor: None,
558 flow_state_handler: None,
559 persist_stats_handler: None,
560 plugins: None,
561 pushers,
562 handlers: vec![],
563 }
564 }
565
566 pub fn with_flow_state_handler(mut self, handler: Option<FlowStateHandler>) -> Self {
567 self.flow_state_handler = handler;
568 self
569 }
570
571 pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
572 self.region_lease_handler = handler;
573 self
574 }
575
576 pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
578 self.region_failure_handler = handler;
579 self
580 }
581
582 pub fn with_flush_stats_factor(mut self, flush_stats_factor: Option<usize>) -> Self {
584 self.flush_stats_factor = flush_stats_factor;
585 self
586 }
587
588 pub fn with_persist_stats_handler(mut self, handler: Option<PersistStatsHandler>) -> Self {
589 self.persist_stats_handler = handler;
590 self
591 }
592
593 pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
595 self.plugins = plugins;
596 self
597 }
598
599 pub fn add_default_handlers(mut self) -> Self {
601 let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
603 plugins
604 .get::<PublisherRef>()
605 .map(|publish| PublishHeartbeatHandler::new(publish.clone()))
606 } else {
607 None
608 };
609
610 self.add_handler_last(ResponseHeaderHandler);
611 self.add_handler_last(DatanodeKeepLeaseHandler);
615 self.add_handler_last(FlownodeKeepLeaseHandler);
616 self.add_handler_last(CheckLeaderHandler);
617 self.add_handler_last(OnLeaderStartHandler);
618 self.add_handler_last(ExtractStatHandler);
619 self.add_handler_last(CollectDatanodeClusterInfoHandler);
620 self.add_handler_last(CollectFrontendClusterInfoHandler);
621 self.add_handler_last(CollectFlownodeClusterInfoHandler);
622 self.add_handler_last(MailboxHandler);
623 if let Some(region_lease_handler) = self.region_lease_handler.take() {
624 self.add_handler_last(region_lease_handler);
625 }
626 self.add_handler_last(FilterInactiveRegionStatsHandler);
627 if let Some(region_failure_handler) = self.region_failure_handler.take() {
628 self.add_handler_last(region_failure_handler);
629 }
630 if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
631 self.add_handler_last(publish_heartbeat_handler);
632 }
633 self.add_handler_last(CollectLeaderRegionHandler);
634 self.add_handler_last(CollectTopicStatsHandler);
635 if let Some(persist_stats_handler) = self.persist_stats_handler.take() {
638 self.add_handler_last(persist_stats_handler);
639 }
640 self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
641 self.add_handler_last(RemapFlowPeerHandler::default());
642
643 if let Some(flow_state_handler) = self.flow_state_handler.take() {
644 self.add_handler_last(flow_state_handler);
645 }
646
647 self
648 }
649
650 pub fn build(mut self) -> Result<HeartbeatHandlerGroup> {
654 if let Some(customizer) = self
655 .plugins
656 .as_ref()
657 .and_then(|plugins| plugins.get::<HeartbeatHandlerGroupBuilderCustomizerRef>())
658 {
659 debug!("Customizing the heartbeat handler group builder");
660 customizer.customize(&mut self)?;
661 }
662
663 Ok(HeartbeatHandlerGroup {
664 handlers: self.handlers,
665 pushers: self.pushers,
666 })
667 }
668
669 fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
670 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
671 self.handlers.insert(pos + 1, handler);
672 return Ok(());
673 }
674
675 error::HandlerNotFoundSnafu { name: target }.fail()
676 }
677
678 pub fn add_handler_after(
680 &mut self,
681 target: &'static str,
682 handler: impl HeartbeatHandler + 'static,
683 ) -> Result<()> {
684 self.add_handler_after_inner(target, NameCachedHandler::new(handler))
685 }
686
687 fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
688 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
689 self.handlers.insert(pos, handler);
690 return Ok(());
691 }
692
693 error::HandlerNotFoundSnafu { name: target }.fail()
694 }
695
696 pub fn add_handler_before(
698 &mut self,
699 target: &'static str,
700 handler: impl HeartbeatHandler + 'static,
701 ) -> Result<()> {
702 self.add_handler_before_inner(target, NameCachedHandler::new(handler))
703 }
704
705 fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
706 if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
707 self.handlers[pos] = handler;
708 return Ok(());
709 }
710
711 error::HandlerNotFoundSnafu { name: target }.fail()
712 }
713
714 pub fn replace_handler(
716 &mut self,
717 target: &'static str,
718 handler: impl HeartbeatHandler + 'static,
719 ) -> Result<()> {
720 self.replace_handler_inner(target, NameCachedHandler::new(handler))
721 }
722
723 fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
724 self.handlers.push(handler);
725 }
726
727 fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
728 self.add_handler_last_inner(NameCachedHandler::new(handler));
729 }
730}
731
732pub type HeartbeatHandlerGroupBuilderCustomizerRef =
733 Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;
734
735pub enum CustomizeHeartbeatGroupAction {
736 AddHandlerAfter {
737 target: String,
738 handler: NameCachedHandler,
739 },
740 AddHandlerBefore {
741 target: String,
742 handler: NameCachedHandler,
743 },
744 ReplaceHandler {
745 target: String,
746 handler: NameCachedHandler,
747 },
748 AddHandlerLast {
749 handler: NameCachedHandler,
750 },
751}
752
753impl CustomizeHeartbeatGroupAction {
754 pub fn new_add_handler_after(
755 target: &'static str,
756 handler: impl HeartbeatHandler + 'static,
757 ) -> Self {
758 Self::AddHandlerAfter {
759 target: target.to_string(),
760 handler: NameCachedHandler::new(handler),
761 }
762 }
763
764 pub fn new_add_handler_before(
765 target: &'static str,
766 handler: impl HeartbeatHandler + 'static,
767 ) -> Self {
768 Self::AddHandlerBefore {
769 target: target.to_string(),
770 handler: NameCachedHandler::new(handler),
771 }
772 }
773
774 pub fn new_replace_handler(
775 target: &'static str,
776 handler: impl HeartbeatHandler + 'static,
777 ) -> Self {
778 Self::ReplaceHandler {
779 target: target.to_string(),
780 handler: NameCachedHandler::new(handler),
781 }
782 }
783
784 pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
785 Self::AddHandlerLast {
786 handler: NameCachedHandler::new(handler),
787 }
788 }
789}
790
791pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
793 fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;
794
795 fn add_action(&self, action: CustomizeHeartbeatGroupAction);
796}
797
798#[derive(Default)]
799pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
800 actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
801}
802
803impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
804 fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
805 info!("Customizing the heartbeat handler group builder");
806 let mut actions = self.actions.lock().unwrap();
807 for action in actions.drain(..) {
808 match action {
809 CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
810 builder.add_handler_after_inner(&target, handler)?;
811 }
812 CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
813 builder.add_handler_before_inner(&target, handler)?;
814 }
815 CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
816 builder.replace_handler_inner(&target, handler)?;
817 }
818 CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
819 builder.add_handler_last_inner(handler);
820 }
821 }
822 }
823 Ok(())
824 }
825
826 fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
827 self.actions.lock().unwrap().push(action);
828 }
829}
830
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use api::v1::meta::{MailboxMessage, Role};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::sequence::SequenceBuilder;
    use tokio::sync::mpsc;

    use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
    use crate::error;
    use crate::handler::collect_stats_handler::CollectStatsHandler;
    use crate::handler::response_header_handler::ResponseHeaderHandler;
    use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
    use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};

    /// Returns a builder that already contains the default handlers.
    fn default_builder() -> HeartbeatHandlerGroupBuilder {
        HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers()
    }

    /// Asserts that the handlers of `group` carry exactly `expected` names, in
    /// order.
    fn assert_handler_names(group: HeartbeatHandlerGroup, expected: &[&str]) {
        let handlers = group.handlers;
        assert_eq!(expected.len(), handlers.len());
        for (handler, name) in handlers.iter().zip(expected.iter()) {
            assert_eq!(handler.name, *name);
        }
    }

    // A reply delivered through `on_recv` reaches the receiver unchanged.
    #[tokio::test]
    async fn test_mailbox() {
        let (mailbox, receiver) = push_msg_via_mailbox().await;
        let id = receiver.message_id();

        let reply = MailboxMessage {
            id,
            subject: "resp-test".to_string(),
            timestamp_millis: 456,
            ..Default::default()
        };
        mailbox.on_recv(id, Ok(reply)).await.unwrap();

        let received = receiver.await.unwrap();
        assert_eq!(received.id, id);
        assert_eq!(received.timestamp_millis, 456);
        assert_eq!(received.subject, "resp-test".to_string());
    }

    // Without a reply the receiver resolves to an error.
    #[tokio::test]
    async fn test_mailbox_timeout() {
        let (_, receiver) = push_msg_via_mailbox().await;
        let outcome = receiver.await;
        assert!(outcome.is_err());
    }

    /// Registers a datanode pusher, sends one message through a fresh mailbox
    /// and checks that it arrives on the pusher channel.
    async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
        let datanode_id = 12;
        let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
        let pusher_id = PusherId::new(Role::Datanode, datanode_id);
        let pusher: Pusher = Pusher::new(pusher_tx);
        let handler_group = HeartbeatHandlerGroup::default();
        handler_group.register_pusher(pusher_id, pusher).await;

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let seq = SequenceBuilder::new("test_seq", kv_backend).build();
        let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);

        let request = MailboxMessage {
            id: 0,
            subject: "req-test".to_string(),
            timestamp_millis: 123,
            ..Default::default()
        };
        let ch = Channel::Datanode(datanode_id);

        let receiver = mailbox
            .send(&ch, request, Duration::from_secs(1))
            .await
            .unwrap();

        let pushed = pusher_rx.recv().await.unwrap().unwrap();
        let message = pushed.mailbox_message.unwrap();
        assert_eq!(message.timestamp_millis, 123);
        assert_eq!(message.subject, "req-test".to_string());

        (mailbox, receiver)
    }

    #[test]
    fn test_handler_group_builder() {
        let group = default_builder().build().unwrap();

        assert_handler_names(
            group,
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    #[test]
    fn test_handler_group_builder_add_before() {
        let mut builder = default_builder();
        builder
            .add_handler_before(
                "FilterInactiveRegionStatsHandler",
                CollectStatsHandler::default(),
            )
            .unwrap();

        assert_handler_names(
            builder.build().unwrap(),
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "CollectStatsHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    #[test]
    fn test_handler_group_builder_add_before_first() {
        let mut builder = default_builder();
        builder
            .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
            .unwrap();

        assert_handler_names(
            builder.build().unwrap(),
            &[
                "CollectStatsHandler",
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    #[test]
    fn test_handler_group_builder_add_after() {
        let mut builder = default_builder();
        builder
            .add_handler_after("MailboxHandler", CollectStatsHandler::default())
            .unwrap();

        assert_handler_names(
            builder.build().unwrap(),
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "CollectStatsHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    #[test]
    fn test_handler_group_builder_add_after_last() {
        let mut builder = default_builder();
        builder
            .add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
            .unwrap();

        assert_handler_names(
            builder.build().unwrap(),
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "ResponseHeaderHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    #[test]
    fn test_handler_group_builder_replace() {
        let mut builder = default_builder();
        builder
            .replace_handler("MailboxHandler", CollectStatsHandler::default())
            .unwrap();

        assert_handler_names(
            builder.build().unwrap(),
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "CollectStatsHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    #[test]
    fn test_handler_group_builder_replace_last() {
        let mut builder = default_builder();
        builder
            .replace_handler("CollectStatsHandler", ResponseHeaderHandler)
            .unwrap();

        assert_handler_names(
            builder.build().unwrap(),
            &[
                "ResponseHeaderHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "ResponseHeaderHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    #[test]
    fn test_handler_group_builder_replace_first() {
        let mut builder = default_builder();
        builder
            .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
            .unwrap();

        assert_handler_names(
            builder.build().unwrap(),
            &[
                "CollectStatsHandler",
                "DatanodeKeepLeaseHandler",
                "FlownodeKeepLeaseHandler",
                "CheckLeaderHandler",
                "OnLeaderStartHandler",
                "ExtractStatHandler",
                "CollectDatanodeClusterInfoHandler",
                "CollectFrontendClusterInfoHandler",
                "CollectFlownodeClusterInfoHandler",
                "MailboxHandler",
                "FilterInactiveRegionStatsHandler",
                "CollectLeaderRegionHandler",
                "CollectTopicStatsHandler",
                "CollectStatsHandler",
                "RemapFlowPeerHandler",
            ],
        );
    }

    // Every positional operation reports an unknown target name.
    #[test]
    fn test_handler_group_builder_handler_not_found() {
        let mut builder = default_builder();

        let err = builder
            .add_handler_before("NotExists", CollectStatsHandler::default())
            .unwrap_err();
        assert_matches!(err, error::Error::HandlerNotFound { .. });

        let err = builder
            .add_handler_after("NotExists", CollectStatsHandler::default())
            .unwrap_err();
        assert_matches!(err, error::Error::HandlerNotFound { .. });

        let err = builder
            .replace_handler("NotExists", CollectStatsHandler::default())
            .unwrap_err();
        assert_matches!(err, error::Error::HandlerNotFound { .. });
    }

    // Dropping a pusher notifies the holders of its deregister signal.
    #[tokio::test]
    async fn test_pusher_drop() {
        let (tx, _rx) = mpsc::channel(1);
        let pusher = Pusher::new(tx);
        let mut deregister_signal_rx = pusher.deregister_signal_receiver.clone();

        drop(pusher);
        deregister_signal_rx.changed().await.unwrap();
    }
}